You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/21 08:10:38 UTC

[1/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream

Repository: kafka
Updated Branches:
  refs/heads/trunk b12ba240e -> a2da064cb


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
new file mode 100644
index 0000000..cc6ca05
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> {
+
+    private final Windows<W> windows;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+    TimeWindowedKStreamImpl(final Windows<W> windows,
+                            final InternalStreamsBuilder builder,
+                            final Set<String> sourceNodes,
+                            final String name,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde,
+                            final boolean repartitionRequired) {
+        super(builder, name, sourceNodes);
+        Objects.requireNonNull(windows, "windows can't be null");
+        this.valSerde = valSerde;
+        this.keySerde = keySerde;
+        this.windows = windows;
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return doAggregate(
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator,
+                Serdes.Long());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
+    }
+
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        return doAggregate(initializer, aggregator, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+                                                     final Aggregator<? super K, ? super V, VR> aggregator,
+                                                     final Serde<VR> serde) {
+        final String storeName = builder.newStoreName(AGGREGATE_NAME);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
+                                                                AGGREGATE_NAME,
+                                                                windowStoreBuilder(storeName, serde),
+                                                                false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
+                                                                                             materializedInternal.storeName(),
+                                                                                             initializer,
+                                                                                             aggregator),
+                                                                AGGREGATE_NAME,
+                                                                materialize(materializedInternal),
+                                                                true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        final String storeName = builder.newStoreName(REDUCE_NAME);
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
+                                                               REDUCE_NAME,
+                                                               windowStoreBuilder(storeName, valSerde),
+                                                               true);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
+                                                               REDUCE_NAME,
+                                                               materialize(materializedInternal),
+                                                               false);
+    }
+
+    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            supplier = Stores.persistentWindowStore(materialized.storeName(),
+                                                    windows.maintainMs(),
+                                                    windows.segments,
+                                                    windows.size(),
+                                                    false);
+        }
+        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
+                                                                                   materialized.keySerde(),
+                                                                                   materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+        return builder;
+    }
+
+
+    private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+        return Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(
+                        storeName,
+                        windows.maintainMs(),
+                        windows.segments,
+                        windows.size(),
+                        false),
+                keySerde,
+                aggValueSerde).withCachingEnabled();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
deleted file mode 100644
index 3992a79..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Objects;
-import java.util.Set;
-
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-
-public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
-
-    private final Windows<W> windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
-    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
-
-    WindowedKStreamImpl(final Windows<W> windows,
-                        final InternalStreamsBuilder builder,
-                        final Set<String> sourceNodes,
-                        final String name,
-                        final Serde<K> keySerde,
-                        final Serde<V> valSerde,
-                        final boolean repartitionRequired) {
-        super(builder, name, sourceNodes);
-        Objects.requireNonNull(windows, "windows can't be null");
-        this.valSerde = valSerde;
-        this.keySerde = keySerde;
-        this.windows = windows;
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
-    }
-
-    @Override
-    public KTable<Windowed<K>, Long> count() {
-        return doAggregate(
-                aggregateBuilder.countInitializer,
-                aggregateBuilder.countAggregator,
-                Serdes.Long());
-    }
-
-    @Override
-    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
-    }
-
-
-    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator) {
-        Objects.requireNonNull(initializer, "initializer can't be null");
-        Objects.requireNonNull(aggregator, "aggregator can't be null");
-        return doAggregate(initializer, aggregator, null);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
-                                                     final Aggregator<? super K, ? super V, VR> aggregator,
-                                                     final Serde<VR> serde) {
-        final String storeName = builder.newStoreName(AGGREGATE_NAME);
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
-                                                                AGGREGATE_NAME,
-                                                                windowStoreBuilder(storeName, serde),
-                                                                false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                                  final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(initializer, "initializer can't be null");
-        Objects.requireNonNull(aggregator, "aggregator can't be null");
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
-                                                                                             materializedInternal.storeName(),
-                                                                                             initializer,
-                                                                                             aggregator),
-                                                                AGGREGATE_NAME,
-                                                                materialize(materializedInternal),
-                                                                true);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
-        Objects.requireNonNull(reducer, "reducer can't be null");
-        final String storeName = builder.newStoreName(REDUCE_NAME);
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
-                                                               REDUCE_NAME,
-                                                               windowStoreBuilder(storeName, valSerde),
-                                                               true);
-
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
-        Objects.requireNonNull(reducer, "reducer can't be null");
-        Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
-                                                               REDUCE_NAME,
-                                                               materialize(materializedInternal),
-                                                               false);
-    }
-
-    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
-        if (supplier == null) {
-            supplier = Stores.persistentWindowStore(materialized.storeName(),
-                                                    windows.maintainMs(),
-                                                    windows.segments,
-                                                    windows.size(),
-                                                    false);
-        }
-        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
-                                                                                   materialized.keySerde(),
-                                                                                   materialized.valueSerde());
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        }
-        return builder;
-    }
-
-
-    private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
-        return Stores.windowStoreBuilder(
-                Stores.persistentWindowStore(
-                        storeName,
-                        windows.maintainMs(),
-                        windows.segments,
-                        windows.size(),
-                        false),
-                keySerde,
-                aggValueSerde).withCachingEnabled();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 81e8ef7..350facf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -521,7 +521,7 @@ public class KStreamAggregationIntegrationTest {
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
+                .count(SessionWindows.with(sessionGap).until(maintainMillis))
                 .toStream()
                 .foreach(new ForeachAction<Windowed<String>, Long>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index efa027c..c8e011e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -257,7 +257,6 @@ public class KGroupedStreamImplTest {
         });
 
         doAggregateSessionWindows(results);
-        assertNull(table.queryableStoreName());
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
new file mode 100644
index 0000000..042f0f1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SessionWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Merger<String, String> sessionMerger = new Merger<String, String>() {
+        @Override
+        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+            return aggOne + "+" + aggTwo;
+        }
+    };
+    private SessionWindowedKStream<String, String> stream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        this.stream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SessionWindows.with(500));
+    }
+
+    @Test
+    public void shouldCountSessionWindowed() {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        stream.count()
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
+    }
+
+    @Test
+    public void shouldReduceWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        stream.reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
+    }
+
+    @Test
+    public void shouldAggregateSessionWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        processData();
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
+        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
+        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeCount() {
+        stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")
+                             .withKeySerde(Serdes.String())
+                             .withValueSerde(Serdes.Long()));
+
+        processData();
+        final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeReduced() {
+        stream.reduce(MockReducer.STRING_ADDER,
+                      Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced")
+                              .withKeySerde(Serdes.String())
+                              .withValueSerde(Serdes.String()));
+
+        processData();
+        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeAggregated() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated")
+                                 .withKeySerde(Serdes.String())
+                                 .withValueSerde(Serdes.String()));
+
+        processData();
+        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        stream.aggregate(null, MockAggregator.TOSTRING_ADDER, sessionMerger);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        stream.reduce(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        stream.aggregate(null,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         null,
+                         sessionMerger,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         null,
+                         Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        stream.aggregate(MockInitializer.STRING_INIT,
+                         MockAggregator.TOSTRING_ADDER,
+                         sessionMerger,
+                         (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        stream.reduce(null,
+                      Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        stream.reduce(MockReducer.STRING_ADDER,
+                      null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        stream.count(null);
+    }
+
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "1", "2");
+        driver.setTime(600);
+        driver.process(TOPIC, "1", "3");
+        driver.process(TOPIC, "2", "1");
+        driver.flushState();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
new file mode 100644
index 0000000..93bcf33
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TimeWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(TimeWindows.of(500L));
+    }
+
+    @Test
+    public void shouldCountWindowed() {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        windowedStream.count()
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
+    }
+
+
+    @Test
+    public void shouldReduceWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
+    }
+
+    @Test
+    public void shouldAggregateWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER
+        )
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                                     .withKeySerde(Serdes.String())
+                                     .withValueSerde(Serdes.Long()));
+
+        processData();
+        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                                      .withKeySerde(Serdes.String())
+                                      .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                                         .withKeySerde(Serdes.String())
+                                         .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        windowedStream.reduce(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 null,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        windowedStream.reduce(null,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        windowedStream.count(null);
+    }
+
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "1", "2");
+        driver.setTime(500);
+        driver.process(TOPIC, "1", "3");
+        driver.process(TOPIC, "2", "1");
+        driver.flushState();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
deleted file mode 100644
index f5de96f..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockAggregator;
-import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class WindowedKStreamImplTest {
-
-    private static final String TOPIC = "input";
-    private final StreamsBuilder builder = new StreamsBuilder();
-
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private WindowedKStream<String, String> windowedStream;
-
-    @Before
-    public void before() {
-        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .windowedBy(TimeWindows.of(500L));
-    }
-
-    @Test
-    public void shouldCountWindowed() {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
-        windowedStream.count()
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                    }
-                });
-
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
-    }
-
-
-    @Test
-    public void shouldReduceWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.reduce(MockReducer.STRING_ADDER)
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
-
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
-    }
-
-    @Test
-    public void shouldAggregateWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER
-        )
-                .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                    }
-                });
-        processData();
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
-        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
-        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeCount() {
-        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
-                                     .withKeySerde(Serdes.String())
-                                     .withValueSerde(Serdes.Long()));
-
-        processData();
-        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeReduced() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
-                                      .withKeySerde(Serdes.String())
-                                      .withValueSerde(Serdes.String()));
-
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldMaterializeAggregated() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
-                                         .withKeySerde(Serdes.String())
-                                         .withValueSerde(Serdes.String()));
-
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
-        windowedStream.reduce(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 null,
-                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 (Materialized) null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
-        windowedStream.reduce(null,
-                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
-        windowedStream.reduce(MockReducer.STRING_ADDER,
-                              null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
-        windowedStream.count(null);
-    }
-
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
-    }
-
-}
\ No newline at end of file


