You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/08 02:18:40 UTC
[2/2] kafka git commit: KAFKA-2653: Add KStream/KTable Aggregation
and KTable Join APIs
KAFKA-2653: Add KStream/KTable Aggregation and KTable Join APIs
ping ymatsuda for reviews.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #730 from guozhangwang/K2653r
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40d731b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40d731b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40d731b8
Branch: refs/heads/trunk
Commit: 40d731b8712950122915795acca43886851a73b6
Parents: 4836e52
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jan 7 17:18:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 7 17:18:33 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/examples/KStreamJob.java | 2 +-
.../kafka/streams/examples/KTableJob.java | 111 +++++++++++++++++++
.../kafka/streams/kstream/Aggregator.java | 42 +++++++
.../streams/kstream/AggregatorSupplier.java | 23 ++++
.../kafka/streams/kstream/HoppingWindows.java | 79 +++++++++++++
.../kafka/streams/kstream/JoinWindowSpec.java | 91 ---------------
.../kafka/streams/kstream/JoinWindows.java | 110 ++++++++++++++++++
.../apache/kafka/streams/kstream/KStream.java | 89 +++++++++++++--
.../kafka/streams/kstream/KStreamBuilder.java | 6 +-
.../kafka/streams/kstream/KStreamWindowed.java | 38 -------
.../apache/kafka/streams/kstream/KTable.java | 90 ++++++++++++++-
.../streams/kstream/KeyValueToDoubleMapper.java | 23 ++++
.../streams/kstream/KeyValueToIntMapper.java | 23 ++++
.../streams/kstream/KeyValueToLongMapper.java | 23 ++++
.../kafka/streams/kstream/SlidingWindows.java | 67 +++++++++++
.../streams/kstream/TransformerSupplier.java | 1 -
.../kafka/streams/kstream/UnlimitedWindows.java | 63 +++++++++++
.../apache/kafka/streams/kstream/Window.java | 51 +++++++++
.../apache/kafka/streams/kstream/Windowed.java | 38 +++++++
.../apache/kafka/streams/kstream/Windows.java | 80 +++++++++++++
.../internals/DefaultWindowedDeserializer.java | 59 ++++++++++
.../internals/DefaultWindowedSerializer.java | 57 ++++++++++
.../kstream/internals/HoppingWindow.java | 37 +++++++
.../streams/kstream/internals/KStreamImpl.java | 111 +++++++++++++++----
.../streams/kstream/internals/KTableImpl.java | 69 ++++++++++++
.../kstream/internals/SlidingWindow.java | 38 +++++++
.../streams/kstream/internals/TopKSupplier.java | 106 ++++++++++++++++++
.../kstream/internals/UnlimitedWindow.java | 37 +++++++
.../streams/kstream/KStreamBuilderTest.java | 6 +-
.../kstream/internals/KStreamBranchTest.java | 2 +-
.../kstream/internals/KStreamFilterTest.java | 4 +-
.../kstream/internals/KStreamFlatMapTest.java | 2 +-
.../internals/KStreamFlatMapValuesTest.java | 2 +-
.../kstream/internals/KStreamImplTest.java | 33 +++++-
.../internals/KStreamKStreamJoinTest.java | 20 ++--
.../internals/KStreamKStreamLeftJoinTest.java | 14 +--
.../internals/KStreamKTableLeftJoinTest.java | 4 +-
.../kstream/internals/KStreamMapTest.java | 2 +-
.../kstream/internals/KStreamMapValuesTest.java | 2 +-
.../kstream/internals/KStreamTransformTest.java | 2 +-
.../internals/KStreamTransformValuesTest.java | 2 +-
41 files changed, 1456 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 819bd68..88a8955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -45,7 +45,7 @@ public class KStreamJob {
KStreamBuilder builder = new KStreamBuilder();
- KStream<String, String> stream1 = builder.from("topic1");
+ KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, Integer> stream2 =
stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
new file mode 100644
index 0000000..45ff58e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+public class KTableJob {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
+ props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamingConfig config = new StreamingConfig(props);
+
+ Serializer<String> stringSerializer = new StringSerializer();
+ Deserializer<String> stringDeserializer = new StringDeserializer();
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ // stream aggregate
+ KStream<String, Long> stream1 = builder.stream("topic1");
+
+ @SuppressWarnings("unchecked")
+ KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
+ @Override
+ public long apply(String key, Long value) {
+ return value;
+ }
+ }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
+
+ // table aggregation
+ KTable<String, String> table1 = builder.table("topic2");
+
+ KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
+ @Override
+ public String apply(String key, String value) {
+ return value;
+ }
+ }, new KeyValueToLongMapper<String, String>() {
+ @Override
+ public long apply(String key, String value) {
+ return Long.parseLong(value);
+ }
+ }, stringSerializer, stringDeserializer, "table2");
+
+ // stream-table join
+ KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
+ @Override
+ public Long apply(Long value1, Long value2) {
+ if (value2 == null)
+ return 0L;
+ else
+ return value1 * value2;
+ }
+ });
+
+ // table-table join
+ KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
+ @Override
+ public String apply(String value1, Long value2) {
+ if (value2 == null)
+ return value1 + "-null";
+ else if (value1 == null)
+ return "null-" + value2;
+ else
+ return value1 + "-" + value2;
+ }
+ });
+
+ wtable1.to("topic3");
+
+ KafkaStreaming kstream = new KafkaStreaming(builder, config);
+ kstream.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
new file mode 100644
index 0000000..d715fbd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Aggregator<K, V, T> {
+ /**
+ * Set the initial aggregate value
+ */
+ T initialValue();
+
+ /**
+ * When a new record with the aggregate key is added,
+ * updating the aggregate value for this key
+ */
+ T add(K aggKey, V value, T aggregate);
+
+ /**
+ * when an old record with the aggregate key is removed,
+ * updating the aggregate value for this key
+ */
+ T remove(K aggKey, V value, T aggregate);
+
+ /**
+ * Merge two aggregate values
+ */
+ T merge(T aggr1, T aggr2);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
new file mode 100644
index 0000000..6ed9125
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface AggregatorSupplier<K, V, T> {
+
+ Aggregator<K, V, T> get();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
new file mode 100644
index 0000000..d7141eb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.HoppingWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class HoppingWindows extends Windows<HoppingWindow> {
+
+ private static final long DEFAULT_SIZE_MS = 1000L;
+
+ public final long size;
+
+ public final long period;
+
+ private HoppingWindows(String name, long size, long period) {
+ super(name);
+
+ this.size = size;
+ this.period = period;
+ }
+
+ /**
+ * Returns a half-interval hopping window definition with the window size in milliseconds
+ * of the form [ N * default_size, N * default_size + default_size )
+ */
+ public static HoppingWindows of(String name) {
+ return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS);
+ }
+
+ /**
+ * Returns a new hopping window definition with the original size but reassign the window
+ * period in milliseconds of the form [ N * period, N * period + size )
+ */
+ public HoppingWindows with(long size) {
+ return new HoppingWindows(this.name, size, this.period);
+ }
+
+ /**
+ * Returns a new hopping window definition with the original size but reassign the window
+ * period in milliseconds of the form [ N * period, N * period + size )
+ */
+ public HoppingWindows every(long period) {
+ return new HoppingWindows(this.name, this.size, period);
+ }
+
+ @Override
+ public Collection<HoppingWindow> windowsFor(long timestamp) {
+ // TODO
+ return Collections.<HoppingWindow>emptyList();
+ }
+
+ @Override
+ public boolean equalTo(Windows other) {
+ if (!other.getClass().equals(HoppingWindows.class))
+ return false;
+
+ HoppingWindows otherWindows = (HoppingWindows) other;
+
+ return this.size == otherWindows.size && this.period == otherWindows.period;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
deleted file mode 100644
index 8f0f839..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * This class is used to specify the behaviour of windowed joins.
- */
-public class JoinWindowSpec {
-
- public final String name;
- public final long before;
- public final long after;
- public final long retention;
- public final int segments;
-
- private JoinWindowSpec(String name, long before, long after, long retention, int segments) {
- this.name = name;
- this.after = after;
- this.before = before;
- this.retention = retention;
- this.segments = segments;
- }
-
- public static JoinWindowSpec of(String name) {
- return new JoinWindowSpec(name, 0L, 0L, 0L, 3);
- }
-
- /**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
- * timeDifference.
- *
- * @param timeDifference
- * @return
- */
- public JoinWindowSpec within(long timeDifference) {
- return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments);
- }
-
- /**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
- * timeDifference, and if the timestamp of a record from the secondary stream is
- * is earlier than or equal to the timestamp of a record from the first stream.
- *
- * @param timeDifference
- * @return
- */
- public JoinWindowSpec before(long timeDifference) {
- return new JoinWindowSpec(name, timeDifference, 0L, retention, segments);
- }
-
- /**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
- * timeDifference, and if the timestamp of a record from the secondary stream is
- * is later than or equal to the timestamp of a record from the first stream.
- *
- * @param timeDifference
- * @return
- */
- public JoinWindowSpec after(long timeDifference) {
- return new JoinWindowSpec(name, 0L, timeDifference, retention, segments);
- }
-
- /**
- * Specifies the retention period of windows
- * @param retentionPeriod
- * @return
- */
- public JoinWindowSpec retentionPeriod(long retentionPeriod) {
- return new JoinWindowSpec(name, before, after, retentionPeriod, segments);
- }
-
- public JoinWindowSpec segments(int segments) {
- return new JoinWindowSpec(name, before, after, retention, segments);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
new file mode 100644
index 0000000..50aff9d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+
+import java.util.Collection;
+
+/**
+ * This class is used to specify the behaviour of windowed joins.
+ */
+public class JoinWindows extends Windows<SlidingWindow> {
+
+ private static final int DEFAULT_NUM_SEGMENTS = 3;
+
+ public final long before;
+ public final long after;
+ public final int segments;
+
+ private JoinWindows(String name, long before, long after, int segments) {
+ super(name);
+
+ this.after = after;
+ this.before = before;
+ this.segments = segments;
+ }
+
+ public static JoinWindows of(String name) {
+ return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamp stamps are within
+ * timeDifference.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindows within(long timeDifference) {
+ return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamp stamps are within
+ * timeDifference, and if the timestamp of a record from the secondary stream is
+ * is earlier than or equal to the timestamp of a record from the first stream.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindows before(long timeDifference) {
+ return new JoinWindows(this.name, timeDifference, this.after, this.segments);
+ }
+
+ /**
+ * Specifies that records of the same key are joinable if their timestamp stamps are within
+ * timeDifference, and if the timestamp of a record from the secondary stream is
+ * is later than or equal to the timestamp of a record from the first stream.
+ *
+ * @param timeDifference
+ * @return
+ */
+ public JoinWindows after(long timeDifference) {
+ return new JoinWindows(this.name, this.before, timeDifference, this.segments);
+ }
+
+ /**
+ * Specifies the number of segments to be used for rolling the window store,
+ * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
+ *
+ * @param segments
+ * @return
+ */
+ protected JoinWindows segments(int segments) {
+ return new JoinWindows(name, before, after, segments);
+ }
+
+ @Override
+ public Collection<SlidingWindow> windowsFor(long timestamp) {
+ // this function should never be called
+ throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
+ }
+
+ @Override
+ public boolean equalTo(Windows other) {
+ if (!other.getClass().equals(JoinWindows.class))
+ return false;
+
+ JoinWindows otherWindows = (JoinWindows) other;
+
+ return this.before == otherWindows.before && this.after == otherWindows.after;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 29115c7..dace7e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import java.util.Collection;
+
/**
* KStream is an abstraction of a stream of key-value pairs.
*
@@ -141,7 +143,7 @@ public interface KStream<K, V> {
/**
* Applies a stateful transformation to all elements in this stream.
*
- * @param transformerSupplier the class of TransformerDef
+ * @param transformerSupplier the class of valueTransformerSupplier
* @param stateStoreNames the names of the state store used by the processor
* @return the instance of KStream that contains transformed keys and values
*/
@@ -150,7 +152,7 @@ public interface KStream<K, V> {
/**
* Applies a stateful transformation to all values in this stream.
*
- * @param valueTransformerSupplier the class of TransformerDef
+ * @param valueTransformerSupplier the class of valueTransformerSupplier
* @param stateStoreNames the names of the state store used by the processor
* @return the instance of KStream that contains the keys and transformed values
*/
@@ -169,7 +171,7 @@ public interface KStream<K, V> {
*
* @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner
- * @param joinWindowSpec the specification of the join window
+ * @param windows the specification of the join window
* @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used
* @param thisValueSerializer value serializer for this stream,
@@ -188,7 +190,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> join(
KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer,
@@ -201,7 +203,7 @@ public interface KStream<K, V> {
*
* @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner
- * @param joinWindowSpec the specification of the join window
+ * @param windows the specification of the join window
* @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used
* @param thisValueSerializer value serializer for this stream,
@@ -220,7 +222,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> outerJoin(
KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
Serializer<V1> otherValueSerializer,
@@ -233,6 +235,7 @@ public interface KStream<K, V> {
*
* @param otherStream the instance of KStream joined with this stream
* @param joiner ValueJoiner
+ * @param windows the specification of the join window
* @param keySerializer key serializer,
* if not specified the default serializer defined in the configs will be used
* @param otherValueSerializer value serializer for other stream,
@@ -247,7 +250,7 @@ public interface KStream<K, V> {
<V1, V2> KStream<K, V2> leftJoin(
KStream<K, V1> otherStream,
ValueJoiner<V, V1, V2> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V1> otherValueSerializer,
Deserializer<K> keyDeserializer,
@@ -258,9 +261,79 @@ public interface KStream<K, V> {
*
* @param ktable the instance of KTable joined with this stream
* @param joiner ValueJoiner
- * @param <V1> the value type of the other stream
+ * @param <V1> the value type of the table
* @param <V2> the value type of the new stream
*/
<V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
+ /**
+ * Aggregate values of this stream by key on a window basis.
+ *
+ * @param aggregatorSupplier the class of aggregatorSupplier
+ * @param windows the specification of the aggregation window
+ * @param <T> the value type of the aggregated table
+ */
+ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<T> aggValueDeserializer);
+
+ /**
+ * Sum extracted long integer values of this stream by key on a window basis.
+ *
+ * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer);
+
+ /**
+ * Sum extracted integer values of this stream by key on a window basis.
+ *
+ * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer);
+
+ /**
+ * Sum extracted double decimal values of this stream by key on a window basis.
+ *
+ * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer);
+
+ /**
+ * Count number of records of this stream by key on a window basis.
+ *
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer);
+
+ /**
+ * Get the top-k values of this stream by key on a window basis.
+ *
+ * @param k parameter of the top-k computation
+ * @param valueSelector the class of KeyValueMapper to extract the comparable value
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
+ KeyValueMapper<K, V, V1> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V1> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> aggValueDeserializer);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 32d3a9d..b50cffb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -46,8 +46,8 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topics the topic names, if empty default to all the topics in the config
* @return KStream
*/
- public <K, V> KStream<K, V> from(String... topics) {
- return from(null, null, topics);
+ public <K, V> KStream<K, V> stream(String... topics) {
+ return stream(null, null, topics);
}
/**
@@ -60,7 +60,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topics the topic names, if empty default to all the topics in the config
* @return KStream
*/
- public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
+ public <K, V> KStream<K, V> stream(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
String name = newName(KStreamImpl.SOURCE_NAME);
addSource(name, keyDeserializer, valDeserializer, topics);
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
deleted file mode 100644
index 4d73128..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
- */
-public interface KStreamWindowed<K, V> extends KStream<K, V> {
-
- /**
- * Creates a new stream by joining this windowed stream with the other windowed stream.
- * Each element arrived from either of the streams is joined with elements in a window of each other.
- * The resulting values are computed by applying a joiner.
- *
- * @param other the other windowed stream
- * @param joiner ValueJoiner
- * @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
- * @return KStream
- */
- <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index c6e7975..997edcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,6 +20,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import java.util.Collection;
+
/**
* KTable is an abstraction of a change log stream.
*
@@ -112,7 +114,7 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream
- * @return the instance of KStream
+ * @return the instance of KTable
*/
<V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
@@ -123,7 +125,7 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream
- * @return the instance of KStream
+ * @return the instance of KTable
*/
<V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
@@ -134,8 +136,90 @@ public interface KTable<K, V> {
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
* @param <V2> the value type of the new stream
- * @return the instance of KStream
+ * @return the instance of KTable
*/
<V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+ /**
+ * Aggregate values of this table by the selected key.
+ *
+ * @param aggregatorSupplier the class of AggregatorSupplier
+ * @param selector the KeyValue mapper that select the aggregate key
+ * @param name the name of the resulted table
+ * @param <K1> the key type of the aggregated table
+ * @param <V1> the value type of the aggregated table
+ * @return the instance of KTable
+ */
+ <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K> keySerializer,
+ Serializer<V2> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V2> aggValueDeserializer,
+ String name);
+
+ /**
+ * Sum extracted long integer values of this table by the selected aggregation key
+ *
+ * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
+ * @param name the name of the resulted table
+ */
+ <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToLongMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name);
+
+ /**
+ * Sum extracted integer values of this table by the selected aggregation key
+ *
+ * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
+ * @param name the name of the resulted table
+ */
+ <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToIntMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name);
+
+ /**
+ * Sum extracted double decimal values of this table by the selected aggregation key
+ *
+ * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
+ * @param name the name of the resulted table
+ */
+ <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToDoubleMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name);
+
+ /**
+ * Count number of records of this table by the selected aggregation key
+ *
+ * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param name the name of the resulted table
+ */
+ <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name);
+
+ /**
+ * Get the top-k values of this table by the selected aggregation key
+ *
+ * @param k parameter of the top-k computation
+ * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param name the name of the resulted table
+ */
+ <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
+ KeyValueMapper<K, V, K1> keySelector,
+ Serializer<K> keySerializer,
+ Serializer<V1> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> aggValueDeserializer,
+ String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
new file mode 100644
index 0000000..ae3b858
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToDoubleMapper<K, V> {
+
+ double apply(K key, V value);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
new file mode 100644
index 0000000..72e5ee9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToIntMapper<K, V> {
+
+ int apply(K key, V value);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
new file mode 100644
index 0000000..3a8d8a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToLongMapper<K, V> {
+
+ long apply(K key, V value);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
new file mode 100644
index 0000000..ffdb4ad
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class SlidingWindows extends Windows<SlidingWindow> {
+
+ private static final long DEFAULT_SIZE_MS = 1000L;
+
+ public final long size;
+
+ private SlidingWindows(String name, long size) {
+ super(name);
+
+ this.size = size;
+ }
+
+ /**
+ * Returns a half-interval sliding window definition with the default window size
+ */
+ public static SlidingWindows of(String name) {
+ return new SlidingWindows(name, DEFAULT_SIZE_MS);
+ }
+
+ /**
+ * Returns a half-interval sliding window definition with the window size in milliseconds
+ */
+ public SlidingWindows with(long size) {
+ return new SlidingWindows(this.name, size);
+ }
+
+ @Override
+ public Collection<SlidingWindow> windowsFor(long timestamp) {
+ // TODO
+ return Collections.<SlidingWindow>emptyList();
+ }
+
+ @Override
+ public boolean equalTo(Windows other) {
+ if (!other.getClass().equals(SlidingWindows.class))
+ return false;
+
+ SlidingWindows otherWindows = (SlidingWindows) other;
+
+ return this.size == otherWindows.size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index 2c2d8dd..93d930d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream;
public interface TransformerSupplier<K, V, R> {
Transformer<K, V, R> get();
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
new file mode 100644
index 0000000..89cb0a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class UnlimitedWindows extends Windows<UnlimitedWindow> {
+
+ private static final long DEFAULT_START_TIMESTAMP = 0L;
+
+ public final long start;
+
+ private UnlimitedWindows(String name, long start) {
+ super(name);
+
+ this.start = start;
+ }
+
+ /**
+ * Returns an unlimited window definition
+ */
+ public static UnlimitedWindows of(String name) {
+ return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
+ }
+
+ public UnlimitedWindows startOn(long start) {
+ return new UnlimitedWindows(this.name, start);
+ }
+
+ @Override
+ public Collection<UnlimitedWindow> windowsFor(long timestamp) {
+ // TODO
+ return Collections.<UnlimitedWindow>emptyList();
+ }
+
+ @Override
+ public boolean equalTo(Windows other) {
+ if (!other.getClass().equals(UnlimitedWindows.class))
+ return false;
+
+ UnlimitedWindows otherWindows = (UnlimitedWindows) other;
+
+ return this.start == otherWindows.start;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
new file mode 100644
index 0000000..63e0a35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public abstract class Window {
+
+ private long start;
+ private long end;
+
+ public Window(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ /**
+ * Returns the start timestamp of this window, inclusive
+ */
+ public long start() {
+ return start;
+ }
+
+ /**
+ * Returns the end timestamp of this window, exclusive
+ */
+ public long end() {
+ return end;
+ }
+
+ public boolean overlap(Window other) {
+ return this.start() < other.end() || other.start() < this.end();
+ }
+
+ public boolean equalsTo(Window other) {
+ return this.start() == other.start() && this.end() == other.end();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
new file mode 100644
index 0000000..03fb656
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class Windowed<T> {
+
+ private T value;
+
+ private Window window;
+
+ public Windowed(T value, Window window) {
+ this.value = value;
+ this.window = window;
+ }
+
+ public T value() {
+ return value;
+ }
+
+ public Window window() {
+ return window;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
new file mode 100644
index 0000000..ab8d822
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class Windows<W extends Window> {
+
+ private static final long DEFAULT_EMIT_DURATION = 1000L;
+
+ private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
+
+ private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
+
+ private long emitDuration;
+
+ private long maintainDuration;
+
+ protected String name;
+
+ protected Windows(String name) {
+ this.name = name;
+ this.emitDuration = DEFAULT_EMIT_DURATION;
+ this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Set the window emit duration in milliseconds of system time
+ */
+ public Windows emit(long duration) {
+ this.emitDuration = duration;
+
+ return this;
+ }
+
+ /**
+ * Set the window maintain duration in milliseconds of system time
+ */
+ public Windows until(long duration) {
+ this.maintainDuration = duration;
+
+ return this;
+ }
+
+ public long emitEveryMs() {
+ return this.emitDuration;
+ }
+
+ public long maintainMs() {
+ return this.maintainDuration;
+ }
+
+ protected String newName(String prefix) {
+ return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
+ }
+
+ abstract boolean equalTo(Windows other);
+
+ abstract Collection<W> windowsFor(long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
new file mode 100644
index 0000000..9a14c53
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+ private static final int TIMESTAMP_SIZE = 8;
+
+ private Deserializer<T> inner;
+
+ public DefaultWindowedDeserializer(Deserializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public Windowed<T> deserialize(String topic, byte[] data) {
+
+ byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
+
+ System.arraycopy(data, 0, bytes, 0, bytes.length);
+
+ long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
+
+ // always read as unlimited window
+ return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
new file mode 100644
index 0000000..4bf2b28
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
+
+ private static final int TIMESTAMP_SIZE = 8;
+
+ private Serializer<T> inner;
+
+ public DefaultWindowedSerializer(Serializer<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // do nothing
+ }
+
+ @Override
+ public byte[] serialize(String topic, Windowed<T> data) {
+ byte[] serializedKey = inner.serialize(topic, data.value());
+
+ ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
+ buf.put(serializedKey);
+ buf.putLong(data.window().start());
+
+ return buf.array();
+ }
+
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
new file mode 100644
index 0000000..8b0b2fb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class HoppingWindow extends Window {
+
+ public HoppingWindow(long start, long end) {
+ super(start, end);
+ }
+
+ @Override
+ public boolean overlap(Window other) {
+ return super.overlap(other) && other.getClass().equals(HoppingWindow.class);
+ }
+
+ @Override
+ public boolean equalsTo(Window other) {
+ return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index f47fe0f..4505e74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,10 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
+import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -30,11 +34,15 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -62,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+
public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
@@ -187,7 +197,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> valDeserializer) {
to(topic, keySerializer, valSerializer);
- return topology.from(keyDeserializer, valDeserializer, topic);
+ return topology.stream(keyDeserializer, valDeserializer, topic);
}
@Override
@@ -239,7 +249,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> join(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
@@ -247,7 +257,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) {
- return join(other, joiner, joinWindowSpec,
+ return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
}
@@ -256,7 +266,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> outerJoin(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
@@ -264,7 +274,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<V> thisValueDeserialzier,
Deserializer<V1> otherValueDeserialzier) {
- return join(other, joiner, joinWindowSpec,
+ return join(other, joiner, windows,
keySerialzier, thisValueSerialzier, otherValueSerialzier,
keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
}
@@ -273,7 +283,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private <V1, R> KStream<K, R> join(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V> thisValueSerialzier,
Serializer<V1> otherValueSerialzier,
@@ -286,21 +296,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V> thisWindow =
new RocksDBWindowStoreSupplier<>(
- joinWindowSpec.name + "-1",
- joinWindowSpec.before,
- joinWindowSpec.after,
- joinWindowSpec.retention,
- joinWindowSpec.segments,
+ windows.name() + "-this",
+ windows.before,
+ windows.after,
+ windows.maintainMs(),
+ windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
null);
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
- joinWindowSpec.name + "-2",
- joinWindowSpec.after,
- joinWindowSpec.before,
- joinWindowSpec.retention,
- joinWindowSpec.segments,
+ windows.name() + "-other",
+ windows.before,
+ windows.after,
+ windows.maintainMs(),
+ windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
null);
@@ -333,7 +343,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1, R> KStream<K, R> leftJoin(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
- JoinWindowSpec joinWindowSpec,
+ JoinWindows windows,
Serializer<K> keySerialzier,
Serializer<V1> otherValueSerialzier,
Deserializer<K> keyDeserialier,
@@ -343,11 +353,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
- joinWindowSpec.name,
- joinWindowSpec.after,
- joinWindowSpec.before,
- joinWindowSpec.retention,
- joinWindowSpec.segments,
+ windows.name() + "-this",
+ windows.before,
+ windows.after,
+ windows.maintainMs(),
+ windows.segments,
new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
null);
@@ -376,4 +386,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(topology, name, allSourceNodes);
}
+ @Override
+ public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<T> aggValueDeserializer) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer) {
+ // TODO
+ return null;
+ }
+
+ public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer) {
+ // TODO
+ return null;
+ }
+
+ public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
+ KeyValueMapper<K, V, V1> valueSelector,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V1> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> aggValueDeserializer) {
+ // TODO
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9f97958..32d3cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,15 +19,22 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
+import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import java.util.Collection;
import java.util.Set;
/**
@@ -271,4 +278,66 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
}
+ @Override
+ public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K> keySerializer,
+ Serializer<V2> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V2> aggValueDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToLongMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToIntMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
+ KeyValueToDoubleMapper<K, V> valueSelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
+ Serializer<K> keySerializer,
+ Deserializer<K> keyDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
+ KeyValueMapper<K, V, K1> keySelector,
+ Serializer<K> keySerializer,
+ Serializer<V1> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V1> aggValueDeserializer,
+ String name) {
+ // TODO
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
new file mode 100644
index 0000000..a6b5149
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class SlidingWindow extends Window {
+
+ public SlidingWindow(long start, long end) {
+ super(start, end);
+ }
+
+ @Override
+ public boolean overlap(Window other) {
+ return super.overlap(other) && other.getClass().equals(SlidingWindow.class);
+ }
+
+ @Override
+ public boolean equalsTo(Window other) {
+ return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class);
+ }
+}