Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/25 18:34:19 UTC
kafka git commit: MINOR: Make streams quick start more interactive
Repository: kafka
Updated Branches:
refs/heads/trunk 5d798511b -> 91c207c2c
MINOR: Make streams quick start more interactive
1. Make the WordCountDemo application run until it is stopped via Ctrl-C, instead of stopping automatically.
2. Update the quickstart HTML file to let users type input messages one by one and observe the added output interactively.
3. Make some minor fixes to the parent documentation page so that it points to the streams sub-pages, and add a new recommended Scala version number.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Michael G. Noll <mi...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #3515 from guozhangwang/KMinor-interactive-quickstart
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91c207c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91c207c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91c207c2
Branch: refs/heads/trunk
Commit: 91c207c2c6b09d88cc3366d69a31d0bf0ab0bffb
Parents: 5d79851
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Jul 25 11:34:16 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 25 11:34:16 2017 -0700
----------------------------------------------------------------------
docs/js/templateData.js | 1 +
docs/streams/quickstart.html | 192 ++++++++++++++-----
docs/toc.html | 9 +
.../kafka/streams/examples/pipe/PipeDemo.java | 25 ++-
.../examples/wordcount/WordCountDemo.java | 27 ++-
.../wordcount/WordCountProcessorDemo.java | 31 ++-
6 files changed, 214 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 3eca71e..50997bd 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -20,4 +20,5 @@ var context={
"version": "0110",
"dotVersion": "0.11.0",
-"fullDotVersion": "0.11.0.0"
+"fullDotVersion": "0.11.0.0",
+"scalaVersion": "2.11"
};
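The new `scalaVersion` entry in `context` feeds the `{{scalaVersion}}` placeholders that the quickstart diff below introduces in the download link and in the `tar`/`cd` commands. As a rough illustration only, the Java sketch below mimics plain `{{key}}` substitution against the same context values. The class `TemplateSketch` and its `render` method are hypothetical; this is not the JavaScript templating the docs actually use. Because placeholders resolve by key name, the key must be spelled exactly `scalaVersion`, and this sketch deliberately throws on an unresolved key so that such a typo surfaces immediately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TemplateSketch {

    // Matches {{key}} placeholders such as {{scalaVersion}}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    // Hypothetical helper: replaces each {{key}} with context.get(key),
    // failing loudly if a key is missing instead of silently leaving a gap.
    static String render(String template, Map<String, String> context) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = context.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("unresolved placeholder: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Values mirroring docs/js/templateData.js, with the new key spelled scalaVersion.
    static Map<String, String> context() {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("version", "0110");
        ctx.put("dotVersion", "0.11.0");
        ctx.put("fullDotVersion", "0.11.0.0");
        ctx.put("scalaVersion", "2.11");
        return ctx;
    }

    public static void main(String[] args) {
        // Renders the archive name that Step 1 of the quickstart downloads.
        System.out.println(render("kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz", context()));
    }
}
```

Running `main` should print `kafka_2.11-0.11.0.0.tgz`, the archive that Step 1 of the quickstart un-tars.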
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index 1c45e16..031a375 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -40,10 +40,10 @@ of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
-// Construct a `KStream` from the input topic ""streams-file-input", where message values
+// Construct a `KStream` from the input topic "streams-wordcount-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
-KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
+KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-wordcount-input");
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
@@ -71,16 +71,18 @@ because it cannot know when it has processed "all" the input data.
<p>
As the first step, we will start Kafka (unless you already have it started) and then we will
prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
+</p>
- <h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4>
+<h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4>
-<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_2.11-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it.
+Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here:
<pre class="brush: bash;">
-> tar -xzf kafka_2.11-{{fullDotVersion}}.tgz
-> cd kafka_2.11-{{fullDotVersion}}
+> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+> cd kafka_{{scalaVersion}}-{{fullDotVersion}}
</pre>
-</p>
+
<h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
<p>
@@ -102,19 +104,9 @@ Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to
</pre>
-<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare data</a></h4>
+<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4>
<!--
-<pre>
-> <b>./bin/kafka-topics --create \</b>
- <b>--zookeeper localhost:2181 \</b>
- <b>--replication-factor 1 \</b>
- <b>--partitions 1 \</b>
- <b>--topic streams-file-input</b>
-
-</pre>
-
--->
<pre class="brush: bash;">
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
@@ -126,41 +118,59 @@ Or on Windows:
> echo|set /p=join kafka summit>> file-input.txt
</pre>
-<p>
-Next, we send this input data to the input topic named <b>streams-file-input</b> using the console producer,
-which reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message with null key and value encoded a string to the topic (in practice,
-stream data will likely be flowing continuously into Kafka where the application will be up and running):
-</p>
+-->
+
+Next, we create the input topic named <b>streams-wordcount-input</b> and the output topic named <b>streams-wordcount-output</b>:
<pre class="brush: bash;">
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
- --topic streams-file-input
+ --topic streams-wordcount-input
+Created topic "streams-wordcount-input".
+
+> bin/kafka-topics.sh --create \
+ --zookeeper localhost:2181 \
+ --replication-factor 1 \
+ --partitions 1 \
+ --topic streams-wordcount-output
+Created topic "streams-wordcount-output".
</pre>
+The created topics can be described with the same <b>kafka-topics</b> tool:
<pre class="brush: bash;">
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
+> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-wordcount-input PartitionCount:1 ReplicationFactor:1 Configs:
+ Topic: streams-wordcount-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
+Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:
+ Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
</pre>
-<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 4: Process data</a></h4>
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: Start the WordCount Application</a></h4>
+
+The following command starts the WordCount demo application:
<pre class="brush: bash;">
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
</pre>
<p>
-The demo application will read from the input topic <b>streams-file-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
+The demo application will read from the input topic <b>streams-wordcount-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
Hence there won't be any STDOUT output except log entries as the results are written back into Kafka.
-The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
-</p>
-<p>
-We can now inspect the output of the WordCount demo application by reading from its output topic:
</p>
+Now we can start the console producer in a separate terminal to write some input data to this topic:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
+
<pre class="brush: bash;">
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
@@ -172,27 +182,115 @@ We can now inspect the output of the WordCount demo application by reading from
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
</pre>
+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
+
+Now let's write a message with the console producer into the input topic <b>streams-wordcount-input</b> by entering a single line of text and then hitting &lt;RETURN&gt;.
+This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
+(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+</pre>
+
<p>
-with the following output data being printed to the console:
+This message will be processed by the WordCount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
</p>
<pre class="brush: bash;">
-all 1
-lead 1
-to 1
-hello 1
-streams 2
-join 1
-kafka 3
-summit 1
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+ --topic streams-wordcount-output \
+ --from-beginning \
+ --formatter kafka.tools.DefaultMessageFormatter \
+ --property print.key=true \
+ --property print.value=true \
+ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
</pre>
<p>
-Here, the first column is the Kafka message key in <code>java.lang.String</code> format, and the second column is the message value in <code>java.lang.Long</code> format.
-Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is
-an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
+Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
</p>
+Now let's write one more message with the console producer into the input topic <b>streams-wordcount-input</b>.
+Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+ --topic streams-wordcount-output \
+ --from-beginning \
+ --formatter kafka.tools.DefaultMessageFormatter \
+ --property print.key=true \
+ --property print.value=true \
+ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
+hello 1
+kafka 2
+streams 2
+</pre>
+
+Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount application.
+Let's enter one final input text line "join kafka summit" and hit &lt;RETURN&gt; in the console producer for the input topic <b>streams-wordcount-input</b> before we wrap up this quickstart:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+ --topic streams-wordcount-output \
+ --from-beginning \
+ --formatter kafka.tools.DefaultMessageFormatter \
+ --property print.key=true \
+ --property print.value=true \
+ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
+hello 1
+kafka 2
+streams 2
+join 1
+kafka 3
+summit 1
+</pre>
+
+As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the output above) is
+an updated count of a single word, a.k.a. the record key, such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
+
<p>
The two diagrams below illustrate what is essentially happening behind the scenes.
The first column shows the evolution of the current state of the <code>KTable<String, Long></code> that is counting word occurrences for <code>count</code>.
@@ -217,13 +315,9 @@ And so on (we skip the illustration of how the third line is being processed). T
Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
</p>
-<p>
-Now you can write more input messages to the <b>streams-file-input</b> topic and observe additional messages added
-to <b>streams-wordcount-output</b> topic, reflecting updated word counts (e.g., using the console producer and the
-console consumer, as described above).
-</p>
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Tear down the application</a></h4>
-<p>You can stop the console consumer via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker and the ZooKeeper server, in that order, via <b>Ctrl-C</b>.</p>
<div class="pagination">
<a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
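The table/changelog duality described in the quickstart can be checked against the console consumer output from Step 5. The hypothetical Java sketch below replays the eleven printed `word count` records in order into a map, letting each later record for a key overwrite the earlier one. It is a plain-Java stand-in for reconstructing the `KTable` from its changelog, not Kafka Streams code; `ChangelogReplaySketch`, `Update` and `replay` are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplaySketch {

    // One changelog record as printed by the console consumer: a word and its latest count.
    static final class Update {
        final String word;
        final long count;

        Update(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    // Rebuilds the table by consuming the changelog from beginning to end:
    // a later record for the same key replaces the earlier value.
    static Map<String, Long> replay(List<Update> changelog) {
        Map<String, Long> table = new LinkedHashMap<>(); // keeps first-seen key order
        for (Update u : changelog) {
            table.put(u.word, u.count);
        }
        return table;
    }

    public static void main(String[] args) {
        // The records printed by the console consumer at the end of Step 5.
        String[] printed = {
            "all 1", "streams 1", "lead 1", "to 1", "kafka 1",
            "hello 1", "kafka 2", "streams 2",
            "join 1", "kafka 3", "summit 1"
        };
        List<Update> changelog = new ArrayList<>();
        for (String line : printed) {
            String[] parts = line.split(" ");
            changelog.add(new Update(parts[0], Long.parseLong(parts[1])));
        }
        System.out.println(replay(changelog));
    }
}
```

Running `main` prints `{all=1, streams=2, lead=1, to=1, kafka=3, hello=1, join=1, summit=1}`, i.e. the final table contents, with each word mapped to its latest count.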
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/docs/toc.html
----------------------------------------------------------------------
diff --git a/docs/toc.html b/docs/toc.html
index 7525b0f..2ec0129 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -141,6 +141,15 @@
<li><a href="#connect_development">8.3 Connector Development Guide</a></li>
</ul>
</li>
+ <li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
+ <ul>
+ <li><a href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams Application</a></li>
+ <li><a href="/{{version}}/documentation/streams/developer-guide">9.2 Developer Guide</a></li>
+ <li><a href="/{{version}}/documentation/streams/core-concepts">9.3 Core Concepts</a></li>
+ <li><a href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li>
+ <li><a href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and API Changes</a></li>
+ </ul>
+ </li>
</ul>
</script>
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 86182a3..1d672b2 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -18,11 +18,13 @@ package org.apache.kafka.streams.examples.pipe;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
@@ -51,13 +53,24 @@ public class PipeDemo {
builder.stream("streams-file-input").to("streams-pipe-output");
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
+ final KafkaStreams streams = new KafkaStreams(builder, props);
+ final CountDownLatch latch = new CountDownLatch(1);
- // usually the stream application would be running forever,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
- streams.close();
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ Exit.exit(1);
+ }
+ Exit.exit(0);
}
}
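All three demos now use the same run-until-Ctrl-C pattern. A JVM shutdown hook closes the `KafkaStreams` instance and releases a `CountDownLatch`, while the main thread blocks in `latch.await()`. The sketch below isolates that control flow so it can run without a broker. `FakeStreams` is a hypothetical stand-in for `KafkaStreams`, the exit code is returned instead of being passed to `Exit.exit(...)`, and `main` simulates Ctrl-C by running the hook body on another thread once the app is up, rather than registering it with `Runtime.getRuntime().addShutdownHook(...)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownLatchSketch {

    // Hypothetical stand-in for KafkaStreams: only tracks whether it is running.
    static final class FakeStreams {
        final AtomicBoolean running = new AtomicBoolean(false);
        void start() { running.set(true); }
        void close() { running.set(false); }
    }

    // Mirrors the demos' try block: start, then block until the hook releases the latch.
    // Returns the code the demos would pass to Exit.exit(...).
    static int runUntilShutdown(FakeStreams streams, CountDownLatch latch) {
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            return 1;
        }
        return 0;
    }

    // Mirrors the thread the demos register as a shutdown hook.
    static Thread shutdownHook(FakeStreams streams, CountDownLatch latch) {
        return new Thread("sketch-shutdown-hook") {
            @Override
            public void run() {
                streams.close();   // release resources first
                latch.countDown(); // then unblock the main thread
            }
        };
    }

    public static void main(String[] args) {
        FakeStreams streams = new FakeStreams();
        CountDownLatch latch = new CountDownLatch(1);
        Thread hook = shutdownHook(streams, latch);
        // Simulated Ctrl-C: wait until the app is running, then run the hook body.
        new Thread(() -> {
            while (!streams.running.get()) {
                Thread.yield();
            }
            hook.run();
        }).start();
        int code = runUntilShutdown(streams, latch);
        System.out.println("exit code " + code + ", running=" + streams.running.get());
    }
}
```

Running `main` should print `exit code 0, running=false`. Closing before counting down means that, once `await()` returns, the instance has already been shut down.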
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 03f8762..616fc48 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
@@ -60,7 +62,7 @@ public class WordCountDemo {
KStreamBuilder builder = new KStreamBuilder();
- KStream<String, String> source = builder.stream("streams-file-input");
+ KStream<String, String> source = builder.stream("streams-wordcount-input");
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -80,13 +82,24 @@ public class WordCountDemo {
// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
+ final KafkaStreams streams = new KafkaStreams(builder, props);
+ final CountDownLatch latch = new CountDownLatch(1);
- // usually the stream application would be running forever,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
- streams.close();
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ Exit.exit(1);
+ }
+ Exit.exit(0);
}
}
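To see why the quickstart's consumer output lists `kafka 1`, then `kafka 2`, then `kafka 3`, the plain-Java sketch below models the demo's split, count and `to("streams-wordcount-output")` steps as one emitted update per counted word. It assumes the demo lower-cases each line and splits it on single spaces; the diff shows the `flatMapValues` step and the `Locale` import but not the mapper body. `WordCountUpdatesSketch` and `process` are hypothetical names. Depending on its record-caching settings, the real application may coalesce some of these intermediate updates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class WordCountUpdatesSketch {

    // Hypothetical model of the demo topology: every counted word produces one
    // "word count" record, like each KTable update forwarded to the output topic.
    static List<String> process(List<String> lines) {
        Map<String, Long> counts = new HashMap<>(); // stands in for the count() state
        List<String> updates = new ArrayList<>();   // stands in for streams-wordcount-output
        for (String line : lines) {
            // Assumption: lower-case, then split on single spaces.
            for (String word : line.toLowerCase(Locale.ROOT).split(" ")) {
                long updated = counts.merge(word, 1L, Long::sum);
                updates.add(word + " " + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        // The three lines typed into the console producer in Step 5.
        List<String> input = Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        process(input).forEach(System.out::println);
    }
}
```

Running `main` prints the same eleven lines, in the same order, as the final console consumer output in Step 5 of the quickstart.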
http://git-wip-us.apache.org/repos/asf/kafka/blob/91c207c2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index eceddf0..0ff42a7 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.state.Stores;
import java.util.Locale;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
@@ -119,20 +121,31 @@ public class WordCountProcessorDemo {
TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("Source", "streams-file-input");
+ builder.addSource("Source", "streams-wordcount-input");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the stream application would be running forever,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
+ final KafkaStreams streams = new KafkaStreams(builder, props);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
+
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ Exit.exit(1);
+ }
+ Exit.exit(0);
}
}