You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/23 00:25:44 UTC
[2/2] kafka git commit: KAFKA-3066: Demo Examples for Kafka Streams
KAFKA-3066: Demo Examples for Kafka Streams
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #797 from guozhangwang/K3066
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c197113a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c197113a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c197113a
Branch: refs/heads/trunk
Commit: c197113a9c04e2f6c2d1a72161c0d40d5804490e
Parents: a19729f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 15:25:24 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Jan 22 15:25:24 2016 -0800
----------------------------------------------------------------------
build.gradle | 25 ++++
settings.gradle | 2 +-
.../examples/pageview/JsonPOJODeserializer.java | 66 ++++++++++
.../examples/pageview/JsonPOJOSerializer.java | 60 +++++++++
.../examples/pageview/PageViewTypedJob.java | 127 +++++++++++++++++++
.../examples/pageview/PageViewUnTypedJob.java | 107 ++++++++++++++++
.../kafka/streams/examples/pipe/PipeJob.java | 50 ++++++++
.../examples/wordcount/WordCountJob.java | 103 +++++++++++++++
.../wordcount/WordCountProcessorJob.java | 121 ++++++++++++++++++
.../org/apache/kafka/streams/StreamsConfig.java | 6 +-
.../kafka/streams/examples/KStreamJob.java | 84 ------------
.../kafka/streams/examples/ProcessorJob.java | 115 -----------------
.../examples/WallclockTimestampExtractor.java | 28 ----
.../org/apache/kafka/streams/kstream/Count.java | 6 +-
.../streams/kstream/KeyValueToDoubleMapper.java | 23 ----
.../streams/kstream/KeyValueToIntMapper.java | 23 ----
.../streams/kstream/KeyValueToLongMapper.java | 23 ----
.../kafka/streams/kstream/TumblingWindows.java | 8 +-
.../kafka/streams/kstream/UnlimitedWindows.java | 10 +-
.../streams/kstream/internals/KStreamImpl.java | 8 +-
.../internals/WindowedStreamPartitioner.java | 52 ++++++++
.../internals/WindowedStreamsPartitioner.java | 52 --------
.../streams/processor/StreamPartitioner.java | 59 +++++++++
.../streams/processor/StreamsPartitioner.java | 59 ---------
.../streams/processor/TopologyBuilder.java | 26 ++--
.../processor/internals/AbstractTask.java | 3 +-
.../processor/internals/RecordCollector.java | 4 +-
.../streams/processor/internals/SinkNode.java | 6 +-
.../processor/internals/StreamThread.java | 20 ++-
.../internals/WallclockTimestampExtractor.java | 28 ++++
.../apache/kafka/streams/StreamsConfigTest.java | 2 +-
.../WindowedStreamPartitionerTest.java | 84 ++++++++++++
.../WindowedStreamsPartitionerTest.java | 84 ------------
.../internals/ProcessorTopologyTest.java | 6 +-
.../processor/internals/StandbyTaskTest.java | 6 +-
.../processor/internals/StreamThreadTest.java | 32 +++--
.../streams/state/KeyValueStoreTestDriver.java | 4 +-
.../apache/kafka/test/KStreamTestDriver.java | 4 +-
38 files changed, 972 insertions(+), 554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 150cac7..0b1dcc4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -514,6 +514,7 @@ project(':streams') {
dependencies {
compile project(':clients')
+ compile project(':connect:json') // this dependency should be removed after we unify data API
compile libs.slf4jlog4j
compile libs.rocksDBJni
compile libs.zkclient // this dependency should be removed after KIP-4
@@ -542,6 +543,30 @@ project(':streams') {
}
}
+// Build configuration for the ':streams:examples' sub-project; its archives are named "kafka-streams-examples".
+project(':streams:examples') {
+ archivesBaseName = "kafka-streams-examples"
+
+ dependencies {
+ compile project(':streams')
+ compile project(':connect:json') // this dependency should be removed after we unify data API
+ }
+
+ // Javadoc generation is turned off for this sub-project.
+ javadoc {
+ enabled = false
+ }
+
+ // Copies the files of the runtime configuration, except those matching 'kafka-streams*',
+ // into "dependant-libs-<scala version>" under the build directory.
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.runtime) {
+ exclude('kafka-streams*')
+ }
+ into "$buildDir/dependant-libs-${versions.scala}"
+ }
+
+ // Building the jar first runs copyDependantLibs.
+ jar {
+ dependsOn 'copyDependantLibs'
+ }
+}
+
project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 097c43b..d430c2f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
new file mode 100644
index 0000000..583ec2d
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * JSON deserializer that uses Jackson's {@link ObjectMapper} to bind message bytes to an instance of a POJO class.
+ * The target class is taken from the "JsonPOJOClass" entry of the config map passed to
+ * {@link #configure(Map, boolean)}.
+ *
+ * @param <T> type of the POJO produced by this deserializer
+ */
+public class JsonPOJODeserializer<T> implements Deserializer<T> {
+    // final, like the mapper in JsonPOJOSerializer: it is never reassigned
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private Class<T> tClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJODeserializer() {
+    }
+
+    /**
+     * Remembers the POJO class found under the "JsonPOJOClass" key of {@code props}.
+     *
+     * @param props config map; the value under "JsonPOJOClass" is cast to {@code Class<T>}
+     * @param isKey not used by this implementation
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+        tClass = (Class<T>) props.get("JsonPOJOClass");
+    }
+
+    /**
+     * Parses {@code bytes} as JSON into an instance of the configured POJO class.
+     *
+     * @param topic not used by this implementation
+     * @param bytes serialized JSON, may be {@code null}
+     * @return the parsed object, or {@code null} when {@code bytes} is {@code null}
+     * @throws SerializationException if no target class was configured or Jackson fails to read the bytes
+     */
+    @Override
+    public T deserialize(String topic, byte[] bytes) {
+        if (bytes == null)
+            return null;
+
+        // fail with an explicit message instead of an opaque Jackson error when configure() supplied no class
+        if (tClass == null)
+            throw new SerializationException("No target class configured under \"JsonPOJOClass\"");
+
+        try {
+            return objectMapper.readValue(bytes, tClass);
+        } catch (Exception e) {
+            throw new SerializationException("Error deserializing JSON message", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
new file mode 100644
index 0000000..bb60327
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+/**
+ * Serializer that writes a POJO as JSON bytes through Jackson's ObjectMapper.
+ *
+ * @param <T> type of the POJO accepted by this serializer
+ */
+public class JsonPOJOSerializer<T> implements Serializer<T> {
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private Class<T> pojoClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJOSerializer() {
+    }
+
+    /**
+     * Stores the value found under the "JsonPOJOClass" config key.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void configure(Map<String, ?> props, boolean isKey) {
+        final Object configured = props.get("JsonPOJOClass");
+        pojoClass = (Class<T>) configured;
+    }
+
+    /**
+     * Returns the JSON encoding of {@code data}, or {@code null} when {@code data} is {@code null}.
+     * Any failure raised while writing is rethrown as a SerializationException.
+     */
+    @Override
+    public byte[] serialize(String topic, T data) {
+        if (data != null) {
+            try {
+                return mapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
new file mode 100644
index 0000000..c064848
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demo job that counts page views per region using typed POJO values.
+ * <p>
+ * Page views read from "streams-pageview-input" are re-keyed by user and left-joined with the user profiles
+ * read from "streams-userprofile-input". The joined records are re-keyed by region and counted with
+ * {@link Count} over the "GeoPageViewsWindow" hopping windows; the windowed counts are written to
+ * "streams-pageviewstats-output".
+ */
+public class PageViewTypedJob {
+
+    // POJO classes
+    public static class PageView {
+        public String user;
+        public String page;
+    }
+
+    public static class UserProfile {
+        public String user;
+        public String region;
+    }
+
+    public static class PageViewByRegion {
+        public String user;
+        public String page;
+        public String region;
+    }
+
+    public static class WindowedPageViewByRegion {
+        public long windowStart;
+        public String region;
+    }
+
+    public static class RegionCount {
+        public long count;
+        public String region;
+    }
+
+    /**
+     * Builds the topology described on the class and starts it.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        KStream<String, PageView> views = builder.stream("streams-pageview-input");
+
+        // the incoming key is ignored; records are re-keyed by the user found in the value
+        KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));
+
+        KTable<String, UserProfile> users = builder.table("streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
+                .leftJoin(users, (view, profile) -> {
+                    PageViewByRegion viewByRegion = new PageViewByRegion();
+                    viewByRegion.user = view.user;
+                    viewByRegion.page = view.page;
+
+                    // a left join hands the joiner a null profile when the user has no profile record;
+                    // fall back to a placeholder region instead of failing with a NullPointerException
+                    if (profile != null) {
+                        viewByRegion.region = profile.region;
+                    } else {
+                        viewByRegion.region = "UNKNOWN";
+                    }
+
+                    return viewByRegion;
+                })
+                .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
+                // 7 * 24 * 60 * 60 * 1000 is seven days in milliseconds
+                .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                    @Override
+                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                        wViewByRegion.windowStart = key.window().start();
+                        wViewByRegion.region = key.value();
+
+                        RegionCount rCount = new RegionCount();
+                        rCount.region = key.value();
+                        rCount.count = value;
+
+                        return new KeyValue<>(wViewByRegion, rCount);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
+
+        KafkaStreams kstream = new KafkaStreams(builder, props);
+        kstream.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
new file mode 100644
index 0000000..1ae02c9
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demo job that counts page views per region using untyped Jackson {@link JsonNode} values.
+ * <p>
+ * Page views read from "streams-pageview-input" are re-keyed by their "user" field and left-joined with the
+ * "region" field of the user profiles read from "streams-userprofile-input". The joined records are re-keyed
+ * by region and counted with {@link Count} over the "GeoPageViewsWindow" hopping windows; the windowed counts
+ * are written to "streams-pageviewstats-output".
+ */
+public class PageViewUnTypedJob {
+
+    /**
+     * Builds the topology described on the class and starts it.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+
+        StreamsConfig config = new StreamsConfig(props);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        KStream<String, JsonNode> views = builder.stream("streams-pageview-input");
+
+        // the incoming key is ignored; records are re-keyed by the "user" field of the value
+        KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record));
+
+        KTable<String, JsonNode> users = builder.table("streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
+
+        KStream<JsonNode, JsonNode> regionCount = viewsByUser
+                .leftJoin(userRegions, (view, region) -> {
+                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                    // a left join hands the joiner a null region when the user has no profile record;
+                    // use a placeholder so the record can still be keyed by region below
+                    return (JsonNode) jNode.put("user", view.get("user").textValue())
+                            .put("page", view.get("page").textValue())
+                            .put("region", region == null ? "UNKNOWN" : region);
+                })
+                .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
+                // 7 * 24 * 60 * 60 * 1000 is seven days in milliseconds
+                .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                    @Override
+                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                        // output key: window start plus the region the count belongs to
+                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("window-start", key.window().start())
+                                .put("region", key.value());
+
+                        // output value: the count itself
+                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                        valueNode.put("count", value);
+
+                        return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-output");
+
+        KafkaStreams kstream = new KafkaStreams(builder, config);
+        kstream.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
new file mode 100644
index 0000000..4a4f97f
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+/**
+ * Demo job that sends every record read from "streams-file-input" straight to "streams-pipe-output".
+ */
+public class PipeJob {
+
+    public static void main(String[] args) throws Exception {
+        final KStreamBuilder topology = new KStreamBuilder();
+        topology.stream("streams-file-input").to("streams-pipe-output");
+
+        new KafkaStreams(topology, streamsSettings()).start();
+    }
+
+    /** Assembles the job id, broker address and String (de)serializer settings used by this job. */
+    private static Properties streamsSettings() {
+        final Properties settings = new Properties();
+        settings.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
+        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+        settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return settings;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
new file mode 100644
index 0000000..8aa15a4
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Demo job that counts words with the KStream DSL.
+ * <p>
+ * Each line read from "streams-file-input" is lower-cased and split on single spaces; the resulting words are
+ * counted with {@link Count} over the "Counts" unlimited window, and each updated count is written to
+ * "streams-wordcount-output" as a JSON object with the fields "word" and "count" and a null key.
+ */
+public class WordCountJob {
+
+    /**
+     * Builds the topology described on the class and starts it.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+        // lower-camel-case local name so it no longer shadows the JsonSerializer class
+        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+
+        KStream<String, String> source = builder.stream("streams-file-input");
+
+        KStream<String, JsonNode> counts = source
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale
+                        return Arrays.asList(value.toLowerCase(java.util.Locale.ROOT).split(" "));
+                    }
+                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        // key each record by the word itself so the aggregation groups by word
+                        return new KeyValue<String, String>(value, value);
+                    }
+                })
+                .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                        jNode.put("word", key.value())
+                                .put("count", value);
+
+                        return new KeyValue<String, JsonNode>(null, jNode);
+                    }
+                });
+
+        counts.to("streams-wordcount-output", stringSerializer, jsonSerializer);
+
+        KafkaStreams kstream = new KafkaStreams(builder, props);
+        kstream.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
new file mode 100644
index 0000000..63692bd
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+/**
+ * Demo job that counts words with the low-level processor API.
+ * <p>
+ * The topology reads "streams-file-input", runs each line through a processor that keeps per-word counts in
+ * the "Counts" state store, and sends whatever the processor forwards to "streams-wordcount-output".
+ */
+public class WordCountProcessorJob {
+
+    /** Supplies processors that count words into the "Counts" store and forward all counts on punctuation. */
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+        @Override
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                /**
+                 * Keeps the context, requests scheduling with the value 1000 and looks up the "Counts" store.
+                 */
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    // NOTE(review): 1000 is presumably the punctuation interval in milliseconds -- confirm
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                /**
+                 * Lower-cases the line, splits it on single spaces and increments the stored count of each
+                 * word, then asks the context to commit.
+                 */
+                @Override
+                public void process(String dummy, String line) {
+                    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale
+                    String[] words = line.toLowerCase(java.util.Locale.ROOT).split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();
+                }
+
+                /**
+                 * Prints every entry of the store to standard output and forwards it downstream as
+                 * (word, count rendered as a string).
+                 */
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    // close the iterator even if printing or forwarding throws
+                    try {
+                        System.out.println("----------- " + timestamp + "----------- ");
+
+                        while (iter.hasNext()) {
+                            KeyValue<String, Integer> entry = iter.next();
+
+                            System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                            context.forward(entry.key, entry.value.toString());
+                        }
+                    } finally {
+                        iter.close();
+                    }
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    /**
+     * Builds the source -> processor (with "Counts" store) -> sink topology and starts it.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("Source", "streams-file-input");
+
+        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+        builder.addSink("Sink", "streams-wordcount-output", "Process");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 3843b1d..16bb06a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -113,7 +113,7 @@ public class StreamsConfig extends AbstractConfig {
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
- private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+ private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
static {
CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value
@@ -136,8 +136,8 @@ public class StreamsConfig extends AbstractConfig {
StreamsConfig.ZOOKEEPER_CONNECT_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
- SYSTEM_TEMP_DIRECTORY,
- Importance.HIGH,
+ "/tmp/kafka-streams",
+ Importance.MEDIUM,
STATE_DIR_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value
Type.CLASS,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
deleted file mode 100644
index a234395..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-
-import java.util.Properties;
-
-public class KStreamJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamsConfig config = new StreamsConfig(props);
-
- KStreamBuilder builder = new KStreamBuilder();
-
- KStream<String, String> stream1 = builder.stream("topic1");
-
- KStream<String, Integer> stream2 =
- stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(String key, String value) {
- return new KeyValue<>(key, new Integer(value));
- }
- }).filter(new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
- }
- });
-
- KStream<String, Integer>[] streams = stream2.branch(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- },
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
- }
- }
- );
-
- streams[0].to("topic2");
- streams[1].to("topic3");
-
- KafkaStreams kstream = new KafkaStreams(builder, config);
- kstream.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
deleted file mode 100644
index e17c16b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-public class ProcessorJob {
-
- private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
- @Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
- private ProcessorContext context;
- private KeyValueStore<String, Integer> kvStore;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- this.context = context;
- this.context.schedule(1000);
- this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
- }
-
- @Override
- public void process(String key, String value) {
- Integer oldValue = this.kvStore.get(key);
- Integer newValue = Integer.parseInt(value);
- if (oldValue == null) {
- this.kvStore.put(key, newValue);
- } else {
- this.kvStore.put(key, oldValue + newValue);
- }
-
- context.commit();
- }
-
- @Override
- public void punctuate(long timestamp) {
- KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
- while (iter.hasNext()) {
- KeyValue<String, Integer> entry = iter.next();
-
- System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(entry.key, entry.value);
- }
-
- iter.close();
- }
-
- @Override
- public void close() {
- this.kvStore.close();
- }
- };
- }
- }
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamsConfig config = new StreamsConfig(props);
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
-
- builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
- builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
-
- builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
-
- KafkaStreams streams = new KafkaStreams(builder, config);
- streams.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
deleted file mode 100644
index 26281d6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class WallclockTimestampExtractor implements TimestampExtractor {
- @Override
- public long extract(ConsumerRecord<Object, Object> record) {
- return System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
index 3c1ed46..8780cc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream;
-public class Count<K> implements Aggregator<K, Long, Long> {
+public class Count<K, V> implements Aggregator<K, V, Long> {
@Override
public Long initialValue(K aggKey) {
@@ -25,12 +25,12 @@ public class Count<K> implements Aggregator<K, Long, Long> {
}
@Override
- public Long add(K aggKey, Long value, Long aggregate) {
+ public Long add(K aggKey, V value, Long aggregate) {
return aggregate + 1L;
}
@Override
- public Long remove(K aggKey, Long value, Long aggregate) {
+ public Long remove(K aggKey, V value, Long aggregate) {
return aggregate - 1L;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
deleted file mode 100644
index ae3b858..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToDoubleMapper<K, V> {
-
- double apply(K key, V value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
deleted file mode 100644
index 72e5ee9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToIntMapper<K, V> {
-
- int apply(K key, V value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
deleted file mode 100644
index 3a8d8a8..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToLongMapper<K, V> {
-
- long apply(K key, V value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
index 02ece3a..188fe66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TumblingWindow;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
public class TumblingWindows extends Windows<TumblingWindow> {
@@ -53,7 +53,11 @@ public class TumblingWindows extends Windows<TumblingWindow> {
public Map<Long, TumblingWindow> windowsFor(long timestamp) {
long windowStart = timestamp - timestamp % size;
- return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
+ // we cannot use Collections.singletonMap since it does not support remove() call
+ Map<Long, TumblingWindow> windows = new HashMap<>();
+ windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size));
+
+ return windows;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 6f47253..06882b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@@ -48,7 +48,13 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@Override
public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
// always return the single unlimited window
- return Collections.singletonMap(start, new UnlimitedWindow(start));
+
+ // we cannot use Collections.singletonMap since it does not support remove() call
+ Map<Long, UnlimitedWindow> windows = new HashMap<>();
+ windows.put(start, new UnlimitedWindow(start));
+
+
+ return windows;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 98e50c3..7ebc28c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = topology.newName(SINK_NAME);
- StreamsPartitioner<K, V> streamsPartitioner = null;
+ StreamPartitioner<K, V> streamPartitioner = null;
if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
- streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
+ streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
}
- topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
+ topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
new file mode 100644
index 0000000..10e69cc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
+
+ private final WindowedSerializer<K> serializer;
+
+ public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
+ * and the current number of partitions. The partition number is determined by the original key of the windowed key
+ * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
+ *
+ * @param windowedKey the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+ byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+ // hash the keyBytes to choose a partition
+ return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ }
+
+ private static int toPositive(int number) {
+ return number & 0x7fffffff;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
deleted file mode 100644
index ff1fa2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
-
-public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
-
- private final WindowedSerializer<K> serializer;
-
- public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
- this.serializer = serializer;
- }
-
- /**
- * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
- * and the current number of partitions. The partition number id determined by the original key of the windowed key
- * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
- *
- * @param windowedKey the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
- byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
- // hash the keyBytes to choose a partition
- return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
-
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
new file mode 100644
index 0000000..f14d9d9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ * org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
+ */
+public interface StreamPartitioner<K, V> {
+
+ /**
+ * Determine the partition number for a message with the given key and value and the current number of partitions.
+ *
+ * @param key the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ Integer partition(K key, V value, int numPartitions);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
deleted file mode 100644
index f8d199d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
- * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
- * <p>
- * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- * org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
- */
-public interface StreamsPartitioner<K, V> {
-
- /**
- * Determine the partition number for a message with the given key and value and the current number of partitions.
- *
- * @param key the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- Integer partition(K key, V value, int numPartitions);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f4e6821..a6b54b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -135,9 +135,9 @@ public class TopologyBuilder {
public final String topic;
private Serializer keySerializer;
private Serializer valSerializer;
- private final StreamsPartitioner partitioner;
+ private final StreamPartitioner partitioner;
- private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
+ private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
super(name);
this.parents = parents.clone();
this.topic = topic;
@@ -245,9 +245,9 @@ public class TopologyBuilder {
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, StreamsPartitioner, String...)
+ * @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -260,7 +260,7 @@ public class TopologyBuilder {
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
- * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
+ public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
}
@@ -284,7 +284,7 @@ public class TopologyBuilder {
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
* <p>
- * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -302,11 +302,11 @@ public class TopologyBuilder {
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamsPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+ * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
+ return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
}
/**
@@ -326,10 +326,10 @@ public class TopologyBuilder {
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamsPartitioner, String...)
+ * @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
*/
- public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
+ public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index ef4c3c7..68680ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -57,7 +57,8 @@ public abstract class AbstractTask {
// create the processor state manager
try {
- File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
+ File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+ File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 25c663d..fe0472e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class RecordCollector {
}
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamsPartitioner<K, V> partitioner) {
+ StreamPartitioner<K, V> partitioner) {
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 88b3f56..7ab59ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private final String topic;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
- private final StreamsPartitioner<K, V> partitioner;
+ private final StreamPartitioner<K, V> partitioner;
private ProcessorContext context;
- public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
+ public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
super(name);
this.topic = topic;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e5d0922..f118f60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,6 +104,18 @@ public class StreamThread extends Thread {
private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
private boolean processStandbyRecords = false;
+ /** Returns the per-job state directory {@code baseDirName/jobId}, creating it and any missing parent directories. */
+ static File makeStateDir(String jobId, String baseDirName) {
+ File stateDir = new File(new File(baseDirName), jobId);
+
+ // mkdirs() instead of two mkdir() calls: mkdir() returns false without creating anything when the
+ // parent is missing (e.g. a nested base path). Still best-effort: the result is unchecked, as before.
+ if (!stateDir.exists())
+ stateDir.mkdirs();
+
+ return stateDir;
+ }
+
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -167,8 +179,7 @@ public class StreamThread extends Thread {
this.standbyRecords = new HashMap<>();
// read in task specific config values
- this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
- this.stateDir.mkdir();
+ this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -452,14 +463,15 @@ public class StreamThread extends Thread {
if (stateDirs != null) {
for (File dir : stateDirs) {
try {
- TaskId id = TaskId.parse(dir.getName());
+ String dirName = dir.getName();
+ TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
// try to acquire the exclusive lock on the state directory
FileLock directoryLock = null;
try {
directoryLock = ProcessorStateManager.lockStateDirectory(dir);
if (directoryLock != null) {
- log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
+ log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
Utils.delete(dir);
}
} catch (IOException e) {