You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/01 23:04:20 UTC

kafka-site git commit: MINOR: update streams hello world

Repository: kafka-site
Updated Branches:
  refs/heads/asf-site 2e200cfce -> 6f8013869


MINOR: update streams hello world


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/6f801386
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/6f801386
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/6f801386

Branch: refs/heads/asf-site
Commit: 6f80138695c78355195b683cae06f8a2f116a307
Parents: 2e200cf
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 1 16:03:52 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 1 16:03:52 2017 -0700

----------------------------------------------------------------------
 10/streams/index.html | 85 +++++++++++++++++++++++++---------------------
 1 file changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/6f801386/10/streams/index.html
----------------------------------------------------------------------
diff --git a/10/streams/index.html b/10/streams/index.html
index ece9fa7..bf86adc 100644
--- a/10/streams/index.html
+++ b/10/streams/index.html
@@ -152,35 +152,37 @@
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
                    import org.apache.kafka.streams.KafkaStreams;
+                   import org.apache.kafka.streams.StreamsBuilder;
                    import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.kstream.KStream;
-                   import org.apache.kafka.streams.kstream.KStreamBuilder;
-                   import org.apache.kafka.streams.kstream.KTable;
-       
+                   import org.apache.kafka.streams.Topology;
+                   import org.apache.kafka.streams.kstream.Materialized;
+                   import org.apache.kafka.streams.kstream.Produced;
+                   import org.apache.kafka.streams.state.KeyValueStore;
+
                    import java.util.Arrays;
                    import java.util.Properties;
-       
+
                    public class WordCountApplication {
-       
+
                        public static void main(final String[] args) throws Exception {
                            Properties config = new Properties();
                            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
-                           KStreamBuilder builder = new KStreamBuilder();
+
+                           StreamsBuilder builder = new StreamsBuilder();
                            KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
                            KTable&lt;String, Long&gt; wordCounts = textLines
                                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                                .groupBy((key, word) -> word)
-                               .count("Counts");
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-       
-                           KafkaStreams streams = new KafkaStreams(builder, config);
+                               .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
+                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
+
+                           KafkaStreams streams = new KafkaStreams(builder.build(), config);
                            streams.start();
                        }
-       
+
                    }
                </pre>
         </div>
@@ -189,26 +191,28 @@
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
                    import org.apache.kafka.streams.KafkaStreams;
+                   import org.apache.kafka.streams.StreamsBuilder;
                    import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.kstream.KStream;
-                   import org.apache.kafka.streams.kstream.KStreamBuilder;
-                   import org.apache.kafka.streams.kstream.KTable;
+                   import org.apache.kafka.streams.Topology;
                    import org.apache.kafka.streams.kstream.KeyValueMapper;
+                   import org.apache.kafka.streams.kstream.Materialized;
+                   import org.apache.kafka.streams.kstream.Produced;
                    import org.apache.kafka.streams.kstream.ValueMapper;
-       
+                   import org.apache.kafka.streams.state.KeyValueStore;
+
                    import java.util.Arrays;
                    import java.util.Properties;
-       
+
                    public class WordCountApplication {
-       
+
                        public static void main(final String[] args) throws Exception {
                            Properties config = new Properties();
                            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
-                           KStreamBuilder builder = new KStreamBuilder();
+
+                           StreamsBuilder builder = new StreamsBuilder();
                            KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
                            KTable&lt;String, Long&gt; wordCounts = textLines
                                .flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
@@ -223,13 +227,15 @@
                                        return word;
                                    }
                                })
-                               .count("Counts");
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
-       
-                           KafkaStreams streams = new KafkaStreams(builder, config);
+                               .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
+
+
+                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
+
+                           KafkaStreams streams = new KafkaStreams(builder.build(), config);
                            streams.start();
                        }
-       
+
                    }
                </pre>
         </div>
@@ -239,15 +245,16 @@
                    import java.lang.Long
                    import java.util.Properties
                    import java.util.concurrent.TimeUnit
-       
+
                    import org.apache.kafka.common.serialization._
                    import org.apache.kafka.streams._
-                   import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-       
+                   import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper}
+                   import org.apache.kafka.streams.state.KeyValueStore;
+
                    import scala.collection.JavaConverters.asJavaIterableConverter
-       
+
                    object WordCountApplication {
-       
+
                        def main(args: Array[String]) {
                            val config: Properties = {
                                val p = new Properties()
@@ -257,23 +264,23 @@
                                p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
                                p
                            }
-       
-                           val builder: KStreamBuilder = new KStreamBuilder()
+
+                           val builder: StreamsBuilder = new StreamsBuilder()
                            val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
                            val wordCounts: KTable[String, Long] = textLines
                                .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
                                .groupBy((_, word) => word)
-                               .count("Counts")
-                           wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
-       
-                           val streams: KafkaStreams = new KafkaStreams(builder, config)
+                               .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
+                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))
+
+                           val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
                            streams.start()
-       
+
                            Runtime.getRuntime.addShutdownHook(new Thread(() => {
                                streams.close(10, TimeUnit.SECONDS)
                            }))
                        }
-       
+
                    }
                </pre>
         </div>