Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/29 23:03:37 UTC
[2/2] kafka git commit: KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable
KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda, Michael G. Noll, Jun Rao
Closes #870 from guozhangwang/K3192
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/845c6eae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/845c6eae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/845c6eae
Branch: refs/heads/trunk
Commit: 845c6eae1f6c6bcf117f5baa53bb19b4611c0528
Parents: a731297
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 29 14:03:32 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 29 14:03:32 2016 -0800
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 1 +
.../scala/kafka/tools/ConsoleConsumer.scala | 23 +-
.../examples/pageview/JsonPOJODeserializer.java | 5 -
.../pageview/JsonTimestampExtractor.java | 46 +++
.../examples/pageview/PageViewTypedJob.java | 88 +++++-
.../examples/pageview/PageViewUnTypedJob.java | 87 ++++--
.../kafka/streams/examples/pipe/PipeJob.java | 18 +-
.../examples/wordcount/WordCountJob.java | 48 ++-
.../wordcount/WordCountProcessorJob.java | 23 +-
.../apache/kafka/streams/kstream/KStream.java | 39 +++
.../internals/KStreamAggProcessorSupplier.java | 28 ++
.../kstream/internals/KStreamAggWindow.java | 51 ----
.../kstream/internals/KStreamAggregate.java | 119 ++------
.../streams/kstream/internals/KStreamImpl.java | 100 +++++--
.../kstream/internals/KStreamReduce.java | 124 +++-----
.../internals/KStreamWindowAggregate.java | 171 +++++++++++
.../kstream/internals/KStreamWindowReduce.java | 165 +++++++++++
.../streams/kstream/internals/KTableImpl.java | 10 +-
.../internals/KTableProcessorSupplier.java | 1 -
.../streams/processor/TopologyBuilder.java | 26 ++
.../processor/internals/RecordQueue.java | 8 +
.../internals/StreamPartitionAssignor.java | 2 +-
.../processor/internals/StreamThread.java | 1 -
.../kstream/internals/KStreamAggregateTest.java | 294 -------------------
.../internals/KStreamWindowAggregateTest.java | 294 +++++++++++++++++++
.../kstream/internals/KTableAggregateTest.java | 1 +
.../processor/internals/PartitionGroupTest.java | 4 +-
.../processor/internals/RecordQueueTest.java | 4 +-
.../processor/internals/StreamTaskTest.java | 19 +-
29 files changed, 1168 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8e93973..9d24d07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -343,6 +343,7 @@ public class Sender implements Runnable {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
+
return new ClientRequest(now, acks != 0, send, callback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0ae057f..0d85aca 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -29,6 +29,7 @@ import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer}
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
@@ -349,7 +350,12 @@ class DefaultMessageFormatter extends MessageFormatter {
var keySeparator = "\t".getBytes
var lineSeparator = "\n".getBytes
+ var keyDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
+ var valDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
+
override def init(props: Properties) {
+ System.out.println(props)
+
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
if (props.containsKey("print.key"))
@@ -358,6 +364,19 @@ class DefaultMessageFormatter extends MessageFormatter {
keySeparator = props.getProperty("key.separator").getBytes
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes
+
+ if (props.containsKey("key.decoder")) {
+ keyDecoder = Class.forName(props.getProperty("key.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]]
+
+ System.out.println("update key decoder")
+ }
+ if (props.containsKey("value.decoder")) {
+ valDecoder = Class.forName(props.getProperty("value.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]]
+
+ System.out.println("update value decoder")
+ }
+ System.out.println(keyDecoder)
+ System.out.println(valDecoder)
}
def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
@@ -369,10 +388,10 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(keySeparator)
}
if (printKey) {
- output.write(if (key == null) "null".getBytes else key)
+ output.write(if (key == null) "null".getBytes else keyDecoder.deserialize(null, key).toString.getBytes)
output.write(keySeparator)
}
- output.write(if (value == null) "null".getBytes else value)
+ output.write(if (value == null) "null".getBytes else valDecoder.deserialize(null, value).toString.getBytes)
output.write(lineSeparator)
}
}
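
The new key.decoder / value.decoder formatter properties make it possible to inspect topics whose payloads are not plain strings. A hypothetical invocation against the WordCount output topic from later in this commit (connection flags illustrative):

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic streams-wordcount-output \
        --property print.key=true \
        --property key.decoder=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.decoder=org.apache.kafka.common.serialization.LongDeserializer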
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
index 583ec2d..5fcd1f3 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -16,17 +16,12 @@
**/
package org.apache.kafka.streams.examples.pageview;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
-/**
- * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
- * structured data without having associated Java classes. This deserializer also supports Connect schemas.
- */
public class JsonPOJODeserializer<T> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
new file mode 100644
index 0000000..6443193
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+/**
+ * A timestamp extractor implementation that tries to extract event time from
+ * the "timestamp" field in the Json formatted message.
+ */
+public class JsonTimestampExtractor implements TimestampExtractor {
+
+ @Override
+ public long extract(ConsumerRecord<Object, Object> record) {
+ if (record.value() instanceof PageViewTypedJob.PageView) {
+ return ((PageViewTypedJob.PageView) record.value()).timestamp;
+ }
+
+ if (record.value() instanceof PageViewTypedJob.UserProfile) {
+ return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+ }
+
+ if (record.value() instanceof JsonNode) {
+ return ((JsonNode) record.value()).get("timestamp").longValue();
+ }
+
+ throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
+ }
+}
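
For this extractor to succeed, every record value must carry a top-level "timestamp" field. Hypothetical input records for the page view examples (field values illustrative; the profile record is keyed by user):

    {"user": "alice", "page": "index.html", "timestamp": 1456789000000}
    {"region": "europe", "timestamp": 1456789000000}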
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 358cbe8..3f9b283 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.examples.pageview;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -30,22 +31,38 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.StreamsConfig;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
public class PageViewTypedJob {
// POJO classes
static public class PageView {
public String user;
public String page;
+ public Long timestamp;
}
static public class UserProfile {
- public String user;
public String region;
+ public Long timestamp;
}
static public class PageViewByRegion {
@@ -66,13 +83,17 @@ public class PageViewTypedJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+ props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
@@ -81,26 +102,59 @@ public class PageViewTypedJob {
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
+ // TODO: the following can be removed with a serialization factory
+ Map<String, Object> serdeProps = new HashMap<>();
+
+ final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+ serdeProps.put("JsonPOJOClass", PageView.class);
+ pageViewDeserializer.configure(serdeProps, false);
+
+ final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+ serdeProps.put("JsonPOJOClass", UserProfile.class);
+ userProfileDeserializer.configure(serdeProps, false);
- KStream<String, PageView> views = builder.stream("streams-pageview-input");
+ final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", UserProfile.class);
+ userProfileSerializer.configure(serdeProps, false);
- KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));
+ final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+ wPageViewByRegionSerializer.configure(serdeProps, false);
- KTable<String, UserProfile> users = builder.table("streams-userprofile-input");
+ final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", RegionCount.class);
+ regionCountSerializer.configure(serdeProps, false);
- KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
- .leftJoin(users, (view, profile) -> {
- PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
- viewByRegion.region = profile.region;
+ KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
- return viewByRegion;
+ KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+ KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+ .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+ @Override
+ public PageViewByRegion apply(PageView view, UserProfile profile) {
+ PageViewByRegion viewByRegion = new PageViewByRegion();
+ viewByRegion.user = view.user;
+ viewByRegion.page = view.page;
+
+ if (profile != null) {
+ viewByRegion.region = profile.region;
+ } else {
+ viewByRegion.region = "UNKNOWN";
+ }
+ return viewByRegion;
+ }
+ })
+ .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+ @Override
+ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+ return new KeyValue<>(viewRegion.region, viewRegion);
+ }
})
- .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
+ // TODO: we can merge this toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
@Override
@@ -118,9 +172,15 @@ public class PageViewTypedJob {
});
// write to the result topic
- regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
+ regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
+
+ // usually the streaming job would run forever,
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
}
}
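
The repeated configure() calls above (flagged by the serialization-factory TODO) could be collapsed into small helpers on the job class. A hypothetical sketch, reusing the "JsonPOJOClass" configuration key that the POJO serdes already understand (requires import java.util.Collections):

    // hypothetical helpers addressing the serialization-factory TODO above
    private static <T> Deserializer<T> jsonPOJODeserializer(Class<T> clazz) {
        Deserializer<T> deserializer = new JsonPOJODeserializer<>();
        deserializer.configure(Collections.singletonMap("JsonPOJOClass", clazz), false);
        return deserializer;
    }

    private static <T> Serializer<T> jsonPOJOSerializer(Class<T> clazz) {
        Serializer<T> serializer = new JsonPOJOSerializer<>();
        serializer.configure(Collections.singletonMap("JsonPOJOClass", clazz), false);
        return serializer;
    }

With these, each serde reduces to a one-liner, e.g. final Deserializer<PageView> pageViewDeserializer = jsonPOJODeserializer(PageView.class);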
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 2fdfa97..065f5f5 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.examples.pageview;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -31,27 +32,44 @@ import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
-public class PageViewUnTypedJob {
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+ props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
@@ -59,47 +77,66 @@ public class PageViewUnTypedJob {
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-
- KStream<String, JsonNode> views = builder.stream("streams-pageview-input");
-
- KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record));
-
- KTable<String, JsonNode> users = builder.table("streams-userprofile-input");
-
- KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
-
- KStream<JsonNode, JsonNode> regionCount = viewsByUser
- .leftJoin(userRegions, (view, region) -> {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
- return (JsonNode) jNode.put("user", view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region);
+ final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+ final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+ KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+ KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+ KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+ @Override
+ public String apply(JsonNode record) {
+ return record.get("region").textValue();
+ }
+ });
+
+ KStream<JsonNode, JsonNode> regionCount = views
+ .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+ @Override
+ public JsonNode apply(JsonNode view, String region) {
+ ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+ return jNode.put("user", view.get("user").textValue())
+ .put("page", view.get("page").textValue())
+ .put("region", region == null ? "UNKNOWN" : region);
+ }
+ })
+ .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+ @Override
+ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+ return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+ }
})
- .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
+ // TODO: we can merge this toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
@Override
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
- .put("region", key.window().start());
+ .put("region", key.value());
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
- keyNode.put("count", value);
+ valueNode.put("count", value);
- return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode);
+ return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
}
});
// write to the result topic
- regionCount.to("streams-pageviewstats-output");
+ regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
+
+ // usually the streaming job would run forever,
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
index 841f37b..9e737ba 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -26,6 +26,16 @@ import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
public class PipeJob {
public static void main(String[] args) throws Exception {
@@ -37,7 +47,7 @@ public class PipeJob {
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // can specify underlying client configs if necessary
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
@@ -46,5 +56,11 @@ public class PipeJob {
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
+
+ // usually the streaming job would run forever,
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index b922695..da6b095 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -17,9 +17,6 @@
package org.apache.kafka.streams.examples.wordcount;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
@@ -27,21 +24,29 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
import java.util.Arrays;
import java.util.Properties;
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
public class WordCountJob {
public static void main(String[] args) throws Exception {
@@ -54,7 +59,7 @@ public class WordCountJob {
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // can specify underlying client configs if necessary
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
@@ -63,11 +68,10 @@ public class WordCountJob {
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Serializer<Long> longSerializer = new LongSerializer();
final Deserializer<Long> longDeserializer = new LongDeserializer();
- final Serializer<JsonNode> JsonSerializer = new JsonSerializer();
KStream<String, String> source = builder.stream("streams-file-input");
- KStream<String, JsonNode> counts = source
+ KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
@@ -79,25 +83,17 @@ public class WordCountJob {
return new KeyValue<String, String>(value, value);
}
})
- .countByKey(UnlimitedWindows.of("Counts").startOn(0L),
- stringSerializer, longSerializer,
- stringDeserializer, longDeserializer)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() {
- @Override
- public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+ .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
- jNode.put("word", key.value())
- .put("count", value);
-
- return new KeyValue<String, JsonNode>(null, jNode);
- }
- });
-
- counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
+ counts.to("streams-wordcount-output", stringSerializer, longSerializer);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
+
+ // usually the streaming job would run forever,
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
}
}
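
With Java 8 lambdas the anonymous classes above collapse to one line per step; an equivalent sketch, assuming the flatMapValues body elided from the hunk is the usual lowercase-and-split on spaces:

    KTable<String, Long> counts = source
        // split each line of text into words (body assumed, see note above)
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
        // repartition on the word itself
        .map((key, value) -> new KeyValue<>(value, value))
        // unwindowed count per word, backed by the "Counts" store
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");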
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
index 63692bd..61e8335 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -33,6 +33,17 @@ import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
public class WordCountProcessorJob {
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
@@ -72,7 +83,7 @@ public class WordCountProcessorJob {
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
- System.out.println("----------- " + timestamp + "----------- ");
+ System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
@@ -103,7 +114,7 @@ public class WordCountProcessorJob {
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // can specify underlying client configs if necessary
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
@@ -113,9 +124,15 @@ public class WordCountProcessorJob {
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
- builder.addSink("Sink", "streams-wordcount-output", "Process");
+ builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
+
+ // usually the streaming job would run forever,
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b83298f..231eb22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -283,6 +283,18 @@ public interface KStream<K, V> {
/**
* Aggregate values of this stream by key on a window basis.
*
+ * @param reducer the class of Reducer
+ */
+ KTable<K, V> reduceByKey(Reducer<V> reducer,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer,
+ String name);
+
+ /**
+ * Aggregate values of this stream by key on a window basis.
+ *
* @param initializer the class of Initializer
* @param aggregator the class of Aggregator
* @param windows the specification of the aggregation window
@@ -297,6 +309,22 @@ public interface KStream<K, V> {
Deserializer<T> aggValueDeserializer);
/**
+ * Aggregate values of this stream by key without windowing, and hence
+ * return an ever-updating table
+ *
+ * @param initializer the class of Initializer
+ * @param aggregator the class of Aggregator
+ * @param <T> the value type of the aggregated table
+ */
+ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
+ Serializer<K> keySerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name);
+
+ /**
* Count number of messages of this stream by key on a window basis.
*
* @param windows the specification of the aggregation window
@@ -306,4 +334,15 @@ public interface KStream<K, V> {
Serializer<Long> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<Long> aggValueDeserializer);
+
+ /**
+ * Count the number of messages of this stream by key without windowing, and hence
+ * return an ever-updating count table
+ *
+ */
+ KTable<K, Long> countByKey(Serializer<K> keySerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name);
}
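
For orientation, a minimal sketch of the new unwindowed API from application code (topic and store names illustrative; serde classes from org.apache.kafka.common.serialization as in the examples above):

    KStreamBuilder builder = new KStreamBuilder();

    // read a String-keyed, String-valued topic (topic name is illustrative)
    KStream<String, String> words = builder.stream(
            new StringDeserializer(), new StringDeserializer(), "words-input");

    // one ever-updating count per key, backed by a local state store named "Counts"
    KTable<String, Long> counts = words.countByKey(
            new StringSerializer(), new LongSerializer(),
            new StringDeserializer(), new LongDeserializer(),
            "Counts");

    counts.to("words-count-output", new StringSerializer(), new LongSerializer());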
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
new file mode 100644
index 0000000..deb98ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {
+
+ KTableValueGetterSupplier<RK, T> view();
+
+ void enableSendingOldValues();
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
deleted file mode 100644
index f02f53a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> {
-
- @Override
- public Processor<K, V> get() {
- return new KStreamAggWindowProcessor();
- }
-
- private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> {
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- }
-
- @Override
- public void process(K key, V value) {
- // create a dummy window just for wrapping the timestamp
- long timestamp = context().timestamp();
-
- // send the new aggregate value
- context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b64277c..f41bfa6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,38 +18,28 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
-import java.util.Iterator;
-import java.util.Map;
-
-public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> {
+public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
private final String storeName;
- private final Windows<W> windows;
private final Initializer<T> initializer;
private final Aggregator<K, V, T> aggregator;
private boolean sendOldValues = false;
- public KStreamAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
- this.windows = windows;
+ public KStreamAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
this.storeName = storeName;
this.initializer = initializer;
this.aggregator = aggregator;
}
@Override
- public Processor<Windowed<K>, Change<V>> get() {
+ public Processor<K, V> get() {
return new KStreamAggregateProcessor();
}
@@ -58,117 +48,68 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
sendOldValues = true;
}
- private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+ private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
- private WindowStore<K, T> windowStore;
+ private KeyValueStore<K, T> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
- windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}
@Override
- public void process(Windowed<K> windowedKey, Change<V> change) {
- // first get the matching windows
- long timestamp = windowedKey.window().start();
- K key = windowedKey.value();
- V value = change.newValue;
-
- Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
- long timeFrom = Long.MAX_VALUE;
- long timeTo = Long.MIN_VALUE;
-
- // use range query on window store for efficient reads
- for (long windowStartMs : matchedWindows.keySet()) {
- timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
- timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
- }
-
- WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
-
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue<Long, T> entry = iter.next();
- W window = matchedWindows.get(entry.key);
-
- if (window != null) {
-
- T oldAgg = entry.value;
+ public void process(K key, V value) {
+ T oldAgg = store.get(key);
- if (oldAgg == null)
- oldAgg = initializer.apply();
+ if (oldAgg == null)
+ oldAgg = initializer.apply();
- // try to add the new new value (there will never be old value)
- T newAgg = aggregator.apply(key, value, oldAgg);
+ T newAgg = oldAgg;
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
-
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
-
- matchedWindows.remove(entry.key);
- }
+ // try to add the new value
+ if (value != null) {
+ newAgg = aggregator.apply(key, value, newAgg);
}
- iter.close();
-
- // create the new window for the rest of unmatched window that do not exist yet
- for (long windowStartMs : matchedWindows.keySet()) {
- T oldAgg = initializer.apply();
- T newAgg = aggregator.apply(key, value, oldAgg);
+ // update the store with the new value
+ store.put(key, newAgg);
- windowStore.put(key, newAgg, windowStartMs);
-
- // send the new aggregate pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
- }
+ // send the old / new pair
+ if (sendOldValues)
+ context().forward(key, new Change<>(newAgg, oldAgg));
+ else
+ context().forward(key, new Change<>(newAgg, null));
}
}
@Override
- public KTableValueGetterSupplier<Windowed<K>, T> view() {
+ public KTableValueGetterSupplier<K, T> view() {
- return new KTableValueGetterSupplier<Windowed<K>, T>() {
+ return new KTableValueGetterSupplier<K, T>() {
- public KTableValueGetter<Windowed<K>, T> get() {
+ public KTableValueGetter<K, T> get() {
return new KStreamAggregateValueGetter();
}
};
}
- private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+ private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
- private WindowStore<K, T> windowStore;
+ private KeyValueStore<K, T> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
- windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}
- @SuppressWarnings("unchecked")
@Override
- public T get(Windowed<K> windowedKey) {
- K key = windowedKey.value();
- W window = (W) windowedKey.window();
-
- // this iterator should contain at most one element
- Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.hasNext() ? iter.next().value : null;
+ public T get(K key) {
+ return store.get(key);
}
-
}
}
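
To make the unwindowed semantics concrete: with an initializer returning 0L and an aggregator that adds one per record (i.e. a count), the processor above yields the following trace for two records of the same key (key and values illustrative):

    process("k1", "a")  ->  store.get("k1") == null, initialized to 0L, put("k1", 1L), forward ("k1", Change(1, null))
    process("k1", "b")  ->  store.get("k1") == 1L, put("k1", 2L), forward ("k1", Change(2, 1))   // old value forwarded only if sendOldValues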
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79a3115..9f384ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -385,6 +385,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String name = topology.newName(LEFTJOIN_NAME);
topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+ topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes);
}
@@ -397,15 +398,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<K> keyDeserializer,
Deserializer<V> aggValueDeserializer) {
- // TODO: this agg window operator is only used for casting K to Windowed<K> for
- // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
String reduceName = topology.newName(REDUCE_NAME);
- String selectName = topology.newName(SELECT_NAME);
- ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
- ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
+ KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);
- StateStoreSupplier aggregateStore = Stores.create(windows.name())
+ StateStoreSupplier reduceStore = Stores.create(windows.name())
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
.persistent()
@@ -413,12 +410,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
.build();
// aggregate the values with the aggregator and local store
- topology.addProcessor(selectName, aggWindowSupplier, this.name);
- topology.addProcessor(reduceName, aggregateSupplier, selectName);
- topology.addStateStore(aggregateStore, reduceName);
+ topology.addProcessor(reduceName, reduceSupplier, this.name);
+ topology.addStateStore(reduceStore, reduceName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+ }
+
+ @Override
+ public KTable<K, V> reduceByKey(Reducer<V> reducer,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer,
+ String name) {
+
+ String reduceName = topology.newName(REDUCE_NAME);
+
+ KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);
+
+ StateStoreSupplier reduceStore = Stores.create(name)
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(aggValueSerializer, aggValueDeserializer)
+ .persistent()
+ .build();
+
+ // aggregate the values with the aggregator and local store
+ topology.addProcessor(reduceName, reduceSupplier, this.name);
+ topology.addStateStore(reduceStore, reduceName);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes);
+ return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
}
@Override
@@ -430,13 +452,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer) {
- // TODO: this agg window operator is only used for casting K to Windowed<K> for
- // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
String aggregateName = topology.newName(AGGREGATE_NAME);
- String selectName = topology.newName(SELECT_NAME);
- ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
- ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator);
+ KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
StateStoreSupplier aggregateStore = Stores.create(windows.name())
.withKeys(keySerializer, keyDeserializer)
@@ -446,8 +464,34 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
.build();
// aggregate the values with the aggregator and local store
- topology.addProcessor(selectName, aggWindowSupplier, this.name);
- topology.addProcessor(aggregateName, aggregateSupplier, selectName);
+ topology.addProcessor(aggregateName, aggregateSupplier, this.name);
+ topology.addStateStore(aggregateStore, aggregateName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes);
+ }
+
+ @Override
+ public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
+ Serializer<K> keySerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name) {
+
+ String aggregateName = topology.newName(AGGREGATE_NAME);
+
+ KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);
+
+ StateStoreSupplier aggregateStore = Stores.create(name)
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(aggValueSerializer, aggValueDeserializer)
+ .persistent()
+ .build();
+
+ // aggregate the values with the aggregator and local store
+ topology.addProcessor(aggregateName, aggregateSupplier, this.name);
topology.addStateStore(aggregateStore, aggregateName);
// return the KTable representation with the intermediate topic as the sources
@@ -474,4 +518,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
}
+
+ @Override
+ public KTable<K, Long> countByKey(Serializer<K> keySerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name) {
+
+ return this.aggregateByKey(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate + 1L;
+ }
+ }, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer, name);
+ }
}
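
countByKey above is simply aggregateByKey with a zero initializer and a plus-one aggregator; the same pattern covers other unwindowed aggregates. A hypothetical running sum over Long values (not part of this commit; "amounts" and "Sums" are illustrative):

    KTable<String, Long> sums = amounts.aggregateByKey(
            new Initializer<Long>() {
                @Override
                public Long apply() {
                    return 0L;
                }
            },
            new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(String aggKey, Long value, Long aggregate) {
                    return aggregate + value;
                }
            },
            stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Sums");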
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index c484c7b..0ec0465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,37 +17,27 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
-import java.util.Iterator;
-import java.util.Map;
-
-public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> {
+public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
private final String storeName;
- private final Windows<W> windows;
private final Reducer<V> reducer;
private boolean sendOldValues = false;
- public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
- this.windows = windows;
+ public KStreamReduce(String storeName, Reducer<V> reducer) {
this.storeName = storeName;
this.reducer = reducer;
}
@Override
- public Processor<Windowed<K>, Change<V>> get() {
- return new KStreamAggregateProcessor();
+ public Processor<K, V> get() {
+ return new KStreamReduceProcessor();
}
@Override
@@ -55,113 +45,69 @@ public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSup
sendOldValues = true;
}
- private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+ private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
- private WindowStore<K, V> windowStore;
+ private KeyValueStore<K, V> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
- windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
}
@Override
- public void process(Windowed<K> windowedKey, Change<V> change) {
- // first get the matching windows
- long timestamp = windowedKey.window().start();
- K key = windowedKey.value();
- V value = change.newValue;
-
- Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
- long timeFrom = Long.MAX_VALUE;
- long timeTo = Long.MIN_VALUE;
-
- // use range query on window store for efficient reads
- for (long windowStartMs : matchedWindows.keySet()) {
- timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
- timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
- }
-
- WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
-
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue<Long, V> entry = iter.next();
- W window = matchedWindows.get(entry.key);
-
- if (window != null) {
-
- V oldAgg = entry.value;
- V newAgg = oldAgg;
-
- // try to add the new new value (there will never be old value)
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
-
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
-
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
-
- matchedWindows.remove(entry.key);
+ public void process(K key, V value) {
+ V oldAgg = store.get(key);
+ V newAgg = oldAgg;
+
+ // try to add the new value
+ if (value != null) {
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
}
}
- iter.close();
+ // update the store with the new value
+ store.put(key, newAgg);
- // create the new window for the rest of unmatched window that do not exist yet
- for (long windowStartMs : matchedWindows.keySet()) {
- windowStore.put(key, value, windowStartMs);
-
- // send the new aggregate pair (there will be no old value)
- context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
- }
+ // send the old / new pair
+ if (sendOldValues)
+ context().forward(key, new Change<>(newAgg, oldAgg));
+ else
+ context().forward(key, new Change<>(newAgg, null));
}
}
@Override
- public KTableValueGetterSupplier<Windowed<K>, V> view() {
+ public KTableValueGetterSupplier<K, V> view() {
- return new KTableValueGetterSupplier<Windowed<K>, V>() {
+ return new KTableValueGetterSupplier<K, V>() {
- public KTableValueGetter<Windowed<K>, V> get() {
- return new KStreamAggregateValueGetter();
+ public KTableValueGetter<K, V> get() {
+ return new KStreamReduceValueGetter();
}
};
}
- private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+ private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
- private WindowStore<K, V> windowStore;
+ private KeyValueStore<K, V> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
- windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
}
- @SuppressWarnings("unchecked")
@Override
- public V get(Windowed<K> windowedKey) {
- K key = windowedKey.value();
- W window = (W) windowedKey.window();
-
- // this iterator should only contain one element
- Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.next().value;
+ public V get(K key) {
+ return store.get(key);
}
-
}
}
+
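
KStreamReduce above is the unwindowed counterpart of the windowed reduce: each record folds into the per-key aggregate held in a KeyValueStore, and a Change pair is forwarded, carrying the old value only when a downstream view has requested it. A sketch of exercising it through reduceByKey, assuming a parameter order parallel to the aggregateByKey shown earlier; the stream variable and names are illustrative:

    // Running per-key sum over a KStream<String, Long> named "longs" (assumed).
    // Assumed parameter order, mirroring aggregateByKey:
    // (reducer, keySerializer, valueSerializer, keyDeserializer, valueDeserializer, name)
    KTable<String, Long> sums = longs.reduceByKey(
        new Reducer<Long>() {
            @Override
            public Long apply(Long value1, Long value2) {
                return value1 + value2;
            }
        },
        new StringSerializer(), new LongSerializer(),
        new StringDeserializer(), new LongDeserializer(),
        "Sums");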
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
new file mode 100644
index 0000000..76964f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+
+ private final String storeName;
+ private final Windows<W> windows;
+ private final Initializer<T> initializer;
+ private final Aggregator<K, V, T> aggregator;
+
+ private boolean sendOldValues = false;
+
+ public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.initializer = initializer;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamWindowAggregateProcessor();
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+
+ private WindowStore<K, T> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(K key, V value) {
+ // first get the matching windows
+ long timestamp = context().timestamp();
+ Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+ long timeFrom = Long.MAX_VALUE;
+ long timeTo = Long.MIN_VALUE;
+
+ // use a single range query on the window store for efficient reads
+ for (long windowStartMs : matchedWindows.keySet()) {
+ timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+ timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+ }
+
+ WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+ // for each matching window with an existing entry, update the aggregate for the key and forward the change downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, T> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
+
+ if (window != null) {
+
+ T oldAgg = entry.value;
+
+ if (oldAgg == null)
+ oldAgg = initializer.apply();
+
+ // try to add the new value (there will never be an old value)
+ T newAgg = aggregator.apply(key, value, oldAgg);
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
+
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+ matchedWindows.remove(entry.key);
+ }
+ }
+
+ iter.close();
+
+ // create entries for the remaining matched windows that do not yet exist in the store
+ for (long windowStartMs : matchedWindows.keySet()) {
+ T oldAgg = initializer.apply();
+ T newAgg = aggregator.apply(key, value, oldAgg);
+
+ windowStore.put(key, newAgg, windowStartMs);
+
+ // send the new aggregate pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+ }
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+ return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+ public KTableValueGetter<Windowed<K>, T> get() {
+ return new KStreamWindowAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+ private WindowStore<K, T> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T get(Windowed<K> windowedKey) {
+ K key = windowedKey.value();
+ W window = (W) windowedKey.window();
+
+ // this iterator should contain at most one element
+ Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+ return iter.hasNext() ? iter.next().value : null;
+ }
+
+ }
+}
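
A usage sketch of the windowed aggregateByKey overload visible in the KStreamImpl hunk earlier, which this processor backs. The HoppingWindows factory chain and the event stream are assumptions of the sketch:

    // Hypothetical: count events per key in 10-second windows advancing every 5 seconds,
    // over a KStream<String, String> named "events" (assumed).
    KTable<Windowed<String>, Long> windowedCounts = events.aggregateByKey(
        new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        },
        new Aggregator<String, String, Long>() {
            @Override
            public Long apply(String aggKey, String value, Long aggregate) {
                return aggregate + 1L;
            }
        },
        HoppingWindows.of("EventCounts").with(10000L).every(5000L),
        new StringSerializer(), new LongSerializer(),
        new StringDeserializer(), new LongDeserializer());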
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
new file mode 100644
index 0000000..d532e79
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
+
+ private final String storeName;
+ private final Windows<W> windows;
+ private final Reducer<V> reducer;
+
+ private boolean sendOldValues = false;
+
+ public KStreamWindowReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.reducer = reducer;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamWindowReduceProcessor();
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(K key, V value) {
+ // first get the matching windows
+ long timestamp = context().timestamp();
+
+ Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+ long timeFrom = Long.MAX_VALUE;
+ long timeTo = Long.MIN_VALUE;
+
+ // use a single range query on the window store for efficient reads
+ for (long windowStartMs : matchedWindows.keySet()) {
+ timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+ timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+ }
+
+ WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+ // for each matching window with an existing entry, update the aggregate for the key and forward the change downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, V> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
+
+ if (window != null) {
+
+ V oldAgg = entry.value;
+ V newAgg = oldAgg;
+
+ // try to add the new value (there will never be an old value)
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
+ }
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
+
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+ matchedWindows.remove(entry.key);
+ }
+ }
+
+ iter.close();
+
+ // create entries for the remaining matched windows that do not yet exist in the store
+ for (long windowStartMs : matchedWindows.keySet()) {
+ windowStore.put(key, value, windowStartMs);
+
+ // send the new aggregate pair (there will be no old value)
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+ }
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+ return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+ public KTableValueGetter<Windowed<K>, V> get() {
+ return new KStreamAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public V get(Windowed<K> windowedKey) {
+ K key = windowedKey.value();
+ W window = (W) windowedKey.window();
+
+ // this iterator should contain at most one element
+ Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+ return iter.hasNext() ? iter.next().value : null;
+ }
+
+ }
+}
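
The range-query optimization in process() above is worth tracing once: instead of one store fetch per matched window, the processor takes the minimum and maximum matched window start and issues a single fetch over that span. A self-contained sketch of the arithmetic, with window size, advance, and record timestamp chosen purely for illustration:

    import java.util.Map;
    import java.util.TreeMap;

    public class WindowRangeSketch {
        public static void main(String[] args) {
            long size = 10000L, advance = 5000L, timestamp = 12000L;

            // Rough equivalent of windows.windowsFor(timestamp) for hopping windows:
            // every start whose window [start, start + size) contains the timestamp.
            Map<Long, Long> matched = new TreeMap<>();
            for (long start = (timestamp / advance) * advance; start > timestamp - size; start -= advance) {
                if (start >= 0)
                    matched.put(start, start + size);
            }

            long timeFrom = Long.MAX_VALUE;
            long timeTo = Long.MIN_VALUE;
            for (long start : matched.keySet()) {
                timeFrom = Math.min(timeFrom, start);
                timeTo = Math.max(timeTo, start);
            }

            // Prints "5000 10000": one fetch(key, 5000, 10000) covers both candidate windows.
            System.out.println(timeFrom + " " + timeTo);
        }
    }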
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fa4cd93..b82582b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -76,7 +76,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
- public final ProcessorSupplier<K, ?> processorSupplier;
+ public final ProcessorSupplier<?, ?> processorSupplier;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
@@ -87,14 +87,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public KTableImpl(KStreamBuilder topology,
String name,
- ProcessorSupplier<K, ?> processorSupplier,
+ ProcessorSupplier<?, ?> processorSupplier,
Set<String> sourceNodes) {
this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
}
public KTableImpl(KStreamBuilder topology,
String name,
- ProcessorSupplier<K, ?> processorSupplier,
+ ProcessorSupplier<?, ?> processorSupplier,
Set<String> sourceNodes,
Serializer<K> keySerializer,
Serializer<V> valSerializer,
@@ -389,6 +389,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
return new KTableSourceValueGetterSupplier<>(source.topic);
+ } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+ return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
}
@@ -401,6 +403,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
materialize(source);
source.enableSendingOldValues();
+ } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+ ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
} else {
((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
}
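
The two new instanceof branches dispatch to KStreamAggProcessorSupplier, which is added elsewhere in this commit and not shown in this hunk. Inferred from its usages above, the interface plausibly reads as follows; a sketch, not the verbatim file:

    // K:  key type consumed by the processor
    // RK: key type of the resulting KTable (Windowed<K> for the windowed variants)
    // V:  value type consumed by the processor
    // T:  aggregate value type exposed through the view
    public interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {

        KTableValueGetterSupplier<RK, T> view();

        void enableSendingOldValues();
    }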
http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index d647b72..df03280 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -24,5 +24,4 @@ public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, C
KTableValueGetterSupplier<K, T> view();
void enableSendingOldValues();
-
}