You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/01 23:04:20 UTC
kafka-site git commit: MINOR: update streams hello world
Repository: kafka-site
Updated Branches:
refs/heads/asf-site 2e200cfce -> 6f8013869
MINOR: update streams hello world
Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/6f801386
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/6f801386
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/6f801386
Branch: refs/heads/asf-site
Commit: 6f80138695c78355195b683cae06f8a2f116a307
Parents: 2e200cf
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 1 16:03:52 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 1 16:03:52 2017 -0700
----------------------------------------------------------------------
10/streams/index.html | 85 +++++++++++++++++++++++++---------------------
1 file changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/6f801386/10/streams/index.html
----------------------------------------------------------------------
diff --git a/10/streams/index.html b/10/streams/index.html
index ece9fa7..bf86adc 100644
--- a/10/streams/index.html
+++ b/10/streams/index.html
@@ -152,35 +152,37 @@
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
- import org.apache.kafka.streams.kstream.KTable;
-
+ import org.apache.kafka.streams.Topology;
+ import org.apache.kafka.streams.kstream.Materialized;
+ import org.apache.kafka.streams.kstream.Produced;
+ import org.apache.kafka.streams.state.KeyValueStore;
+
import java.util.Arrays;
import java.util.Properties;
-
+
public class WordCountApplication {
-
+
public static void main(final String[] args) throws Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-
- KStreamBuilder builder = new KStreamBuilder();
+
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
- .count("Counts");
- wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-
- KafkaStreams streams = new KafkaStreams(builder, config);
+ .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
+ wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
+
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
-
+
}
</pre>
</div>
@@ -189,26 +191,28 @@
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
- import org.apache.kafka.streams.kstream.KTable;
+ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+ import org.apache.kafka.streams.kstream.Materialized;
+ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
-
+ import org.apache.kafka.streams.state.KeyValueStore;
+
import java.util.Arrays;
import java.util.Properties;
-
+
public class WordCountApplication {
-
+
public static void main(final String[] args) throws Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-
- KStreamBuilder builder = new KStreamBuilder();
+
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -223,13 +227,15 @@
return word;
}
})
- .count("Counts");
- wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-
- KafkaStreams streams = new KafkaStreams(builder, config);
+ .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
+
+
+ wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
+
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
-
+
}
</pre>
</div>
@@ -239,15 +245,16 @@
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit
-
+
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
- import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-
+ import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper}
+ import org.apache.kafka.streams.state.KeyValueStore;
+
import scala.collection.JavaConverters.asJavaIterableConverter
-
+
object WordCountApplication {
-
+
def main(args: Array[String]) {
val config: Properties = {
val p = new Properties()
@@ -257,23 +264,23 @@
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p
}
-
- val builder: KStreamBuilder = new KStreamBuilder()
+
+ val builder: StreamsBuilder = new StreamsBuilder()
val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
- .count("Counts")
- wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
-
- val streams: KafkaStreams = new KafkaStreams(builder, config)
+ .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
+ wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()))
+
+ val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
-
+
Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams.close(10, TimeUnit.SECONDS)
}))
}
-
+
}
</pre>
</div>