[2/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream

Posted by gu...@apache.org.
KAFKA-5922: Add SessionWindowedKStream

Add `SessionWindowedKStream` and its implementation. Deprecate the existing `SessionWindows`-based `count`, `reduce` and `aggregate` methods on `KGroupedStream`.

Author: Damian Guy <da...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3902 from dguy/kafka-5922


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2da064c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2da064c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2da064c

Branch: refs/heads/trunk
Commit: a2da064cbf01558d0af64adc9d6fc9444cd744ec
Parents: b12ba24
Author: Damian Guy <da...@gmail.com>
Authored: Thu Sep 21 16:10:17 2017 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Sep 21 16:10:17 2017 +0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |  41 ++-
 .../streams/kstream/SessionWindowedKStream.java | 268 ++++++++++++++++++
 .../streams/kstream/TimeWindowedKStream.java    | 276 +++++++++++++++++++
 .../kafka/streams/kstream/WindowedKStream.java  | 276 -------------------
 .../kstream/internals/KGroupedStreamImpl.java   |  49 ++--
 .../internals/SessionWindowedKStreamImpl.java   | 199 +++++++++++++
 .../internals/TimeWindowedKStreamImpl.java      | 179 ++++++++++++
 .../kstream/internals/WindowedKStreamImpl.java  | 179 ------------
 .../KStreamAggregationIntegrationTest.java      |   2 +-
 .../internals/KGroupedStreamImplTest.java       |   1 -
 .../SessionWindowedKStreamImplTest.java         | 264 ++++++++++++++++++
 .../internals/TimeWindowedKStreamImplTest.java  | 241 ++++++++++++++++
 .../internals/WindowedKStreamImplTest.java      | 241 ----------------
 13 files changed, 1498 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 5621ab4..1ff1759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -84,7 +84,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
 
     /**
@@ -143,7 +145,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -222,8 +226,10 @@ public interface KGroupedStream<K, V> {
      * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
-     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window.
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                        final String queryableStoreName);
 
@@ -298,7 +304,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                        final StateStoreSupplier<WindowStore> storeSupplier);
 
@@ -337,7 +345,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)} ()}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
 
     /**
@@ -359,7 +369,9 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
 
     /**
@@ -395,7 +407,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                     final StateStoreSupplier<SessionStore> storeSupplier);
 
@@ -842,7 +856,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
                                   final String queryableStoreName);
@@ -875,7 +891,9 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows);
 
@@ -939,7 +957,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
                                   final StateStoreSupplier<SessionStore> storeSupplier);
@@ -1433,7 +1453,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)} ()} ()}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1476,7 +1498,9 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1533,7 +1557,9 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1542,11 +1568,18 @@ public interface KGroupedStream<K, V> {
                                          final StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
-     * Create a new {@link WindowedKStream} instance that can be used to perform windowed aggregations.
+     * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
      * @param windows the specification of the aggregation {@link Windows}
      * @param <W>     the window type
-     * @return an instance of {@link WindowedKStream}
+     * @return an instance of {@link TimeWindowedKStream}
+     */
+    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
+
+    /**
+     * Create a new {@link SessionWindowedKStream} instance that can be used to perform session windowed aggregations.
+     * @param windows the specification of the aggregation {@link SessionWindows}
+     * @return an instance of {@link SessionWindowedKStream}
      */
-    <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows);
+    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
new file mode 100644
index 0000000..d8044ac
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.SessionStore;
+
+/**
+ * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed&lt;K&gt;}).
+ * <p>
+ * {@link SessionWindows} are dynamic data driven windows.
+ * They have no fixed time boundaries, rather the size of the window is determined by the records.
+ * Please see {@link SessionWindows} for more details.
+ * <p>
+ * {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ *
+ * Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(SessionWindows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ * @see SessionWindows
+ */
+public interface SessionWindowedKStream<K, V> {
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    KTable<Windowed<K>, Long> count();
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the name provided with {@link Materialized}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+     * they are merged into a single session and the old sessions are discarded.
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute
+     * aggregate functions like count (c.f. {@link #count()})
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                         final Aggregator<? super K, ? super V, T> aggregator,
+                                         final Merger<? super K, T> sessionMerger);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the name provided with {@link Materialized}.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+     * they are merged into a single session and the old sessions are discarded.
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute
+     * aggregate functions like count (c.f. {@link #count()})
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // some session-windowed aggregation with Long results
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param materialized   an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @param <VR>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator,
+                                           final Merger<? super K, VR> sessionMerger,
+                                           final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view).
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min,
+     * or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@link Materialized} instance.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }}</pre>
+     * <p>
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
+     * sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+     * alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * name of the store as defined by the {@link Materialized} instance, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param materializedAs   an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
new file mode 100644
index 0000000..433f4e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
+ * {@link KStream} records.
+ * <p>
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed&lt;K&gt;}).
+ * <p>
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code TimeWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ */
+public interface TimeWindowedKStream<K, V> {
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count();
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * @param materialized an instance of {@link Materialized} used to materialize a state store
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled, the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long timeFrom = ...;
+     * long timeTo = ...;
+     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator,
+                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled, the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long timeFrom = ...;
+     * long timeTo = ...;
+     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     *
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
deleted file mode 100644
index 35e0eeb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.WindowStore;
-
-/**
- * {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
- * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
- * {@link KStream} records.
- * <p>
- * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
- * new (partitioned) windows resulting in a windowed {@link KTable}
- * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}.
- * <p>
- * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
- * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
- * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
- * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
- * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
-
- * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)} .
- *
- * @param <K> Type of keys
- * @param <V> Type of values
- * @see KStream
- * @see KGroupedStream
- */
-public interface WindowedKStream<K, V> {
-
-    /**
-     * Count the number of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same window and key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
-     * represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<Windowed<K>, Long> count();
-
-    /**
-     * Count the number of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
-     *
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
-     * represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
-
-    /**
-     * Aggregate the values of records in this stream by the grouped key.
-     * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
-     * allows the result to have a different type than the input values.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code queryableStoreName}.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Initializer} is applied once directly before the first input record is processed to
-     * provide an initial intermediate aggregation result that is used to process the first record.
-     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
-     * aggregate (or for the very first record using the intermediate aggregation result provided via the
-     * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count()}).
-     * <p>
-     * The default value serde from config will be used for serializing the result.
-     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     *
-     * @param <VR>          the value type of the resulting {@link KTable}
-     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
-     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                           final Aggregator<? super K, ? super V, VR> aggregator);
-
-    /**
-     * Aggregate the values of records in this stream by the grouped key.
-     * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
-     * allows the result to have a different type than the input values.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the store name as provided with {@link Materialized}.
-     * <p>
-     * The specified {@link Initializer} is applied once directly before the first input record is processed to
-     * provide an initial intermediate aggregation result that is used to process the first record.
-     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
-     * aggregate (or for the very first record using the intermediate aggregation result provided via the
-     * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count()}).
-     * <p>
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
-     *
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     *
-     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
-     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
-     * @param <VR>          the value type of the resulting {@link KTable}
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                           final Aggregator<? super K, ? super V, VR> aggregator,
-                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
-
-    /**
-     * Combine the values of records in this stream by the grouped key.
-     * Records with {@code null} key or value are ignored.
-     * Combining implies that the type of the aggregate result is the same as the type of the input value.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code queryableStoreName}.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
-     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
-     * value as-is.
-     * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
-
-    /**
-     * Combine the values of records in this stream by the grouped key.
-     * Records with {@code null} key or value are ignored.
-     * Combining implies that the type of the aggregate result is the same as the type of the input value.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the store name as provided with {@link Materialized}.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
-     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
-     * value as-is.
-     * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     *
-     *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
-                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index ba037f5..4943314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -26,10 +26,11 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -315,7 +316,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde) {
-        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, aggValueSerde, (String) null);
+        return windowedBy(sessionWindows).aggregate(initializer,
+                                                    aggregator,
+                                                    sessionMerger,
+                                                    Materialized.<K, T, SessionStore<Bytes, byte[]>>as(builder.newStoreName(AGGREGATE_NAME))
+                                                            .withKeySerde(keySerde)
+                                                            .withValueSerde(aggValueSerde));
     }
 
     @SuppressWarnings("unchecked")
@@ -340,26 +346,37 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
-    public <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows) {
-        return new WindowedKStreamImpl<>(windows,
-                                         builder,
-                                         sourceNodes,
-                                         name,
-                                         keySerde,
-                                         valSerde,
-                                         repartitionRequired);
+    public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
+        return new TimeWindowedKStreamImpl<>(windows,
+                                             builder,
+                                             sourceNodes,
+                                             name,
+                                             keySerde,
+                                             valSerde,
+                                             repartitionRequired);
+    }
+
+    @Override
+    public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
+        return new SessionWindowedKStreamImpl<>(windows,
+                                                builder,
+                                                sourceNodes,
+                                                name,
+                                                keySerde,
+                                                valSerde,
+                                                aggregateBuilder); // the Windows overload passes repartitionRequired here; this one hands over aggregateBuilder instead
     }
 
     @SuppressWarnings("unchecked")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
-        determineIsQueryable(queryableStoreName);
-        return count(sessionWindows,
-                     storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
-                             .sessionWindowed(sessionWindows.maintainMs()).build());
+        final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
+                .withKeySerde(keySerde)
+                .withValueSerde(Serdes.Long());
+        return windowedBy(sessionWindows).count(materialized); // delegates to SessionWindowedKStream#count(Materialized)
     }
 
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
-        return count(sessionWindows, (String) null);
+        return windowedBy(sessionWindows).count();
     }
 
     @Override
@@ -399,7 +416,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {
 
-        return reduce(reducer, sessionWindows, (String) null);
+        return windowedBy(sessionWindows).reduce(reducer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
new file mode 100644
index 0000000..0603853
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
+    private final SessionWindows windows;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Merger<K, Long> countMerger = new Merger<K, Long>() {
+        @Override
+        public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
+            return aggOne + aggTwo; // the merged count is the sum of the two counts being merged
+        }
+    };
+    private final Initializer<V> reduceInitializer = new Initializer<V>() {
+        @Override
+        public V apply() {
+            return null; // reduce has no initial value; aggregatorForReducer returns the incoming value when the aggregate is null
+        }
+    };
+
+
+    SessionWindowedKStreamImpl(final SessionWindows windows,
+                               final InternalStreamsBuilder builder,
+                               final Set<String> sourceNodes,
+                               final String name,
+                               final Serde<K> keySerde,
+                               final Serde<V> valSerde,
+                               final GroupedStreamAggregateBuilder<K, V> aggregateBuilder) {
+        super(builder, name, sourceNodes);
+        this.windows = windows;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.aggregateBuilder = aggregateBuilder; // KGroupedStreamImpl.windowedBy(SessionWindows) passes in its own aggregate builder
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return doAggregate(aggregateBuilder.countInitializer, // no Materialized given: doAggregate generates the store name itself
+                           aggregateBuilder.countAggregator,
+                           countMerger,
+                           Serdes.Long());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregate(aggregateBuilder.countInitializer, // same initializer/aggregator/merger as count(); the store is configured from materialized
+                         aggregateBuilder.countAggregator,
+                         countMerger,
+                         materialized);
+    }
+
+    @Override
+    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                final Aggregator<? super K, ? super V, T> aggregator,
+                                                final Merger<? super K, T> sessionMerger) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        return doAggregate(initializer, aggregator, sessionMerger, null); // null: no explicit value serde is handed on to the store builder
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Merger<? super K, VR> sessionMerger,
+                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
+                new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
+                AGGREGATE_NAME,
+                materialize(materializedInternal), // store builder derived from the caller's Materialized
+                true); // NOTE(review): doAggregate passes false here; presumably this flag marks the store as queryable — confirm in GroupedStreamAggregateBuilder.build
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(Objects.requireNonNull(reducer, "reducer can't be null")); // built once and shared with the merger
+        return doAggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), valSerde);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer); // one aggregator instance, also wrapped by the merger below
+        return aggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), materialized);
+    }
+
+
+    private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) { // no supplier configured: fall back to a persistent session store
+            supplier = Stores.persistentSessionStore(materialized.storeName(),
+                                                     windows.maintainMs());
+        }
+        final StoreBuilder<SessionStore<K, VR>> sessionStoreBuilder = Stores.sessionStoreBuilder(supplier,
+                                                                                                 materialized.keySerde(),
+                                                                                                 materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            sessionStoreBuilder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            sessionStoreBuilder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) { // caching is only switched on here; nothing is called when it is disabled
+            sessionStoreBuilder.withCachingEnabled();
+        }
+        return sessionStoreBuilder; // local renamed so it no longer shadows the inherited 'builder' field
+    }
+
+    private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) { // adapts an aggregator so it can be used as a merger
+        return new Merger<K, V>() {
+            @Override
+            public V apply(final K aggKey, final V aggOne, final V aggTwo) {
+                return aggregator.apply(aggKey, aggTwo, aggOne); // aggTwo goes in the aggregator's value slot, aggOne in its aggregate slot
+            }
+        };
+    }
+
+    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) { // wraps a reducer as an aggregator whose aggregate type is V
+        return new Aggregator<K, V, V>() {
+            @Override
+            public V apply(final K aggKey, final V value, final V aggregate) {
+                if (aggregate == null) { // reduceInitializer returns null, so a null aggregate means there is nothing to reduce with yet
+                    return value;
+                }
+                return reducer.apply(aggregate, value); // existing aggregate first, new value second
+            }
+        };
+    }
+
+    private <VR> StoreBuilder<SessionStore<K, VR>> storeBuilder(final String storeName, final Serde<VR> aggValueSerde) { // used by doAggregate, i.e. the overloads without a Materialized
+        return Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(
+                        storeName,
+                        windows.maintainMs()),
+                keySerde,
+                aggValueSerde).withCachingEnabled(); // caching is always enabled on this path
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+                                                     final Aggregator<? super K, ? super V, VR> aggregator,
+                                                     final Merger<? super K, VR> merger,
+                                                     final Serde<VR> serde) {
+        final String storeName = builder.newStoreName(AGGREGATE_NAME); // the store name is generated here, not supplied by the caller
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamSessionWindowAggregate<>(windows, storeName, initializer, aggregator, merger),
+                                                                AGGREGATE_NAME,
+                                                                storeBuilder(storeName, serde),
+                                                                false); // NOTE(review): the Materialized overload of aggregate passes true; presumably a "queryable" flag — confirm
+    }
+}