Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/21 08:10:38 UTC
[1/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream
Repository: kafka
Updated Branches:
refs/heads/trunk b12ba240e -> a2da064cb
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
new file mode 100644
index 0000000..cc6ca05
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> {
+
+ private final Windows<W> windows;
+ private final Serde<K> keySerde;
+ private final Serde<V> valSerde;
+ private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+
+ TimeWindowedKStreamImpl(final Windows<W> windows,
+ final InternalStreamsBuilder builder,
+ final Set<String> sourceNodes,
+ final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final boolean repartitionRequired) {
+ super(builder, name, sourceNodes);
+ Objects.requireNonNull(windows, "windows can't be null");
+ this.valSerde = valSerde;
+ this.keySerde = keySerde;
+ this.windows = windows;
+ this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
+ }
+
+ @Override
+ public KTable<Windowed<K>, Long> count() {
+ return doAggregate(
+ aggregateBuilder.countInitializer,
+ aggregateBuilder.countAggregator,
+ Serdes.Long());
+ }
+
+ @Override
+ public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
+ }
+
+
+ @Override
+ public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ return doAggregate(initializer, aggregator, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Serde<VR> serde) {
+ final String storeName = builder.newStoreName(AGGREGATE_NAME);
+ return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
+ AGGREGATE_NAME,
+ windowStoreBuilder(storeName, serde),
+ false);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
+ materializedInternal.storeName(),
+ initializer,
+ aggregator),
+ AGGREGATE_NAME,
+ materialize(materializedInternal),
+ true);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ final String storeName = builder.newStoreName(REDUCE_NAME);
+ return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
+ REDUCE_NAME,
+ windowStoreBuilder(storeName, valSerde),
+ true);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+
+ return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
+ REDUCE_NAME,
+ materialize(materializedInternal),
+ false);
+ }
+
+ private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+ WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+ if (supplier == null) {
+ supplier = Stores.persistentWindowStore(materialized.storeName(),
+ windows.maintainMs(),
+ windows.segments,
+ windows.size(),
+ false);
+ }
+ final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ if (materialized.cachingEnabled()) {
+ builder.withCachingEnabled();
+ }
+ return builder;
+ }
+
+
+ private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+ return Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(
+ storeName,
+ windows.maintainMs(),
+ windows.segments,
+ windows.size(),
+ false),
+ keySerde,
+ aggValueSerde).withCachingEnabled();
+ }
+
+}
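For orientation, a minimal usage sketch of the TimeWindowedKStream API implemented above; the topic name, serdes, and 500ms window size are illustrative assumptions, not part of this commit:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // Hedged sketch: count records per key in 500ms tumbling windows.
    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<Windowed<String>, Long> counts =
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.of(500L))
               .count();

TimeWindowedKStreamImplTest further below exercises count(), reduce(), and aggregate() against this implementation.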
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
deleted file mode 100644
index 3992a79..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Objects;
-import java.util.Set;
-
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
-import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-
-public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
-
- private final Windows<W> windows;
- private final Serde<K> keySerde;
- private final Serde<V> valSerde;
- private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
-
- WindowedKStreamImpl(final Windows<W> windows,
- final InternalStreamsBuilder builder,
- final Set<String> sourceNodes,
- final String name,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final boolean repartitionRequired) {
- super(builder, name, sourceNodes);
- Objects.requireNonNull(windows, "windows can't be null");
- this.valSerde = valSerde;
- this.keySerde = keySerde;
- this.windows = windows;
- this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
- }
-
- @Override
- public KTable<Windowed<K>, Long> count() {
- return doAggregate(
- aggregateBuilder.countInitializer,
- aggregateBuilder.countAggregator,
- Serdes.Long());
- }
-
- @Override
- public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
- Objects.requireNonNull(materialized, "materialized can't be null");
- return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
- }
-
-
- @Override
- public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> aggregator) {
- Objects.requireNonNull(initializer, "initializer can't be null");
- Objects.requireNonNull(aggregator, "aggregator can't be null");
- return doAggregate(initializer, aggregator, null);
- }
-
- @SuppressWarnings("unchecked")
- private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> aggregator,
- final Serde<VR> serde) {
- final String storeName = builder.newStoreName(AGGREGATE_NAME);
- return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
- AGGREGATE_NAME,
- windowStoreBuilder(storeName, serde),
- false);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> aggregator,
- final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
- Objects.requireNonNull(initializer, "initializer can't be null");
- Objects.requireNonNull(aggregator, "aggregator can't be null");
- Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
- return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
- materializedInternal.storeName(),
- initializer,
- aggregator),
- AGGREGATE_NAME,
- materialize(materializedInternal),
- true);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
- Objects.requireNonNull(reducer, "reducer can't be null");
- final String storeName = builder.newStoreName(REDUCE_NAME);
- return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
- REDUCE_NAME,
- windowStoreBuilder(storeName, valSerde),
- true);
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
- Objects.requireNonNull(reducer, "reducer can't be null");
- Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
-
- return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
- REDUCE_NAME,
- materialize(materializedInternal),
- false);
- }
-
- private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
- WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
- if (supplier == null) {
- supplier = Stores.persistentWindowStore(materialized.storeName(),
- windows.maintainMs(),
- windows.segments,
- windows.size(),
- false);
- }
- final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
- materialized.keySerde(),
- materialized.valueSerde());
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- if (materialized.cachingEnabled()) {
- builder.withCachingEnabled();
- }
- return builder;
- }
-
-
- private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
- return Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
- storeName,
- windows.maintainMs(),
- windows.segments,
- windows.size(),
- false),
- keySerde,
- aggValueSerde).withCachingEnabled();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 81e8ef7..350facf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -521,7 +521,7 @@ public class KStreamAggregationIntegrationTest {
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
+ .count(SessionWindows.with(sessionGap).until(maintainMillis))
.toStream()
.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index efa027c..c8e011e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -257,7 +257,6 @@ public class KGroupedStreamImplTest {
});
doAggregateSessionWindows(results);
- assertNull(table.queryableStoreName());
}
private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
new file mode 100644
index 0000000..042f0f1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SessionWindowedKStreamImplTest {
+
+ private static final String TOPIC = "input";
+ private final StreamsBuilder builder = new StreamsBuilder();
+
+ @Rule
+ public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final Merger<String, String> sessionMerger = new Merger<String, String>() {
+ @Override
+ public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+ return aggOne + "+" + aggTwo;
+ }
+ };
+ private SessionWindowedKStream<String, String> stream;
+
+ @Before
+ public void before() {
+ final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+ this.stream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SessionWindows.with(500));
+ }
+
+ @Test
+ public void shouldCountSessionWindowed() {
+ final Map<Windowed<String>, Long> results = new HashMap<>();
+ stream.count()
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, Long>() {
+ @Override
+ public void apply(final Windowed<String> key, final Long value) {
+ results.put(key, value);
+ }
+ });
+
+ processData();
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
+ assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
+ }
+
+ @Test
+ public void shouldReduceWindowed() {
+ final Map<Windowed<String>, String> results = new HashMap<>();
+ stream.reduce(MockReducer.STRING_ADDER)
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String value) {
+ results.put(key, value);
+ }
+ });
+
+ processData();
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
+ assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
+ }
+
+ @Test
+ public void shouldAggregateSessionWindowed() {
+ final Map<Windowed<String>, String> results = new HashMap<>();
+ stream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ sessionMerger)
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String value) {
+ results.put(key, value);
+ }
+ });
+ processData();
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
+ assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
+ assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeCount() {
+ stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
+
+ processData();
+ final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
+ final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeReduced() {
+ stream.reduce(MockReducer.STRING_ADDER,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+ final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeAggregated() {
+ stream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ sessionMerger,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+ final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+ stream.aggregate(null, MockAggregator.TOSTRING_ADDER, sessionMerger);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+ stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+ stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+ stream.reduce(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+ stream.aggregate(null,
+ MockAggregator.TOSTRING_ADDER,
+ sessionMerger,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+ stream.aggregate(MockInitializer.STRING_INIT,
+ null,
+ sessionMerger,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+ stream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ null,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+ stream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ sessionMerger,
+ (Materialized) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+ stream.reduce(null,
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as("store"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+ stream.reduce(MockReducer.STRING_ADDER,
+ null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+ stream.count(null);
+ }
+
+ private void processData() {
+ driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+ driver.setTime(10);
+ driver.process(TOPIC, "1", "1");
+ driver.setTime(15);
+ driver.process(TOPIC, "1", "2");
+ driver.setTime(600);
+ driver.process(TOPIC, "1", "3");
+ driver.process(TOPIC, "2", "1");
+ driver.flushState();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
new file mode 100644
index 0000000..93bcf33
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TimeWindowedKStreamImplTest {
+
+ private static final String TOPIC = "input";
+ private final StreamsBuilder builder = new StreamsBuilder();
+
+ @Rule
+ public final KStreamTestDriver driver = new KStreamTestDriver();
+ private TimeWindowedKStream<String, String> windowedStream;
+
+ @Before
+ public void before() {
+ final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+ windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(500L));
+ }
+
+ @Test
+ public void shouldCountWindowed() {
+ final Map<Windowed<String>, Long> results = new HashMap<>();
+ windowedStream.count()
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, Long>() {
+ @Override
+ public void apply(final Windowed<String> key, final Long value) {
+ results.put(key, value);
+ }
+ });
+
+ processData();
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
+ assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
+ }
+
+
+ @Test
+ public void shouldReduceWindowed() {
+ final Map<Windowed<String>, String> results = new HashMap<>();
+ windowedStream.reduce(MockReducer.STRING_ADDER)
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String value) {
+ results.put(key, value);
+ }
+ });
+
+ processData();
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
+ assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
+ }
+
+ @Test
+ public void shouldAggregateWindowed() {
+ final Map<Windowed<String>, String> results = new HashMap<>();
+ windowedStream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER
+ )
+ .toStream()
+ .foreach(new ForeachAction<Windowed<String>, String>() {
+ @Override
+ public void apply(final Windowed<String> key, final String value) {
+ results.put(key, value);
+ }
+ });
+ processData();
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
+ assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
+ assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeCount() {
+ windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
+
+ processData();
+ final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
+ final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeReduced() {
+ windowedStream.reduce(MockReducer.STRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+ final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldMaterializeAggregated() {
+ windowedStream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+ final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+ windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+ windowedStream.aggregate(MockInitializer.STRING_INIT, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+ windowedStream.reduce(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+ windowedStream.aggregate(null,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+ windowedStream.aggregate(MockInitializer.STRING_INIT,
+ null,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+ windowedStream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ (Materialized) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+ windowedStream.reduce(null,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+ windowedStream.reduce(MockReducer.STRING_ADDER,
+ null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+ windowedStream.count(null);
+ }
+
+ private void processData() {
+ driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+ driver.setTime(10);
+ driver.process(TOPIC, "1", "1");
+ driver.setTime(15);
+ driver.process(TOPIC, "1", "2");
+ driver.setTime(500);
+ driver.process(TOPIC, "1", "3");
+ driver.process(TOPIC, "2", "1");
+ driver.flushState();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
deleted file mode 100644
index f5de96f..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockAggregator;
-import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class WindowedKStreamImplTest {
-
- private static final String TOPIC = "input";
- private final StreamsBuilder builder = new StreamsBuilder();
-
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private WindowedKStream<String, String> windowedStream;
-
- @Before
- public void before() {
- final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
- windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(500L));
- }
-
- @Test
- public void shouldCountWindowed() {
- final Map<Windowed<String>, Long> results = new HashMap<>();
- windowedStream.count()
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long value) {
- results.put(key, value);
- }
- });
-
- processData();
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
- }
-
-
- @Test
- public void shouldReduceWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
- windowedStream.reduce(MockReducer.STRING_ADDER)
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, String>() {
- @Override
- public void apply(final Windowed<String> key, final String value) {
- results.put(key, value);
- }
- });
-
- processData();
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
- }
-
- @Test
- public void shouldAggregateWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER
- )
- .toStream()
- .foreach(new ForeachAction<Windowed<String>, String>() {
- @Override
- public void apply(final Windowed<String> key, final String value) {
- results.put(key, value);
- }
- });
- processData();
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void shouldMaterializeCount() {
- windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.Long()));
-
- processData();
- final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
- final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void shouldMaterializeReduced() {
- windowedStream.reduce(MockReducer.STRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String()));
-
- processData();
- final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void shouldMaterializeAggregated() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String()));
-
- processData();
- final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
- windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
- windowedStream.aggregate(MockInitializer.STRING_INIT, null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
- windowedStream.reduce(null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
- windowedStream.aggregate(null,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- null,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
- windowedStream.aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- (Materialized) null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
- windowedStream.reduce(null,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
- windowedStream.reduce(MockReducer.STRING_ADDER,
- null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
- windowedStream.count(null);
- }
-
- private void processData() {
- driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
- driver.setTime(10);
- driver.process(TOPIC, "1", "1");
- driver.setTime(15);
- driver.process(TOPIC, "1", "2");
- driver.setTime(500);
- driver.process(TOPIC, "1", "3");
- driver.process(TOPIC, "2", "1");
- driver.flushState();
- }
-
-}
\ No newline at end of file
[2/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream
Posted by gu...@apache.org.
KAFKA-5922: Add SessionWindowedKStream
Add `SessionWindowedKStream` and its implementation. Deprecate the existing `SessionWindows`-based `count`, `reduce`, and `aggregate` methods on `KGroupedStream`.
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3902 from dguy/kafka-5922
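A minimal usage sketch of the new session-windowed API (hedged: the topic name, serdes, and 500ms inactivity gap are illustrative, and Java 8 lambdas are assumed for brevity; the Merger combines two session aggregates when a late record bridges previously separate sessions):

    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<Windowed<String>, String> sessions =
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
               .windowedBy(SessionWindows.with(500L))          // close a session after 500ms of inactivity
               .aggregate(() -> "",                            // initializer
                          (key, value, agg) -> agg + value,    // aggregator
                          (key, agg1, agg2) -> agg1 + agg2);   // merger for bridged sessions

The same calls are exercised (with anonymous classes) in SessionWindowedKStreamImplTest in part [1/2] above.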
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2da064c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2da064c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2da064c
Branch: refs/heads/trunk
Commit: a2da064cbf01558d0af64adc9d6fc9444cd744ec
Parents: b12ba24
Author: Damian Guy <da...@gmail.com>
Authored: Thu Sep 21 16:10:17 2017 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Sep 21 16:10:17 2017 +0800
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 41 ++-
.../streams/kstream/SessionWindowedKStream.java | 268 ++++++++++++++++++
.../streams/kstream/TimeWindowedKStream.java | 276 +++++++++++++++++++
.../kafka/streams/kstream/WindowedKStream.java | 276 -------------------
.../kstream/internals/KGroupedStreamImpl.java | 49 ++--
.../internals/SessionWindowedKStreamImpl.java | 199 +++++++++++++
.../internals/TimeWindowedKStreamImpl.java | 179 ++++++++++++
.../kstream/internals/WindowedKStreamImpl.java | 179 ------------
.../KStreamAggregationIntegrationTest.java | 2 +-
.../internals/KGroupedStreamImplTest.java | 1 -
.../SessionWindowedKStreamImplTest.java | 264 ++++++++++++++++++
.../internals/TimeWindowedKStreamImplTest.java | 241 ++++++++++++++++
.../internals/WindowedKStreamImplTest.java | 241 ----------------
13 files changed, 1498 insertions(+), 718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 5621ab4..1ff1759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -84,7 +84,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
+ * @deprecated use {@link #count(Materialized)}
*/
+ @Deprecated
KTable<K, Long> count(final String queryableStoreName);
/**
@@ -143,7 +145,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
+ * @deprecated use {@link #count(Materialized)}
*/
+ @Deprecated
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -222,8 +226,10 @@ public interface KGroupedStream<K, V> {
* @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
- * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ * that represent the latest (rolling) count (i.e., number of records) for each key within a window.
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+ @Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final String queryableStoreName);
@@ -298,7 +304,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ * @deprecated use {@link #windowedBy(Windows)}
*/
+ @Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier);
@@ -337,7 +345,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
/**
@@ -359,7 +369,9 @@ public interface KGroupedStream<K, V> {
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
/**
@@ -395,7 +407,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
final StateStoreSupplier<SessionStore> storeSupplier);
@@ -842,7 +856,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
final String queryableStoreName);
@@ -875,7 +891,9 @@ public interface KGroupedStream<K, V> {
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows);
@@ -939,7 +957,9 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
final StateStoreSupplier<SessionStore> storeSupplier);
@@ -1433,7 +1453,9 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
@@ -1476,7 +1498,9 @@ public interface KGroupedStream<K, V> {
* @param <T> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
@@ -1533,7 +1557,9 @@ public interface KGroupedStream<K, V> {
* @param <T> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
+ * @deprecated use {@link #windowedBy(SessionWindows)}
*/
+ @Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
@@ -1542,11 +1568,18 @@ public interface KGroupedStream<K, V> {
final StateStoreSupplier<SessionStore> storeSupplier);
/**
- * Create a new {@link WindowedKStream} instance that can be used to perform windowed aggregations.
+ * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
* @param windows the specification of the aggregation {@link Windows}
* @param <W> the window type
- * @return an instance of {@link WindowedKStream}
+ * @return an instance of {@link TimeWindowedKStream}
+ */
+ <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
+
+ /**
+ * Create a new {@link SessionWindowedKStream} instance that can be used to perform session windowed aggregations.
+ * @param windows the specification of the aggregation {@link SessionWindows}
+ * @return an instance of {@link SessionWindowedKStream}
*/
- <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows);
+ SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
}
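As a usage sketch only (the grouped stream, gap, and store name below are illustrative, not part
of this patch), a deprecated call such as count(SessionWindows, String) maps onto the new fluent
API roughly as follows:

    KGroupedStream<String, String> grouped = ...;
    // old, now deprecated:
    // grouped.count(SessionWindows.with(300_000L), "session-counts");
    // new, equivalent in intent:
    KTable<Windowed<String>, Long> counts = grouped
        .windowedBy(SessionWindows.with(300_000L))
        .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-counts"));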
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
new file mode 100644
index 0000000..d8044ac
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.SessionStore;
+
+/**
+ * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
+ * <p>
+ * {@link SessionWindows} are dynamic data driven windows.
+ * They have no fixed time boundaries, rather the size of the window is determined by the records.
+ * Please see {@link SessionWindows} for more details.
+ * <p>
+ * {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ *
+ * Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(SessionWindows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ * @see SessionWindows
+ */
+public interface SessionWindowedKStream<K, V> {
+
+ /**
+ * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ *
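+ * For example, a session-windowed count might be expressed as follows (a sketch; {@code grouped} is
+ * an assumed {@code KGroupedStream<String, String>} and the five-minute inactivity gap is illustrative):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> counts =
+ *     grouped.windowedBy(SessionWindows.with(5 * 60 * 1000L)).count();
+ * }</pre>
+ *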
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ */
+ KTable<Windowed<K>, Long> count();
+
+ /**
+ * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided with {@link Materialized}.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link SessionStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlySessionStore<String, Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String key = "some-key";
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ */
+ KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+ * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+ * <p>
+ * The specified {@link Initializer} is applied once per session directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+ * they are merged into a single session and the old sessions are discarded.
+ * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute
+ * aggregate functions like count (c.f. {@link #count()}).
+ * <p>
+ * The default value serde from config will be used for serializing the result.
+ * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
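+ * As a sketch (assuming a {@code SessionWindowedKStream<String, String>} named {@code windowed}), a
+ * session-scoped concatenation could look like:
+ * <pre>{@code
+ * KTable<Windowed<String>, String> agg = windowed.aggregate(
+ *     () -> "",                                     // initializer
+ *     (key, value, aggregate) -> aggregate + value, // aggregator
+ *     (key, agg1, agg2) -> agg1 + agg2);            // session merger
+ * }</pre>
+ * <p>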
+ * @param initializer the instance of {@link Initializer}
+ * @param aggregator the instance of {@link Aggregator}
+ * @param sessionMerger the instance of {@link Merger}
+ * @param <T> the value type of the resulting {@link KTable}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Merger<? super K, T> sessionMerger);
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided with {@link Materialized}.
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+ * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+ * <p>
+ * The specified {@link Initializer} is applied once per session directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+ * they are merged into a single session and the old sessions are discarded.
+ * Thus, {@code aggregate(Initializer, Aggregator, Merger, Materialized)} can be used to compute
+ * aggregate functions like count (c.f. {@link #count()}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link SessionStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some windowed aggregation on value type Long
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String key = "some-key";
+ * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * @param initializer the instance of {@link Initializer}
+ * @param aggregator the instance of {@link Aggregator}
+ * @param sessionMerger the instance of {@link Merger}
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Merger<? super K, VR> sessionMerger,
+ final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Combine values of this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view).
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min,
+ * or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ *
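+ * For example, a session-windowed sum over {@code Long} values might be written as (a sketch;
+ * {@code windowed} is an assumed {@code SessionWindowedKStream<String, Long>}):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> sums = windowed.reduce((aggValue, newValue) -> aggValue + newValue);
+ * }</pre>
+ *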
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+ /**
+ * Combine values of this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
+ * provided by the given {@link Materialized} instance.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate (first argument) and the record's value (second argument):
+ * <pre>{@code
+ * // A Reducer<Long> that sums two values:
+ * new Reducer<Long>() {
+ * @Override
+ * public Long apply(Long aggValue, Long currValue) {
+ * return aggValue + currValue;
+ * }
+ * }</pre>
+ * <p>
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
+ * sum, min, or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link SessionStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlySessionStore<String, Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String key = "some-key";
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+ * alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * store name provided via {@link Materialized}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
+}
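A sketch combining reduce(Reducer, Materialized) from the interface above (the grouped stream,
gap, store name, and serde below are assumptions):

    KTable<Windowed<String>, Long> maxima = grouped
        .windowedBy(SessionWindows.with(300_000L))
        .reduce((v1, v2) -> Math.max(v1, v2),
                Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-max")
                    .withValueSerde(Serdes.Long()));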
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
new file mode 100644
index 0000000..433f4e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation of a {@link KStream}, produced by grouping and windowing, before an
+ * aggregation is applied to the new (partitioned) windows, resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
+ * <p>
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code TimeWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ */
+public interface TimeWindowedKStream<K, V> {
+
+ /**
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} key or value are ignored.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
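+ * <p>
+ * For example (a sketch; {@code grouped} is an assumed {@code KGroupedStream<String, String>} and the
+ * one-minute tumbling window is illustrative):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> counts =
+ *     grouped.windowedBy(TimeWindows.of(60 * 1000L)).count();
+ * }</pre>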
+ * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<Windowed<K>, Long> count();
+
+ /**
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} key or value are ignored.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ *
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ *
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * under an internally generated store name.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count()}).
+ * <p>
+ * The default value serde from config will be used for serializing the result.
+ * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
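+ * A sketch computing a per-window sum with a widened result type (assuming a
+ * {@code TimeWindowedKStream<String, Integer>} named {@code windowed}; note the default serde caveat above):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> sums = windowed.aggregate(
+ *     () -> 0L,                                      // initializer
+ *     (key, value, aggregate) -> aggregate + value); // aggregator
+ * }</pre>
+ *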
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator);
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the store name as provided with {@link Materialized}.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count()}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ *
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ *
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * under an internally generated store name.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
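+ * For example, a windowed maximum might be written as (a sketch; {@code windowed} is an assumed
+ * {@code TimeWindowedKStream<String, Long>}):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> maxima = windowed.reduce((v1, v2) -> Math.max(v1, v2));
+ * }</pre>
+ *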
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+ /**
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the store name as provided with {@link Materialized}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+ * the same window and key if caching is enabled on the {@link Materialized} instance.
+ * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ *
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ *
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
+}
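As a usage sketch tying the interface above to interactive queries (the store name, key, and time
range are assumptions, not part of this patch):

    ReadOnlyWindowStore<String, Long> store =
        streams.store("windowed-counts", QueryableStoreTypes.<String, Long>windowStore());
    try (WindowStoreIterator<Long> iter = store.fetch("some-word", fromTime, toTime)) {
        while (iter.hasNext()) {
            KeyValue<Long, Long> next = iter.next(); // window start timestamp -> count
        }
    }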
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
deleted file mode 100644
index 35e0eeb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.WindowStore;
-
-/**
- * {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
- * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
- * {@link KStream} records.
- * <p>
- * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
- * new (partitioned) windows resulting in a windowed {@link KTable}
- * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}.
- * <p>
- * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
- * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
- * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
- * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
- * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
-
- * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)} .
- *
- * @param <K> Type of keys
- * @param <V> Type of values
- * @see KStream
- * @see KGroupedStream
- */
-public interface WindowedKStream<K, V> {
-
- /**
- * Count the number of records in this stream by the grouped key and the defined windows.
- * Records with {@code null} key or value are ignored.
- * <p>
- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
- * the same window and key.
- * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
- * <p>
- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
- * user-specified in {@link StreamsConfig StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
- * and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
- * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
- * represent the latest (rolling) count (i.e., number of records) for each key
- */
- KTable<Windowed<K>, Long> count();
-
- /**
- * Count the number of records in this stream by the grouped key and the defined windows.
- * Records with {@code null} key or value are ignored.
- * <p>
- * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
- * the same window and key if caching is enabled on the {@link Materialized} instance.
- * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
- *
- * <p>
- * To query the local windowed {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ... // counting words
- * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
- *
- * String key = "some-word";
- * long fromTime = ...;
- * long toTime = ...;
- * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
- * represent the latest (rolling) count (i.e., number of records) for each key
- */
- KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
-
- /**
- * Aggregate the values of records in this stream by the grouped key.
- * Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
- * allows the result to have a different type than the input values.
- * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code queryableStoreName}.
- * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
- * <p>
- * The specified {@link Initializer} is applied once directly before the first input record is processed to
- * provide an initial intermediate aggregation result that is used to process the first record.
- * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
- * aggregate (or for the very first record using the intermediate aggregation result provided via the
- * {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
- * count (c.f. {@link #count()}).
- * <p>
- * The default value serde from config will be used for serializing the result.
- * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
- * <p>
- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
- * the same key.
- * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
- * <p>
- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
- * and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
- * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- *
- * @param <VR> the value type of the resulting {@link KTable}
- * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
- * @param aggregator an {@link Aggregator} that computes a new aggregate result
- * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
- * latest (rolling) aggregate for each key
- */
- <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> aggregator);
-
- /**
- * Aggregate the values of records in this stream by the grouped key.
- * Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
- * allows the result to have a different type than the input values.
- * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the store name as provided with {@link Materialized}.
- * <p>
- * The specified {@link Initializer} is applied once directly before the first input record is processed to
- * provide an initial intermediate aggregation result that is used to process the first record.
- * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
- * aggregate (or for the very first record using the intermediate aggregation result provided via the
- * {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
- * count (c.f. {@link #count()}).
- * <p>
- * <p>
- * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
- * the same window and key if caching is enabled on the {@link Materialized} instance.
- * When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
- *
- * <p>
- * To query the local windowed {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ... // counting words
- * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
- *
- * String key = "some-word";
- * long fromTime = ...;
- * long toTime = ...;
- * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- *
- * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
- * @param aggregator an {@link Aggregator} that computes a new aggregate result
- * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
- * @param <VR> the value type of the resulting {@link KTable}
- * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
- * latest (rolling) aggregate for each key
- */
- <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> aggregator,
- final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
-
- /**
- * Combine the values of records in this stream by the grouped key.
- * Records with {@code null} key or value are ignored.
- * Combining implies that the type of the aggregate result is the same as the type of the input value.
- * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code queryableStoreName}.
- * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
- * <p>
- * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
- * aggregate and the record's value.
- * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
- * value as-is.
- * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
- * <p>
- * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
- * the same key.
- * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
- * <p>
- * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
- * and "-changelog" is a fixed suffix.
- * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- *
- * @param reducer a {@link Reducer} that computes a new aggregate result
- * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
- * latest (rolling) aggregate for each key
- */
- KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
-
- /**
- * Combine the values of records in this stream by the grouped key.
- * Records with {@code null} key or value are ignored.
- * Combining implies that the type of the aggregate result is the same as the type of the input value.
- * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the store name as provided with {@link Materialized}.
- * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
- * <p>
- * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
- * aggregate and the record's value.
- * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
- * value as-is.
- * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
- * <p>
- * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
- * the same window and key if caching is enabled on the {@link Materialized} instance.
- * When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
- * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
- * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
- * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
- * <p>
- * To query the local windowed {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ... // counting words
- * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
- *
- * String key = "some-word";
- * long fromTime = ...;
- * long toTime = ...;
- * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- *
- *
- * @param reducer a {@link Reducer} that computes a new aggregate result
- * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
- * latest (rolling) aggregate for each key
- */
- KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
- final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
-}
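Under the renamed TimeWindowedKStream, the Materialized overload replaces the old String store name. A hedged sketch of this method plus the interactive query described in the Javadoc above (store and key names are illustrative; a running KafkaStreams instance is assumed):

    // assume: KGroupedStream<String, Long> grouped; KafkaStreams streams (running)
    final KTable<Windowed<String>, Long> maxima = grouped
        .windowedBy(TimeWindows.of(60 * 1000L))
        .reduce((v1, v2) -> Math.max(v1, v2),
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("max-per-minute")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));

    final ReadOnlyWindowStore<String, Long> localWindowStore =
        streams.store("max-per-minute", QueryableStoreTypes.<String, Long>windowStore());
    final long fromTime = 0L;
    final long toTime = System.currentTimeMillis();
    final WindowStoreIterator<Long> maximaForKey = localWindowStore.fetch("some-word", fromTime, toTime);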
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index ba037f5..4943314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -26,10 +26,11 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -315,7 +316,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde) {
- return aggregate(initializer, aggregator, sessionMerger, sessionWindows, aggValueSerde, (String) null);
+ return windowedBy(sessionWindows).aggregate(initializer,
+ aggregator,
+ sessionMerger,
+ Materialized.<K, T, SessionStore<Bytes, byte[]>>as(builder.newStoreName(AGGREGATE_NAME))
+ .withKeySerde(keySerde)
+ .withValueSerde(aggValueSerde));
}
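The deprecated session aggregate now funnels into the new fluent API, preserving behaviour while generating an internal store name via builder.newStoreName(AGGREGATE_NAME). Caller code migrating off the old overload would look roughly like this (a sketch; init, agg, and merger stand for the caller's existing functions, and the store name is an assumption):

    // old, deprecated: grouped.aggregate(init, agg, merger, sessionWindows, Serdes.Long());
    // new, KIP-182 style:
    final KTable<Windowed<String>, Long> aggregated = grouped
        .windowedBy(sessionWindows)
        .aggregate(init, agg, merger,
                   Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-agg")
                       .withValueSerde(Serdes.Long()));

Unlike the generated name in the delegation above, an explicit store name also makes the result available to interactive queries.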
@SuppressWarnings("unchecked")
@@ -340,26 +346,37 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
- public <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows) {
- return new WindowedKStreamImpl<>(windows,
- builder,
- sourceNodes,
- name,
- keySerde,
- valSerde,
- repartitionRequired);
+ public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
+ return new TimeWindowedKStreamImpl<>(windows,
+ builder,
+ sourceNodes,
+ name,
+ keySerde,
+ valSerde,
+ repartitionRequired);
+ }
+
+ @Override
+ public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
+ return new SessionWindowedKStreamImpl<>(windows,
+ builder,
+ sourceNodes,
+ name,
+ keySerde,
+ valSerde,
+ aggregateBuilder);
}
@SuppressWarnings("unchecked")
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
- determineIsQueryable(queryableStoreName);
- return count(sessionWindows,
- storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
- .sessionWindowed(sessionWindows.maintainMs()).build());
+ Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
+ .withKeySerde(keySerde)
+ .withValueSerde(Serdes.Long());
+ return windowedBy(sessionWindows).count(materialized);
}
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
- return count(sessionWindows, (String) null);
+ return windowedBy(sessionWindows).count();
}
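Both count variants now delegate to SessionWindowedKStream. As a hedged sketch, counting click sessions that close after 30 minutes of inactivity and then querying them (topic, store, and key names are illustrative; the lookup assumes QueryableStoreTypes.sessionStore() for session stores):

    // assume: KGroupedStream<String, String> clicks; KafkaStreams streams (running)
    clicks.windowedBy(SessionWindows.with(30 * 60 * 1000L))
          .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("click-sessions")
              .withValueSerde(Serdes.Long()));

    final ReadOnlySessionStore<String, Long> sessionStore =
        streams.store("click-sessions", QueryableStoreTypes.<String, Long>sessionStore());
    final KeyValueIterator<Windowed<String>, Long> aliceSessions = sessionStore.fetch("alice");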
@Override
@@ -399,7 +416,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows) {
- return reduce(reducer, sessionWindows, (String) null);
+ return windowedBy(sessionWindows).reduce(reducer);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
new file mode 100644
index 0000000..0603853
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
+ private final SessionWindows windows;
+ private final Serde<K> keySerde;
+ private final Serde<V> valSerde;
+ private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+ private final Merger<K, Long> countMerger = new Merger<K, Long>() {
+ @Override
+ public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
+ return aggOne + aggTwo;
+ }
+ };
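+ // reduce() has no seed value: returning null lets aggregatorForReducer() below pass the first value through as-is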
+ private final Initializer<V> reduceInitializer = new Initializer<V>() {
+ @Override
+ public V apply() {
+ return null;
+ }
+ };
+
+
+ SessionWindowedKStreamImpl(final SessionWindows windows,
+ final InternalStreamsBuilder builder,
+ final Set<String> sourceNodes,
+ final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final GroupedStreamAggregateBuilder<K, V> aggregateBuilder) {
+ super(builder, name, sourceNodes);
+ this.windows = windows;
+ this.keySerde = keySerde;
+ this.valSerde = valSerde;
+ this.aggregateBuilder = aggregateBuilder;
+ }
+
+ @Override
+ public KTable<Windowed<K>, Long> count() {
+ return doAggregate(aggregateBuilder.countInitializer,
+ aggregateBuilder.countAggregator,
+ countMerger,
+ Serdes.Long());
+ }
+
+ @Override
+ public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return aggregate(aggregateBuilder.countInitializer,
+ aggregateBuilder.countAggregator,
+ countMerger,
+ materialized);
+ }
+
+ @Override
+ public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Merger<? super K, T> sessionMerger) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+ return doAggregate(initializer, aggregator, sessionMerger, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Merger<? super K, VR> sessionMerger,
+ final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
+ new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
+ AGGREGATE_NAME,
+ materialize(materializedInternal),
+ true);
+ }
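Because materialize() below accepts a user-supplied SessionBytesStoreSupplier, callers can hand in a pre-configured store rather than just a name. A hedged sketch (retention, names, and serdes are illustrative assumptions):

    // assume: KGroupedStream<String, String> grouped
    final SessionBytesStoreSupplier storeSupplier =
        Stores.persistentSessionStore("session-totals", 60 * 60 * 1000L);   // 1 hour retention
    final KTable<Windowed<String>, Long> totals = grouped
        .windowedBy(SessionWindows.with(5 * 60 * 1000L))
        .aggregate(() -> 0L,                             // Initializer
                   (key, value, agg) -> agg + 1L,        // Aggregator: count records
                   (key, agg1, agg2) -> agg1 + agg2,     // Merger: combine overlapping sessions
                   Materialized.<String, Long>as(storeSupplier)
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()));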
+
+ @Override
+ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ return doAggregate(reduceInitializer, aggregatorForReducer(reducer), mergerForAggregator(aggregatorForReducer(reducer)), valSerde);
+ }
+
+ @Override
+ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
+ return aggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), materialized);
+ }
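reduce() keeps the input value type, so no extra value serde is needed beyond valSerde; a short "latest value wins" sketch (names illustrative):

    // assume: KGroupedStream<String, String> grouped
    final KTable<Windowed<String>, String> lastPerSession = grouped
        .windowedBy(SessionWindows.with(10 * 60 * 1000L))
        .reduce((oldValue, newValue) -> newValue,   // keep the most recent value per session
                Materialized.<String, String, SessionStore<Bytes, byte[]>>as("last-per-session"));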
+
+
+ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+ SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
+ if (supplier == null) {
+ supplier = Stores.persistentSessionStore(materialized.storeName(),
+ windows.maintainMs());
+ }
+ final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ if (materialized.cachingEnabled()) {
+ builder.withCachingEnabled();
+ }
+ return builder;
+ }
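materialize() mirrors the caller's Materialized flags onto the internal StoreBuilder, so user-side configuration flows straight through; for example (store name illustrative):

    final Materialized<String, Long, SessionStore<Bytes, byte[]>> config =
        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessions")
            .withLoggingDisabled()    // materialize() then calls builder.withLoggingDisabled()
            .withCachingEnabled();    // materialize() then calls builder.withCachingEnabled()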
+
+ private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
+ return new Merger<K, V>() {
+ @Override
+ public V apply(final K aggKey, final V aggOne, final V aggTwo) {
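+ // aggTwo is passed as the "value" and aggOne as the current "aggregate", so the net effect is reducer.apply(aggOne, aggTwo)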
+ return aggregator.apply(aggKey, aggTwo, aggOne);
+ }
+ };
+ }
+
+ private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
+ return new Aggregator<K, V, V>() {
+ @Override
+ public V apply(final K aggKey, final V value, final V aggregate) {
+ if (aggregate == null) {
+ return value;
+ }
+ return reducer.apply(aggregate, value);
+ }
+ };
+ }
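Together with the null-returning reduce initializer above, these two adapters make reduce() a left-to-right fold per session. A plain-Java sanity check of the intended semantics, inlined here because the helpers are private:

    final Reducer<String> concat = (v1, v2) -> v1 + v2;
    // aggregatorForReducer semantics, applied record by record:
    String agg = null;                                    // reduceInitializer.apply()
    agg = (agg == null) ? "a" : concat.apply(agg, "a");   // first record  -> "a"
    agg = (agg == null) ? "b" : concat.apply(agg, "b");   // second record -> "ab"
    // mergerForAggregator semantics for two adjacent sessions "ab" and "c":
    final String merged = concat.apply("ab", "c");        // -> "abc", input order preserved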
+
+ private <VR> StoreBuilder<SessionStore<K, VR>> storeBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+ return Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(
+ storeName,
+ windows.maintainMs()),
+ keySerde,
+ aggValueSerde).withCachingEnabled();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Merger<? super K, VR> merger,
+ final Serde<VR> serde) {
+ final String storeName = builder.newStoreName(AGGREGATE_NAME);
+ return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamSessionWindowAggregate<>(windows, storeName, initializer, aggregator, merger),
+ AGGREGATE_NAME,
+ storeBuilder(storeName, serde),
+ false);
+ }
+}
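To close, an end-to-end sketch of the session-window API added by this patch. Every application-specific detail (topic names, store name, application id, broker address) is an illustrative assumption, and the sketch presumes the rest of the KIP-182 surface (Produced, default serde configs) is present on this trunk:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.state.SessionStore;

    public class SessionCountExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-count-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> clicks = builder.stream("clicks");

            clicks.groupByKey()
                  // close a session after 30 minutes of inactivity per key
                  .windowedBy(SessionWindows.with(30 * 60 * 1000L))
                  .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("clicks-per-session")
                      .withValueSerde(Serdes.Long()))
                  // flatten the windowed key into a plain String for the output topic
                  .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
                  .to("clicks-per-session-output", Produced.with(Serdes.String(), Serdes.Long()));

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }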