Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/11 01:40:06 UTC

samza git commit: SAMZA-1911: Add documentation for quick start

Repository: samza
Updated Branches:
  refs/heads/master aaef1ab17 -> 623661e02


SAMZA-1911: Add documentation for quick start

md file for quick start.

Author: xinyuiscool <xi...@gmail.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #704 from xinyuiscool/SAMZA-1911


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/623661e0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/623661e0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/623661e0

Branch: refs/heads/master
Commit: 623661e020ef1d53f0334037f1efef15c0499e35
Parents: aaef1ab
Author: xinyuiscool <xi...@gmail.com>
Authored: Wed Oct 10 18:40:03 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 10 18:40:03 2018 -0700

----------------------------------------------------------------------
 docs/_docs/replace-versioned.sh             |   3 +
 docs/_layouts/default.html                  |   6 +-
 docs/_menu/index.html                       |   2 +-
 docs/startup/quick-start/versioned/index.md | 254 +++++++++++++++++++++++
 4 files changed, 261 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/623661e0/docs/_docs/replace-versioned.sh
----------------------------------------------------------------------
diff --git a/docs/_docs/replace-versioned.sh b/docs/_docs/replace-versioned.sh
index c005e62..24bf7ae 100755
--- a/docs/_docs/replace-versioned.sh
+++ b/docs/_docs/replace-versioned.sh
@@ -42,3 +42,6 @@ mv -f $DIR/_site/startup/releases/versioned $DIR/_site/startup/releases/$version
 
 echo "replaced startup/hello-samza/versioned to startup/hello-samza/"$version
 mv -f $DIR/_site/startup/hello-samza/versioned $DIR/_site/startup/hello-samza/$version
+
+echo "replaced startup/quick-start/versioned to startup/quick-start/"$version
+mv -f $DIR/_site/startup/quick-start/versioned $DIR/_site/startup/quick-start/$version
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/623661e0/docs/_layouts/default.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/default.html b/docs/_layouts/default.html
index c10403d..b8cce84 100644
--- a/docs/_layouts/default.html
+++ b/docs/_layouts/default.html
@@ -61,7 +61,7 @@
           A distributed stream processing framework
         </h2>
         <div class="content">
-          <a class="button" href="/startup/hello-samza/{{site.version}}">
+          <a class="button" href="/startup/quick-start/{{site.version}}">
             Quick Start
           </a>
           <a class="button" href="/case-studies/">
@@ -111,8 +111,8 @@
   <div class="content--samza-intro">
       <p>
         Samza allows you to build stateful applications that process data in real-time from multiple sources including Apache Kafka.
-        <br/> <br/> 
-        Battle-tested at scale, it supports flexible deployment options to run on <a href="/learn/documentation/latest/deployment/yarn.html">YARN</a> or as a 
+        <br/> <br/>
+        Battle-tested at scale, it supports flexible deployment options to run on <a href="/learn/documentation/latest/deployment/yarn.html">YARN</a> or as a
         <a href="/learn/documentation/latest/deployment/standalone.html">standalone library</a>.
       </p>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/623661e0/docs/_menu/index.html
----------------------------------------------------------------------
diff --git a/docs/_menu/index.html b/docs/_menu/index.html
index ecf40d6..1f35c13 100644
--- a/docs/_menu/index.html
+++ b/docs/_menu/index.html
@@ -3,7 +3,7 @@ items:
   - menu_title: Getting Started
     items:
       - menu_title: QuickStart
-        url: /startup/hello-samza/version/
+        url: /startup/quick-start/version/
       - menu_title: Code Examples
         url: /learn/tutorials/version/
   - menu_title: Documentation

http://git-wip-us.apache.org/repos/asf/samza/blob/623661e0/docs/startup/quick-start/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/quick-start/versioned/index.md b/docs/startup/quick-start/versioned/index.md
new file mode 100644
index 0000000..44b8376
--- /dev/null
+++ b/docs/startup/quick-start/versioned/index.md
@@ -0,0 +1,254 @@
+---
+layout: page
+title: Quick Start
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+This tutorial will guide you through creating your first Samza application - `WordCount`. It demonstrates how to start writing a Samza application, consume from a Kafka stream, tokenize the lines into words, and count the frequency of each word. For this tutorial we are going to use Gradle 4.9 to build the project. The full tutorial project tar file can be downloaded [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
+
+### Setting up a Java Project
+
+First let’s create the project structure as follows:
+
+{% highlight bash %}
+wordcount
+|-- build.gradle
+|-- gradle.properties
+|-- scripts
+|-- src
+    |-- main
+        |-- config
+        |-- java
+            |-- samzaapp
+                 |-- WordCount.java
+{% endhighlight %}
+
+You can copy the build.gradle and gradle.properties files from the downloaded tutorial tgz file. The WordCount class is just an empty class for now. Once you have finished this setup, you can build the project by running:
+
+{% highlight bash %}
+> cd wordcount
+> gradle wrapper --gradle-version 4.9
+> ./gradlew build
+{% endhighlight %}
+
+### Create a Samza StreamApplication
+
+Now let’s write some code! The first step is to create your own Samza application by implementing the [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) interface:
+
+{% highlight java %}
+package samzaapp;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
+
+public class WordCount implements StreamApplication {
+ @Override
+ public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+ }
+}
+{% endhighlight %}
+
+The StreamApplication interface provides an API method named describe() for you to specify your streaming pipeline. Using [StreamApplicationDescriptor](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplicationDescriptor.html), you can describe your entire data processing task, from data inputs through operations to outputs.
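+
+To make that shape concrete, here is a minimal sketch (not part of the tutorial project) of a describe() body that simply copies one Kafka topic to another. The `CopyStream` class and the topic names are hypothetical, and the import paths follow the javadoc links in this tutorial but may differ slightly across Samza versions:
+
+{% highlight java %}
+package samzaapp;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+
+public class CopyStream implements StreamApplication {
+ @Override
+ public void describe(StreamApplicationDescriptor app) {
+   // Inputs: a Kafka system pointing at a local broker (assumed addresses).
+   KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka")
+       .withConsumerZkConnect(ImmutableList.of("localhost:2181"))
+       .withProducerBootstrapServers(ImmutableList.of("localhost:9092"));
+
+   KafkaInputDescriptor<KV<String, String>> in =
+       kafka.getInputDescriptor("copy-input", KVSerde.of(new StringSerde(), new StringSerde()));
+   KafkaOutputDescriptor<KV<String, String>> out =
+       kafka.getOutputDescriptor("copy-output", KVSerde.of(new StringSerde(), new StringSerde()));
+
+   // Operations: none here; a real pipeline would chain operators such as map, flatMap, or window.
+   // Outputs: forward every message unchanged.
+   app.getInputStream(in).sendTo(app.getOutputStream(out));
+ }
+}
+{% endhighlight %}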
+
+### Input data source using Kafka
+
+In this example, we are going to use Kafka as the input data source and consume the text for the word count line by line. We start by defining a KafkaSystemDescriptor, which specifies the properties for establishing the connection to the local Kafka cluster. Then we create a `KafkaInputDescriptor`/`KafkaOutputDescriptor` to set up the topic name, serializer, and deserializer. Finally, we use these descriptors in the [StreamApplicationDescriptor](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplicationDescriptor.html) so we can consume from this topic. The code is as follows:
+
+{% highlight java %}
+public class WordCount implements StreamApplication {
+ private static final String KAFKA_SYSTEM_NAME = "kafka";
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ private static final String INPUT_STREAM_ID = "sample-text";
+ private static final String OUTPUT_STREAM_ID = "word-count-output";
+
+ @Override
+ public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+   KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
+       .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+       .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+       .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+   KafkaInputDescriptor<KV<String, String>> inputDescriptor =
+       kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+   KafkaOutputDescriptor<KV<String, String>> outputDescriptor =
+       kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+
+   MessageStream<KV<String, String>> lines = streamApplicationDescriptor.getInputStream(inputDescriptor);
+   OutputStream<KV<String, String>> counts = streamApplicationDescriptor.getOutputStream(outputDescriptor);
+ }
+}
+{% endhighlight %}
+
+The resulting [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) `lines` contains the data read from Kafka, with each line deserialized into a string. We also defined the output stream `counts` so we can write the word-count results to it. Next, let’s add the processing logic.
+
+### Add word count processing logic
+
+First we are going to extract the value from `lines`. This is a one-to-one transform, so we can use the Samza map operator as follows:
+
+{% highlight java %}
+lines.map(kv -> kv.value)
+{% endhighlight %}
+
+Then we will split each line into words using the flatMap operator:
+
+{% highlight java %}
+.flatMap(s -> Arrays.asList(s.split("\\W+")))
+{% endhighlight %}
+
+Now let’s think about how to count the words. We need to aggregate the counts with the word as the key, and emit the aggregated result once no more data arrives. Here we can use a session window, which triggers the output when no data arrives within a certain interval. With the 5-second gap used below, for example, the count for a given word is emitted once that word has not been seen for 5 seconds.
+
+{% highlight java %}
+.window(Windows.keyedSessionWindow(
+   w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+   new StringSerde(), new IntegerSerde()), "count")
+{% endhighlight %}
+
+The output will be captured in a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) type, which contains the key and the aggregated value. We add a further map to transform that into a KV. To write the output to the output Kafka stream, we use the sendTo operator in Samza:
+
+{% highlight java %}
+.map(windowPane ->
+   KV.of(windowPane.getKey().getKey(),
+       windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
+.sendTo(counts);
+{% endhighlight %}
+
+The full processing logic looks like the following:
+
+{% highlight java %}
+lines
+   .map(kv -> kv.value)
+   .flatMap(s -> Arrays.asList(s.split("\\W+")))
+   .window(Windows.keyedSessionWindow(
+       w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+       new StringSerde(), new IntegerSerde()), "count")
+   .map(windowPane ->
+       KV.of(windowPane.getKey().getKey(),
+           windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
+   .sendTo(counts);
+{% endhighlight %}
+
+
+### Configure your application
+
+In this section we will configure the word count example to run locally in a single JVM. Please add a file named “word-count.properties” under the config folder. We will add the job configs in this file.
+
+Since there is only a single Samza processor, there is no coordination required. We use the PassthroughJobCoordinator for the example. We also group all Samza tasks into this single processor. As for the Kafka topic, we will consume from the beginning. Here is the full config needed for the job:
+
+{% highlight jproperties %}
+job.name=word-count
+job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
+job.coordination.utils.factory=org.apache.samza.standalone.PassthroughCoordinationUtilsFactory
+job.changelog.system=kafka
+task.name.grouper.factory=org.apache.samza.container.grouper.task.SingleContainerGrouperFactory
+processor.id=0
+systems.kafka.default.stream.samza.offset.default=oldest
+{% endhighlight %}
+
+For more details about Samza configuration, feel free to check out the full configuration table [here](/learn/documentation/{{site.version}}/jobs/configuration-table.html).
+
+### Run your application
+
+Let’s add a `main()` function to the `WordCount` class first. The function reads the config file and factory from the args, and creates a `LocalApplicationRunner` to run the application locally. Here is the function:
+
+{% highlight java %}
+public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config config = cmdLine.loadConfig(options);
+ LocalApplicationRunner runner = new LocalApplicationRunner(new WordCount(), config);
+ runner.run();
+ runner.waitForFinish();
+}
+{% endhighlight %}
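+
+Putting it all together, the complete class might look roughly like the sketch below. The pipeline and `main()` code are exactly what was built up in the previous sections; the import statements are an assumption based on the javadoc links in this tutorial and may need small adjustments depending on your Samza version:
+
+{% highlight java %}
+package samzaapp;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import joptsimple.OptionSet;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.util.CommandLine;
+
+public class WordCount implements StreamApplication {
+ private static final String KAFKA_SYSTEM_NAME = "kafka";
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ private static final String INPUT_STREAM_ID = "sample-text";
+ private static final String OUTPUT_STREAM_ID = "word-count-output";
+
+ @Override
+ public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+   // Connection to the local Kafka cluster.
+   KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
+       .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+       .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+       .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+   // Input and output topics, both carrying string key/value pairs.
+   KafkaInputDescriptor<KV<String, String>> inputDescriptor =
+       kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+   KafkaOutputDescriptor<KV<String, String>> outputDescriptor =
+       kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+
+   MessageStream<KV<String, String>> lines = streamApplicationDescriptor.getInputStream(inputDescriptor);
+   OutputStream<KV<String, String>> counts = streamApplicationDescriptor.getOutputStream(outputDescriptor);
+
+   // Tokenize each line, count words in 5-second session windows, and emit "word: count".
+   lines
+       .map(kv -> kv.value)
+       .flatMap(s -> Arrays.asList(s.split("\\W+")))
+       .window(Windows.keyedSessionWindow(
+           w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+           new StringSerde(), new IntegerSerde()), "count")
+       .map(windowPane ->
+           KV.of(windowPane.getKey().getKey(),
+               windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
+       .sendTo(counts);
+ }
+
+ public static void main(String[] args) {
+   CommandLine cmdLine = new CommandLine();
+   OptionSet options = cmdLine.parser().parse(args);
+   Config config = cmdLine.loadConfig(options);
+   LocalApplicationRunner runner = new LocalApplicationRunner(new WordCount(), config);
+   runner.run();
+   runner.waitForFinish();
+ }
+}
+{% endhighlight %}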
+
+In your "build.gradle" file, please add the following so we can use Gradle to run the application:
+
+{% highlight jproperties %}
+apply plugin:'application'
+
+mainClassName = "samzaapp.WordCount"
+{% endhighlight %}
+
+Before running `main()`, we need to create the input Kafka topic with some sample data. Let’s start a local Kafka broker first. The Samza examples project provides a script named “grid” which you can use to start ZooKeeper, a Kafka broker, and YARN. You can download it [here](https://github.com/apache/samza-hello-samza/blob/master/bin/grid), put it under the scripts/ folder, and then issue the following commands:
+
+{% highlight bash %}
+> ./scripts/grid install zookeeper && ./scripts/grid start zookeeper
+> ./scripts/grid install kafka && ./scripts/grid start kafka
+{% endhighlight %}
+
+Next we will create a Kafka topic named "sample-text" and publish some sample data to it. A "sample-text.txt" file is included in the downloaded tutorial tgz file. From the command line:
+
+{% highlight bash %}
+> ./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic sample-text --partitions 1 --replication-factor 1
+> ./deploy/kafka/bin/kafka-console-producer.sh --topic sample-text --broker-list localhost:9092 < ./sample-text.txt
+{% endhighlight %}
+
+Now let’s fire up our application. Here we use Gradle to run it. You can also run it directly within your IDE, with the same program arguments.
+
+{% highlight bash %}
+> export BASE_DIR=`pwd`
+> ./gradlew run --args="--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$BASE_DIR/src/main/config/word-count.properties"
+{% endhighlight %}
+
+This application will output to a Kafka topic named "word-count-output". Let’s consume this topic to check out the results:
+
+{% highlight bash %}
+>  ./deploy/kafka/bin/kafka-console-consumer.sh --topic word-count-output --zookeeper localhost:2181 --from-beginning
+{% endhighlight %}
+
+It will show the counts for each word like the following:
+
+{% highlight bash %}
+well: 4
+complaining: 1
+die: 3
+but: 22
+not: 50
+truly: 5
+murmuring: 1
+satisfied: 3
+the: 120
+thy: 8
+gods: 8
+thankful: 1
+and: 243
+from: 16
+{% endhighlight %}
+
+### More Examples
+
+The [hello-samza](https://github.com/apache/samza-hello-samza) project contains many more examples to help you create your Samza job. To check out the hello-samza project:
+
+{% highlight bash %}
+> git clone https://git.apache.org/samza-hello-samza.git hello-samza
+{% endhighlight %}
+
+There are four main categories of examples in this project:
+
+1. [wikipedia](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia): a more complex example demonstrating the entire pipeline of consuming the live feed of Wikipedia edits, parsing the messages, and generating statistics from them.
+
+2. [cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook): various examples in this folder demonstrate usage of the Samza high-level API, such as windowing, joins, and aggregations.
+
+3. [azure](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/azure): this example shows how to run your application on Microsoft Azure.
+
+4. [kinesis](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/kinesis): this example shows how to consume from Kinesis streams.
\ No newline at end of file