Posted to commits@samza.apache.org by ja...@apache.org on 2018/11/14 03:40:44 UTC

[1/9] samza git commit: Clean-up the Quick-Start and Code-Examples pages; Re-organize content

Repository: samza
Updated Branches:
  refs/heads/1.0.0 ed8d1da8b -> d034bbef8


Clean-up the Quick-Start and Code-Examples pages; Re-organize content

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #759 from vjagadish1989/website-reorg23


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4b012326
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4b012326
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4b012326

Branch: refs/heads/1.0.0
Commit: 4b0123262bcb754de0311d617c4110480ed3bfaf
Parents: ed8d1da
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 23 23:39:54 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:28:10 2018 -0800

----------------------------------------------------------------------
 docs/_docs/replace-versioned.sh               |  5 +-
 docs/_menu/index.html                         |  2 +-
 docs/startup/code-examples/versioned/index.md | 49 +++++++++++++
 docs/startup/quick-start/versioned/index.md   | 83 ++++++++++------------
 4 files changed, 91 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4b012326/docs/_docs/replace-versioned.sh
----------------------------------------------------------------------
diff --git a/docs/_docs/replace-versioned.sh b/docs/_docs/replace-versioned.sh
index 24bf7ae..c454cac 100755
--- a/docs/_docs/replace-versioned.sh
+++ b/docs/_docs/replace-versioned.sh
@@ -44,4 +44,7 @@ echo "replaced startup/hello-samza/versioned to startup/hello-samza/"$version
 mv -f $DIR/_site/startup/hello-samza/versioned $DIR/_site/startup/hello-samza/$version
 
 echo "replaced startup/quick-start/versioned to startup/quick-start/"$version
-mv -f $DIR/_site/startup/quick-start/versioned $DIR/_site/startup/quick-start/$version
\ No newline at end of file
+mv -f $DIR/_site/startup/quick-start/versioned $DIR/_site/startup/quick-start/$version
+
+echo "replaced startup/code-examples/versioned to startup/code-examples/"$version
+mv -f $DIR/_site/startup/code-examples/versioned $DIR/_site/startup/code-examples/$version

http://git-wip-us.apache.org/repos/asf/samza/blob/4b012326/docs/_menu/index.html
----------------------------------------------------------------------
diff --git a/docs/_menu/index.html b/docs/_menu/index.html
index 0d1750f..a363bae 100644
--- a/docs/_menu/index.html
+++ b/docs/_menu/index.html
@@ -5,7 +5,7 @@ items:
       - menu_title: QuickStart
         url: /startup/quick-start/version/
       - menu_title: Code Examples
-        url: /learn/tutorials/version/
+        url: /startup/code-examples/version/
   - menu_title: Documentation
     has_sub: true
     has_sub_subs: true

http://git-wip-us.apache.org/repos/asf/samza/blob/4b012326/docs/startup/code-examples/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/code-examples/versioned/index.md b/docs/startup/code-examples/versioned/index.md
new file mode 100644
index 0000000..ba1cc3e
--- /dev/null
+++ b/docs/startup/code-examples/versioned/index.md
@@ -0,0 +1,49 @@
+---
+layout: page
+title:
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+### Checking out our examples
+
+The [hello-samza](https://github.com/apache/samza-hello-samza) project contains several examples to help you create your Samza applications. To check out the hello-samza project:
+
+{% highlight bash %}
+> git clone https://git.apache.org/samza-hello-samza.git hello-samza
+{% endhighlight %}
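+
+Each example can be built and run by following the [hello-samza tutorial](https://github.com/apache/samza-hello-samza#hello-samza). A typical local setup looks roughly like this (see the project README for the exact steps):
+
+{% highlight bash %}
+> cd hello-samza
+> ./bin/grid bootstrap     # install and start a local zookeeper, kafka and yarn
+> mvn clean package
+{% endhighlight %}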
+
+#### High-level API examples
+[The Samza Cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook) contains various recipes using the Samza high-level API.
+These include:
+
+- The [Filter example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) demonstrates how to perform stateless operations on a stream. 
+
+- The [Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java) demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks.
+
+- The [Stream-Table Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java) demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
+
+- The [SessionWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java) and [TumblingWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java) examples illustrate Samza's rich windowing and triggering capabilities.
+
+
+In addition to the cookbook, you can also check out these examples:
+
+- [Wikipedia Parser](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia): An advanced example that builds a streaming pipeline consuming a live feed of Wikipedia edits, parsing each message and generating statistics from them.
+
+
+- [Amazon Kinesis](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/kinesis) and [Azure Eventhubs](https://github.com/apache/samza-hello-samza/tree/latest/src/main/java/samza/examples/azure) examples that cover how to consume input data from the respective systems.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/4b012326/docs/startup/quick-start/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/quick-start/versioned/index.md b/docs/startup/quick-start/versioned/index.md
index a046ee7..30add8a 100644
--- a/docs/startup/quick-start/versioned/index.md
+++ b/docs/startup/quick-start/versioned/index.md
@@ -19,11 +19,11 @@ title: Quick Start
    limitations under the License.
 -->
 
-This tutorial will go through the steps of creating your first Samza application - `WordCount`. It demonstrates how to start writing a Samza application, consume from a kafka stream, tokenize the lines into words, and count the frequency of each word.  For this tutorial we are going to use gradle 4.9 to build the projects. The full tutorial project tar file can be downloaded [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
+In this tutorial, we will create our first Samza application - `WordCount`. This application will consume messages from a Kafka stream, tokenize them into individual words and count the frequency of each word. To get started, download the complete project from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
 
 ### Setting up a Java Project
 
-First let’s create the project structure as follows:
+The downloaded project has the following structure:
 
 {% highlight bash %}
 wordcount
@@ -38,7 +38,7 @@ wordcount
                  |-- WordCount.java
 {% endhighlight %}
 
-You can copy build.gradle and gradle.properties files from the downloaded tutorial tgz file. The WordCount class is just an empty class for now. Once finishing this setup, you can build the project by:
+You can build the project anytime by running:
 
 {% highlight bash %}
 > cd wordcount
@@ -48,7 +48,7 @@ You can copy build.gradle and gradle.properties files from the downloaded tutori
 
 ### Create a Samza StreamApplication
 
-Now let’s write some code! The first step is to create your own Samza application by implementing the [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) class:
+Now let’s write some code! An application written using Samza's [high-level API](/learn/documentation/{{site.version}}/api/api/high-level-api.html) implements the [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) interface:
 
 {% highlight java %}
 package samzaapp;
@@ -63,11 +63,11 @@ public class WordCount implements StreamApplication {
 }
 {% endhighlight %}
 
-The StreamApplication interface provides an API method named describe() for you to specify your streaming pipeline. Using [StreamApplicationDescriptor](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplicationDescriptor.html), you can describe your entire data processing task from data inputs, operations and outputs.
+The interface provides a single method named `describe()`, which allows us to define our inputs, the processing logic and outputs for our application. 
 
-### Input data source using Kafka
+### Describe your inputs and outputs
 
-In this example, we are going to use Kafka as the input data source and consume the text for word count line by line. We start by defining a KafkaSystemDescriptor, which specifies the properties to establishing the connection to the local Kafka cluster. Then we create a  `KafkaInputDescriptor`/`KafkaOutputDescriptor` to set up the topic, Serializer and Deserializer. Finally we use this input in the [StreamApplicationDescriptor](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplicationDescriptor.html) so we can consume from this topic. The code is in the following:
+To interact with Kafka, we will first create a `KafkaSystemDescriptor` by providing the coordinates of the Kafka cluster. For each Kafka topic our application reads from, we create a `KafkaInputDescriptor` with the name of the topic and a serializer. Likewise, for each output topic, we instantiate a corresponding `KafkaOutputDescriptor`. 
 
 {% highlight java %}
 public class WordCount implements StreamApplication {
@@ -81,11 +81,13 @@ public class WordCount implements StreamApplication {
 
  @Override
  public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+   // Create a KafkaSystemDescriptor providing properties of the cluster
    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
 
+   // For each input or output stream, create a KafkaInput/Output descriptor
    KafkaInputDescriptor<KV<String, String>> inputDescriptor =
        kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,
            KVSerde.of(new StringSerde(), new StringSerde()));
@@ -93,29 +95,31 @@ public class WordCount implements StreamApplication {
        kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID,
            KVSerde.of(new StringSerde(), new StringSerde()));
 
+   // Obtain a handle to a MessageStream that you can chain operations on
    MessageStream<KV<String, String>> lines = streamApplicationDescriptor.getInputStream(inputDescriptor);
    OutputStream<KV<String, String>> counts = streamApplicationDescriptor.getOutputStream(outputDescriptor);
  }
 }
 {% endhighlight %}
 
-The resulting [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) lines contains the data set that reads from Kafka and deserialized into string of each line. We also defined the output stream counts so we can write the word count results to it. Next let’s add processing logic. 
+The above example creates a [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) which reads from an input topic named `sample-text`. It also defines an output stream that emits results to a topic named `word-count-output`. Next let’s add our processing logic. 
 
 ### Add word count processing logic
 
-First we are going to extract the value from lines. This is a one-to-one transform and we can use the Samza map operator as following:
+Kafka messages typically have a key and a value. Since we only care about the value here, we will apply the `map` operator on the input stream to extract the value. 
 
 {% highlight java %}
-lines .map(kv -> kv.value)
+lines.map(kv -> kv.value)
 {% endhighlight %}
 
-Then we will split the line into words by using the flatmap operator:
+Next, we will tokenize the message into individual words using the `flatMap` operator.
 
 {% highlight java %}
 .flatMap(s -> Arrays.asList(s.split("\\W+")))
 {% endhighlight %}
 
-Now let’s think about how to count the words. We need to aggregate the count based on the word as the key, and emit the aggregation results once there are no more data coming. Here we can use a session window which will trigger the output if there is no data coming within a certain interval.
+
+We now need to group the words, aggregate their respective counts and periodically emit our results. For this, we will use Samza's session-windowing feature.
 
 {% highlight java %}
 .window(Windows.keyedSessionWindow(
@@ -123,7 +127,11 @@ Now let’s think about how to count the words. We need to aggregate the count b
    new StringSerde(), new IntegerSerde()), "count")
 {% endhighlight %}
 
-The output will be captured in a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) type, which contains the key and the aggregation value. We add a further map to transform that into a KV. To write the output to the output Kafka stream, we used the sentTo operator in Samza:
+Let's walk through each of the parameters to the above `window` function:
+The first parameter is a "key function", which defines the key to group messages by. In our case, we can simply use the word as the key. The second parameter is the windowing interval, which is set to 5 seconds. The third parameter is a function which provides the initial value for our aggregations. We can start with an initial count of zero for each word. The fourth parameter is an aggregation function for computing counts. The next two parameters specify the key and value serializers for our window. 
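+
+Putting these together, the complete window invocation looks roughly like the sketch below (the exact lambda bodies are illustrative):
+
+{% highlight java %}
+.window(Windows.keyedSessionWindow(
+    word -> word,                      // key function: group messages by the word itself
+    Duration.ofSeconds(5),             // emit a result when no message arrives for 5 seconds
+    () -> 0,                           // initial count for each word
+    (word, count) -> count + 1,        // aggregation: increment the count for every message
+    new StringSerde(), new IntegerSerde()), "count")
+{% endhighlight %}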
+
+The output from the window operator is captured in a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) type, which contains the word as the key and its count as the value. We add a further `map` to format this into a `KV` that we can send to our Kafka topic. To write our results to the output topic, we use the `sendTo` operator in Samza.
+
 
 {% highlight java %}
 .map(windowPane ->
@@ -148,27 +156,31 @@ lines
 {% endhighlight %}
 
 
-### Config your application
+### Configure your application
 
-In this section we will configure the word count example to run locally in a single JVM. Please add a file named “word-count.properties” under the config folder. We will add the job configs in this file.
-
-Since there is only a single Samza processor, there is no coordination required. We use the PassthroughJobCoordinator for the example. We also group all Samza tasks into this single processor. As for the Kafka topic, we will consume from the beginning. Here is the full config needed for the job:
+In this section, we will configure our word count example to run locally in a single JVM. Let us add a file named “word-count.properties” under the config folder. 
 
 {% highlight jproperties %}
 job.name=word-count
+# Use a PassthroughJobCoordinator since there is no coordination needed
 job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
 job.coordination.utils.factory=org.apache.samza.standalone.PassthroughCoordinationUtilsFactory
+
 job.changelog.system=kafka
+
+# Use a single container to process all of the data
 task.name.grouper.factory=org.apache.samza.container.grouper.task.SingleContainerGrouperFactory
 processor.id=0
+
+# Read from the beginning of the topic
 systems.kafka.default.stream.samza.offset.default=oldest
 {% endhighlight %}
 
-For more details about Samza config, feel free to check out the latest config [here](/learn/documentation/{{site.version}}/jobs/configuration-table.html).
+For more details on Samza's configs, feel free to check out the latest [configuration reference](/learn/documentation/{{site.version}}/jobs/configuration-table.html).
 
 ### Run your application
 
-Let’s add a `main()` function to `WordCount` class first. The function reads the config file and factory from the args, and create a `LocalApplicationRunner` to run the application locally. Here is the function details:
+We are ready to add a `main()` function to the `WordCount` class. It parses the command-line arguments and instantiates a `LocalApplicationRunner` to execute the application locally.
 
 {% highlight java %}
 public static void main(String[] args) {
@@ -181,36 +193,29 @@ public static void main(String[] args) {
 }
 {% endhighlight %}
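+
+For reference, the body of `main()` follows this rough pattern, using Samza's `CommandLine` utility to load the config file passed as a program argument (details may differ slightly from the downloaded project):
+
+{% highlight java %}
+public static void main(String[] args) {
+  // Parse the command-line arguments and load the configuration
+  CommandLine cmdLine = new CommandLine();
+  OptionSet options = cmdLine.parser().parse(args);
+  Config config = cmdLine.loadConfig(options);
+
+  // Run the application locally and wait for it to finish
+  LocalApplicationRunner runner = new LocalApplicationRunner(new WordCount(), config);
+  runner.run();
+  runner.waitForFinish();
+}
+{% endhighlight %}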
 
-In your "build.gradle" file, please add the following so we can use gradle to run it:
-
-{% highlight jproperties %}
-apply plugin:'application'
-
-mainClassName = "samzaapp.WordCount"
-{% endhighlight %}
 
-Before running `main()`, we need to create the input Kafka topic with some sample data. Let’s start a local kafka broker first. Samza examples provides a script named “grid” which you can use to start zookeeper, kafka broker and yarn. Your can download it [here](https://github.com/apache/samza-hello-samza/blob/master/bin/grid) and put it under scripts/ folder, then issue the following command:
+Before running `main()`, we will create our input Kafka topic and populate it with sample data. You can download the scripts to interact with Kafka along with the sample data from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
 
 {% highlight bash %}
 > ./scripts/grid install zookeeper && ./scripts/grid start zookeeper
 > ./scripts/grid install kafka && ./scripts/grid start kafka
 {% endhighlight %}
 
-Next we will create a Kafka topic named sample-text, and publish some sample data into it. A "sample-text.txt" file is included in the downloaded tutorial tgz file. In command line:
 
 {% highlight bash %}
 > ./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic sample-text --partition 1 --replication-factor 1
 > ./deploy/kafka/bin/kafka-console-producer.sh --topic sample-text --broker localhost:9092 < ./sample-text.txt
 {% endhighlight %}
 
-Now let’s fire up our application. Here we use gradle to run it. You can also run it directly within your IDE, with the same program arguments.
+Let’s use gradle to kick off our application. Alternatively, you can run it directly from your IDE with the same program arguments.
 
 {% highlight bash %}
 > export BASE_DIR=`pwd`
 > ./gradlew run --args="--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$BASE_DIR/src/main/config/word-count.properties"
 {% endhighlight %}
 
-This application will output to a Kafka topic named "word-count-output". Let’s consume this topic to check out the results:
+
+The application will output to a Kafka topic named "word-count-output". We will now fire up a Kafka consumer to read from this topic:
 
 {% highlight bash %}
 >  ./deploy/kafka/bin/kafka-console-consumer.sh --topic word-count-output --zookeeper localhost:2181 --from-beginning
@@ -235,20 +240,6 @@ and: 243
 from: 16
 {% endhighlight %}
 
-### More Examples
-
-The [hello-samza](https://github.com/apache/samza-hello-samza) project contains a lot of more examples to help you create your Samza job. To checkout the hello-samza project:
-
-{% highlight bash %}
-> git clone https://git.apache.org/samza-hello-samza.git hello-samza
-{% endhighlight %}
-
-There are four main categories of examples in this project, including:
-
-1. [wikipedia](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia): this is a more complex example demonstrating the entire pipeline of consuming from the live feed from wikipedia edits, parsing the message and generating statistics from them.
-
-2. [cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook): you will find various examples in this folder to demonstrate usage of Samza high-level API, such as windowing, join and aggregations.
-
-3. [asure](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/azure): this example shows how to run your application on Microsoft Asure.
+Congratulations! You've successfully run your first Samza application.
 
-4. [kinesis](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/kinesis): this example shows how to consume from Kinesis streams
\ No newline at end of file
+### [More Examples >>](/startup/code-examples/{{site.version}})
\ No newline at end of file


[6/9] samza git commit: atoomula and prateekm FYI..

Posted by ja...@apache.org.
atoomula and prateekm FYI..

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #785 from vjagadish1989/website-reorg28


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ac5f9480
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ac5f9480
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ac5f9480

Branch: refs/heads/1.0.0
Commit: ac5f948018cb05bee5f1c44568146961152863aa
Parents: 299c031
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 30 12:11:56 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:32:55 2018 -0800

----------------------------------------------------------------------
 .../versioned/connectors/kinesis.md             | 104 +++++++++++--------
 1 file changed, 61 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ac5f9480/docs/learn/documentation/versioned/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kinesis.md b/docs/learn/documentation/versioned/connectors/kinesis.md
index a866484..85149f6 100644
--- a/docs/learn/documentation/versioned/connectors/kinesis.md
+++ b/docs/learn/documentation/versioned/connectors/kinesis.md
@@ -19,11 +19,16 @@ title: Kinesis Connector
    limitations under the License.
 -->
 
-## Overview
+## Kinesis I/O: Quickstart
 
-The Samza Kinesis connector provides access to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
-Amazon’s data streaming service. A Kinesis Data Stream is similar to a Kafka topic and can have multiple partitions.
-Each message consumed from a Kinesis Data Stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+The Samza Kinesis connector allows you to interact with [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
+Amazon’s data streaming service. The `hello-samza` project includes an example of processing Kinesis streams using Samza. Here is the complete [source code](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java) and [configs](https://github.com/apache/samza-hello-samza/blob/master/src/main/config/kinesis-hello-samza.properties).
+You can build and run this example using this [tutorial](https://github.com/apache/samza-hello-samza#hello-samza).
+
+
+### Data Format
+Like a Kafka topic, a Kinesis stream can have multiple shards with producers and consumers.
+Each message consumed from the stream is an instance of a Kinesis [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
 Samza’s [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java)
 wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java).
 
@@ -31,90 +36,103 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac
 
 ### Basic Configuration
 
-You can configure your Samza jobs to process data from Kinesis Streams. To configure Samza job to consume from Kinesis
-streams, please add the below configuration:
+Here is the required configuration for consuming messages from Kinesis. 
 
 {% highlight jproperties %}
-// define a kinesis system factory with your identifier. eg: kinesis-system
-systems.kinesis-system.samza.factory=org.apache.samza.system.eventhub.KinesisSystemFactory
+// Define a Kinesis system factory with your identifier. eg: kinesis-system
+systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
 
-// kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
+// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory
 job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
 
-// define your streams
+// Define your streams
 task.inputs=kinesis-system.input0
 
-// define required properties for your streams
+// Define required properties for your streams
 systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
 systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
 sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}
 
-The tuple required to access the Kinesis data stream must be provided, namely the following fields:<br>
-**YOUR-STREAM-REGION**, **YOUR-ACCESS-KEY**, **YOUR-SECRET-KEY**.
+#### Coordination
+The Kinesis system consumer does not rely on Samza's coordination mechanism. Instead, it uses the Kinesis Client Library (KCL) to coordinate and distribute the available shards among the running instances. Hence, you should
+set your `grouper` configuration to `AllSspToSingleTaskGrouperFactory`.
 
+{% highlight jproperties %}
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+{% endhighlight %}
 
-### Advanced Configuration
+#### Security
 
-#### AWS Client configs
-You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html)
-with the prefix **systems.system-name.aws.clientConfig.***
+Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`), which you can use to send programmatic requests to AWS. 
 
 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
+systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
+systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
+sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}
 
-As an example, to set a *proxy host* and *proxy port* for the AWS Client:
+### Advanced Configuration
+
+#### Kinesis Client Library Configs
+Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
+(KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix.
 
 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
-systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-#### Kinesis Client Library Configs
-Samza Kinesis Connector uses [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
-(KCL) to access the Kinesis data streams. You can set any [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-for a stream by configuring it under **systems.system-name.streams.stream-name.aws.kcl.***
+As an example, the configuration below is equivalent to invoking `kclClient#withTableName(myTable)` on the KCL instance.
+{% highlight jproperties %}
+systems.system-name.streams.stream-name.aws.kcl.TableName=myTable
+{% endhighlight %}
+
+#### AWS Client configs
+Samza allows you to specify any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) for connecting to your Kinesis instance.
+You can set it with the `systems.system-name.aws.clientConfig.*` prefix.
 
 {% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
+systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-Obtain the config param from the public functions in [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-by removing the *"with"* prefix. For example: config param corresponding to **withTableName()** is **TableName**.
+As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client:
+{% highlight jproperties %}
+systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
+systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+{% endhighlight %}
 
 ### Resetting Offsets
 
-The source of truth for checkpointing while using Kinesis Connector is not the Samza checkpoint topic but Kinesis itself.
-The Kinesis Client Library (KCL) [uses DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html)
-to store it’s checkpoints. By default, Kinesis Connector reads from the latest offset in the stream.
-
-To reset the checkpoints and consume from earliest/latest offset of a Kinesis data stream, please change the KCL TableName
-and set the appropriate starting position for the stream as shown below.
+Unlike other connectors, where Samza stores and manages checkpointed offsets, Kinesis checkpoints are stored in a [DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html) table.
+These checkpoints are managed internally by the KCL. You can reset the checkpoints by configuring a different name for the DynamoDB table. 
 
 {% highlight jproperties %}
-// change the TableName to a unique name to reset checkpoint.
+// change the TableName to a unique name to reset checkpoints.
 systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
+{% endhighlight %}
+
+When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.  
+
+{% highlight jproperties %}
 // set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
-systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
+systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST
 {% endhighlight %}
 
-To manipulate checkpoints to start from a particular position in the Kinesis stream, in lieu of Samza CheckpointTool,
-please login to the AWS Console and change the offsets in the DynamoDB Table with the table name that you have specified
-in the config above. By default, the table name has the following format:
-"\<job name\>-\<job id\>-\<kinesis stream\>".
+Alternatively, if you want to start from a particular offset in the Kinesis stream, you can log in to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB table.
+By default, the table name has the following format: "\<job name\>-\<job id\>-\<kinesis stream\>".
 
 ### Known Limitations
 
-The following limitations apply to Samza jobs consuming from Kinesis streams using the Samza consumer:
+The following limitations apply to Samza jobs consuming from Kinesis streams:
 
 - Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by
 chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the
 data from Kafka.
 - Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html)
 or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
-- Kinesis streams must be used ONLY with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
-as the Kinesis consumer does the partition management by itself. No other grouper is supported.
+- Kinesis streams must be used only with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
+as the Kinesis consumer does the partition management by itself. No other grouper is currently supported.
 - A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results
 to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
 


[8/9] samza git commit: Updated API documentation for high and low level APIs.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/programming-model.md b/docs/learn/documentation/versioned/api/programming-model.md
index 1c9bd1c..efdcfa3 100644
--- a/docs/learn/documentation/versioned/api/programming-model.md
+++ b/docs/learn/documentation/versioned/api/programming-model.md
@@ -18,74 +18,81 @@ title: Programming Model
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-# Introduction
-Samza provides different sets of programming APIs to meet requirements from different sets of users. The APIs are listed below:
+### Introduction
+Samza provides multiple programming APIs to fit your use case:
 
-1. Java programming APIs: Samza provides Java programming APIs for users who are familiar with imperative programming languages. The overall programming model to create a Samza application in Java will be described here. Samza also provides two sets of APIs to describe user processing logic:
-    1. [High-level API](high-level-api.md): this API allows users to describe the end-to-end stream processing pipeline in a connected DAG (Directional Acyclic Graph). It also provides a rich set of build-in operators to help users implementing common transformation logic, such as filter, map, join, and window.
-    2. [Task API](low-level-api.md): this is low-level Java API which provides “bare-metal” programming interfaces to the users. Task API allows users to explicitly access physical implementation details in the system, such as accessing the physical system stream partition of an incoming message and explicitly controlling the thread pool to execute asynchronous processing method.
-2. [Samza SQL](samza-sql.md): Samza provides SQL for users who are familiar with declarative query languages, which allows the users to focus on data manipulation via SQL predicates and UDFs, not the physical implementation details.
-3. Beam API: Samza also provides a [Beam runner](https://beam.apache.org/documentation/runners/capability-matrix/) to run applications written in Beam API. This is considered as an extension to existing operators supported by the high-level API in Samza.
+1. Java APIs: Samza provides two Java programming APIs that are ideal for building advanced stream processing applications. 
+    1. [High Level Streams API](high-level-api.md): Samza's flexible High Level Streams API lets you describe your complex stream processing pipeline in the form of a Directed Acyclic Graph (DAG) of operations on message streams. It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, joins, and windows.
+    2. [Low Level Task API](low-level-api.md): Samza's powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. 
+2. [Samza SQL](samza-sql.md): Samza SQL provides a declarative query language for describing your stream processing logic. It lets you manipulate streams using SQL predicates and UDFs instead of working with the physical implementation details.
+3. Apache Beam API: Samza also provides an [Apache Beam runner](https://beam.apache.org/documentation/runners/capability-matrix/) to run applications written using the Apache Beam API. This is considered an extension to the operators supported by the High Level Streams API in Samza.
 
-The following sections will be focused on Java programming APIs.
-
-# Key Concepts for a Samza Java Application
-To write a Samza Java application, you will typically follow the steps below:
-1. Define your input and output streams and tables
-2. Define your main processing logic
 
+### Key Concepts
 The following sections will talk about key concepts in writing your Samza applications in Java.
 
-## Samza Applications
-When writing your stream processing application using Java API in Samza, you implement either a [StreamApplication](javadocs/org/apache/samza/application/StreamApplication.html) or [TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html) and define your processing logic in the describe method.
-- For StreamApplication:
+#### Samza Applications
+A [SamzaApplication](javadocs/org/apache/samza/application/SamzaApplication.html) describes the inputs, outputs, state, configuration and the logic for processing data from one or more streaming sources. 
+
+You can implement a 
+[StreamApplication](javadocs/org/apache/samza/application/StreamApplication.html) and use the provided [StreamApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor.html) to describe the processing logic using Samza's High Level Streams API in terms of [MessageStream](javadocs/org/apache/samza/operators/MessageStream.html) operators. 
 
 {% highlight java %}
-    
-    public void describe(StreamApplicationDescriptor appDesc) { … }
+
+    public class MyStreamApplication implements StreamApplication {
+        @Override
+        public void describe(StreamApplicationDescriptor appDesc) {
+            // Describe your application here 
+        }
+    }
 
 {% endhighlight %}
+
+Alternatively, you can implement a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html) and use the provided [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor.html) to describe it using Samza's Low Level Task API in terms of per-message processing logic.
+
+
 - For TaskApplication:
 
 {% highlight java %}
     
-    public void describe(TaskApplicationDescriptor appDesc) { … }
+    public class MyTaskApplication implements TaskApplication {
+        @Override
+        public void describe(TaskApplicationDescriptor appDesc) {
+            // Describe your application here
+        }
+    }
 
 {% endhighlight %}
 
-## Descriptors for Data Streams and Tables
-There are three different types of descriptors in Samza: [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html), [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html), and [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html). The InputDescriptor and OutputDescriptor are used to describe the physical sources and destinations of a stream, while a TableDescriptor is used to describe the physical dataset and IO functions for a table.
-Usually, you will obtain InputDescriptor and OutputDescriptor from a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor.html), which include all information about producer and consumers to a physical system. The following code snippet illustrate how you will obtain InputDescriptor and OutputDescriptor from a SystemDescriptor.
 
-{% highlight java %}
-    
-    public class BadPageViewFilter implements StreamApplication {
-      @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
-        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
-        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
-        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));
+#### Streams and Table Descriptors
+Descriptors let you specify the properties of various aspects of your application from within the application code itself. 
 
-        // Now, implement your main processing logic
-      }
-    }
-    
-{% endhighlight %}
+[InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html)s and [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html)s can be used to specify Samza and implementation-specific properties of the streaming inputs and outputs for your application. You can obtain InputDescriptors and OutputDescriptors from a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor.html) for your system, which can in turn be used to specify Samza and implementation-specific properties of the producers and consumers for your I/O system. Most Samza system implementations come with their own SystemDescriptors, but if one isn't available, you 
+can use the [GenericSystemDescriptor](javadocs/org/apache/samza/system/descriptors/GenericSystemDescriptor.html).
 
-You can also add a TableDescriptor to your application.
+A [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html) can be used to specify Samza and implementation-specific properties of a [Table](javadocs/org/apache/samza/table/Table.html). You can use a local TableDescriptor (e.g., the [RocksDbTableDescriptor](javadocs/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.html)) or a [RemoteTableDescriptor](javadocs/org/apache/samza/table/descriptors/RemoteTableDescriptor.html).
+
+
+The following example illustrates how you can use input and output descriptors for a Kafka system, and a table descriptor for a local RocksDB table within your application:
 
 {% highlight java %}
-     
-    public class BadPageViewFilter implements StreamApplication {
+    
+    public class MyStreamApplication implements StreamApplication {
       @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
-        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
-        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
-        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));
-        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
-            “pageViewCountTable”, KVSerde.of(new StringSerde(), new IntegerSerde()));
-
-        // Now, implement your main processing logic
+      public void describe(StreamApplicationDescriptor appDescriptor) {
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("..."))
+            .withProducerBootstrapServers(ImmutableList.of("...", "..."));
+        KafkaInputDescriptor<PageView> kid = 
+            ksd.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        KafkaOutputDescriptor<DecoratedPageView> kod = 
+            ksd.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+
+        RocksDbTableDescriptor<String, Integer> td = 
+            new RocksDbTableDescriptor("viewCounts", KVSerde.of(new StringSerde(), new IntegerSerde()));
+            
+        // Implement your processing logic here
       }
     }
     
@@ -93,21 +100,21 @@ You can also add a TableDescriptor to your application.
 
 The same code in the above describe method applies to TaskApplication as well.
 
-## Stream Processing Logic
+#### Stream Processing Logic
 
-Samza provides two sets of APIs to define the main stream processing logic, high-level API and Task API, via StreamApplication and TaskApplication, respectively. 
+Samza provides two sets of APIs for defining your main stream processing logic: the High Level Streams API, via StreamApplication, and the Low Level Task API, via TaskApplication. 
 
-High-level API allows you to describe the processing logic in a connected DAG of transformation operators, like the example below:
+High Level Streams API allows you to describe the processing logic in a connected DAG of transformation operators, like the example below:
 
 {% highlight java %}
 
     public class BadPageViewFilter implements StreamApplication {
       @Override
       public void describe(StreamApplicationDescriptor appDesc) {
-        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor();
-        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
-        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));
+        InputDescriptor<PageView> pageViewInput = ksd.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
+        OutputDescriptor<DecoratedPageView> pageViewOutput = ksd.getOutputDescriptor(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));
-        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
+        RocksDbTableDescriptor<String, Integer> viewCountTable = new RocksDbTableDescriptor(
             “pageViewCountTable”, KVSerde.of(new StringSerde(), new IntegerSerde()));
 
         // Now, implement your main processing logic
@@ -120,7 +127,7 @@ High-level API allows you to describe the processing logic in a connected DAG of
     
 {% endhighlight %}
 
-Task API allows you to describe the processing logic in a customized StreamTaskFactory or AsyncStreamTaskFactory, like the example below:
+Low Level Task API allows you to describe the processing logic in a customized StreamTaskFactory or AsyncStreamTaskFactory, like the example below:
 
 {% highlight java %}
 
@@ -130,7 +137,7 @@ Task API allows you to describe the processing logic in a customized StreamTaskF
         KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
         InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
         OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));
-        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
+        RocksDbTableDescriptor<String, Integer> viewCountTable = new RocksDbTableDescriptor(
             “pageViewCountTable”, KVSerde.of(new StringSerde(), new IntegerSerde()));
 
         // Now, implement your main processing logic
@@ -142,11 +149,10 @@ Task API allows you to describe the processing logic in a customized StreamTaskF
     
 {% endhighlight %}
 
-Details for [high-level API](high-level-api.md) and [Task API](low-level-api.md) are explained later.
+#### Configuration for a Samza Application
 
-## Configuration for a Samza Application
+To deploy a Samza application, you need to specify the implementation class for your application and the ApplicationRunner to launch it. The following is an incomplete example of the minimum configuration required to set up the Samza application and the runner. For additional configuration, see the Configuration Reference.
 
-To deploy a Samza application, you will need to specify the implementation class for your application and the ApplicationRunner to launch your application. The following is an incomplete example of minimum required configuration to set up the Samza application and the runner:
 {% highlight jproperties %}
     
     # This is the class implementing StreamApplication

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/samza-sql.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/samza-sql.md b/docs/learn/documentation/versioned/api/samza-sql.md
index 13b059f..7412f6c 100644
--- a/docs/learn/documentation/versioned/api/samza-sql.md
+++ b/docs/learn/documentation/versioned/api/samza-sql.md
@@ -87,7 +87,7 @@ Note: Samza sql console right now doesn’t support queries that need state, for
 
 
 # Running Samza SQL on YARN
-The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the low level task API, high level API as well as Samza SQL.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the Low Level Task API, High Level Streams API as well as Samza SQL.
 
 This tutorial demonstrates a simple Samza application that uses SQL to perform stream processing.
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/table-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index f5efa88..0a9c33c 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -181,7 +181,7 @@ join with a table and finally write the output to another table.
 
 # Using Table with Samza Low Level API
 
-The code snippet below illustrates the usage of table in Samza low level API.
+The code snippet below illustrates the usage of table in Samza Low Level Task API.
 
 {% highlight java %}
  1  class SamzaTaskApplication implements TaskApplication {
@@ -273,8 +273,7 @@ The table below summarizes table metrics:
 
 [`RemoteTable`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java) 
 provides a unified abstraction for Samza applications to access any remote data 
-store through stream-table join in high-level API or direct access in low-level 
-API. Remote Table is a store-agnostic abstraction that can be customized to 
+store through stream-table joins in the High Level Streams API or direct access in the Low Level Task API. Remote Table is a store-agnostic abstraction that can be customized to 
 access new types of stores by writing pluggable I/O "Read/Write" functions, 
 implementations of 
 [`TableReadFunction`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java) and 
@@ -283,7 +282,7 @@ interfaces. Remote Table also provides common functionality, eg. rate limiting
 (built-in) and caching (hybrid).
 
 The async APIs in Remote Table are recommended over the sync versions for higher 
-throughput. They can be used with Samza with low-level API to achieve the maximum 
+throughput. They can be used with Samza's Low Level Task API to achieve the maximum 
 throughput. 
 
 Remote Tables are represented by class 
@@ -420,7 +419,7 @@ created during instantiation of Samza container.
 The life of a table goes through a few phases
 
 1. **Declaration** - at first one declares the table by creating a `TableDescriptor`. In both 
-   Samza high level and low level API, the `TableDescriptor` is registered with stream 
+   Samza High Level Streams API and Low Level Task API, the `TableDescriptor` is registered with stream 
    graph, internally converted to `TableSpec` and in return a reference to a `Table` 
    object is obtained that can participate in the building of the DAG.
 2. **Instantiation** - during planning stage, configuration is 
@@ -436,7 +435,7 @@ The life of a table goes through a few phases
    * In Samza high level API, all table instances can be retrieved from `TaskContext` using 
      table-id during initialization of a 
      [`InitableFunction`] (https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java).
-   * In Samza low level API, all table instances can be retrieved from `TaskContext` using 
+   * In Samza Low Level Task API, all table instances can be retrieved from `TaskContext` using 
      table-id during initialization of a 
    [`InitableTask`] (https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/task/InitableTask.java).
 4. **Cleanup** - 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/connectors/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/eventhubs.md b/docs/learn/documentation/versioned/connectors/eventhubs.md
index 0be288b..9fdc861 100644
--- a/docs/learn/documentation/versioned/connectors/eventhubs.md
+++ b/docs/learn/documentation/versioned/connectors/eventhubs.md
@@ -126,7 +126,7 @@ In this section, we will walk through a simple pipeline that reads from one Even
 4    MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
 5    OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
 
-    // Define the execution flow with the high-level API
+    // Define the execution flow with the High Level Streams API
 6    eventhubInput
 7        .map((message) -> {
 8          System.out.println("Received Key: " + message.getKey());

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index b71c736..9628130 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -24,9 +24,9 @@ Samza offers built-in integration with Apache Kafka for stream processing. A com
 
 The `hello-samza` project includes multiple examples on interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run them and view results. 
 
-- [High-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)
+- [High Level Streams API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)
 
-- [Low-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)
+- [Low Level Task API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)
 
 
 ### Concepts
@@ -105,7 +105,7 @@ The above example configures Samza to ignore checkpointed offsets for `page-view
 
  
 
-### Code walkthrough: High-level API
+### Code walkthrough: High Level Streams API
 
 In this section, we walk through a complete example that reads from a Kafka topic, filters a few messages and writes them to another topic.
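As a preview of the walkthrough, here is a hedged sketch of what such a filter can look like with the High Level Streams API. The topic names, the `PageView` type and its `getUserId()` accessor, and the serde choice are placeholders rather than the walkthrough's exact code, and imports are omitted.

{% highlight java %}
public class PageViewFilter implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    // A Kafka system named "kafka"; topic names and serdes below are illustrative.
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
    KafkaInputDescriptor<PageView> input =
        kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
    KafkaOutputDescriptor<PageView> output =
        kafka.getOutputDescriptor("filtered-page-views", new JsonSerdeV2<>(PageView.class));

    // Read the input topic, drop invalid events, and write the survivors to the output topic.
    appDescriptor.getInputStream(input)
        .filter(pageView -> pageView.getUserId() != null)
        .sendTo(appDescriptor.getOutputStream(output));
  }
}
{% endhighlight %}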
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/core-concepts/core-concepts.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/core-concepts/core-concepts.md b/docs/learn/documentation/versioned/core-concepts/core-concepts.md
index 479ebcb..b69de3d 100644
--- a/docs/learn/documentation/versioned/core-concepts/core-concepts.md
+++ b/docs/learn/documentation/versioned/core-concepts/core-concepts.md
@@ -55,7 +55,7 @@ A stream can have multiple producers that write data to it and multiple consumer
 
 A stream is sharded into multiple partitions for scaling how its data is processed. Each _partition_ is an ordered, replayable sequence of records. When a message is written to a stream, it ends up in one its partitions. Each message in a partition is uniquely identified by an _offset_. 
 
-Samza supports for pluggable systems that can implement the stream abstraction. As an example, Kafka implements a stream as a topic while another database might implement a stream as a sequence of updates to its tables.
+Samza supports pluggable systems that can implement the stream abstraction. As an example, Kafka implements a stream as a topic while a database might implement a stream as a sequence of updates to its tables.
 
 ## Stream Application
 A _stream application_ processes messages from input streams, transforms them and emits results to an output stream or a database. It is built by chaining multiple operators, each of which take in one or more streams and transform them.
@@ -63,19 +63,19 @@ A _stream application_ processes messages from input streams, transforms them an
 ![diagram-medium](/img/{{site.version}}/learn/documentation/core-concepts/stream-application.png)
 
 Samza offers three top-level APIs to help you build your stream applications: <br/>
-1. The [Samza Streams DSL](/learn/documentation/{{site.version}}/api/high-level-api.html),  which offers several built-in operators like map, filter etc. This is the recommended API for most use-cases. <br/>
-2. The [low-level API](/learn/documentation/{{site.version}}/api/low-level-api.html), which allows greater flexibility to define your processing-logic and offers for greater control <br/>
+1. The [High Level Streams API](/learn/documentation/{{site.version}}/api/high-level-api.html), which offers several built-in operators like map, filter, etc. This is the recommended API for most use-cases. <br/>
+2. The [Low Level Task API](/learn/documentation/{{site.version}}/api/low-level-api.html), which allows greater flexibility to define your processing logic and offers greater control. <br/>
 3. [Samza SQL](/learn/documentation/{{site.version}}/api/samza-sql.html), which offers a declarative SQL interface to create your applications <br/>
 
 ## State
-Samza supports for both stateless and stateful stream processing. _Stateless processing_, as the name implies, does not retain any state associated with the current message after it has been processed. A good example of this is to filter an incoming stream of user-records by a field (eg:userId) and write the filtered messages to their own stream. 
+Samza supports both stateless and stateful stream processing. _Stateless processing_, as the name implies, does not retain any state associated with the current message after it has been processed. A good example of this is filtering an incoming stream of user-records by a field (e.g. userId) and writing the filtered messages to their own stream. 
 
-In contrast, _stateful processing_ requires you to record some state about a message even after processing it. Consider the example of counting the number of unique users to a website every five minutes. This requires you to record state about each user seen thus far, for deduping later. Samza offers a fault-tolerant, scalable state-store for this purpose.
+In contrast, _stateful processing_ requires you to record some state about a message even after processing it. Consider the example of counting the number of unique users to a website every five minutes. This requires you to store information about each user seen thus far for de-duplication. Samza offers a fault-tolerant, scalable state-store for this purpose.
 
 ## Time
-Time is a fundamental concept in stream processing, especially how it is modeled and interpreted by the system. Samza supports two notions of dealing with time. By default, all built-in Samza operators use processing time. In processing time, the timestamp of a message is determined by when it is processed by the system. For example, an event generated by a sensor could be processed by Samza several milliseconds later. 
+Time is a fundamental concept in stream processing, especially in how it is modeled and interpreted by the system. Samza supports two notions of time. By default, all built-in Samza operators use processing time. In processing time, the timestamp of a message is determined by when it is processed by the system. For example, an event generated by a sensor could be processed by Samza several milliseconds later. 
 
-On the other hand, in event time, the timestamp of an event is determined by when it actually occurred in the source. For example, a sensor which generates an event could embed the time of occurrence as a part of the event itself. Samza provides event-time based processing by its integration with [Apache BEAM](https://beam.apache.org/documentation/runners/samza/).
+On the other hand, in event time, the timestamp of an event is determined by when it actually occurred at the source. For example, a sensor which generates an event could embed the time of occurrence as a part of the event itself. Samza provides event-time based processing by its integration with [Apache BEAM](https://beam.apache.org/documentation/runners/samza/).
 
 ## Processing guarantee
 Samza supports at-least once processing. As the name implies, this ensures that each message in the input stream is processed by the system at-least once. This guarantees no data-loss even when there are failures, thereby making Samza a practical choice for building fault-tolerant applications.

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/hadoop/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/hadoop/overview.md b/docs/learn/documentation/versioned/hadoop/overview.md
index 0820127..3d8caaa 100644
--- a/docs/learn/documentation/versioned/hadoop/overview.md
+++ b/docs/learn/documentation/versioned/hadoop/overview.md
@@ -29,7 +29,7 @@ Samza provides a single set of APIs for both batch and stream processing. This u
 
 ### Multi-stage Batch Pipeline
 
-Complex data pipelines usually consist multiple stages, with data shuffled (repartitioned) between stages to enable key-based operations such as windowing, aggregation, and join. Samza [high-level API](/startup/preview/index.html) provides an operator named `partitionBy` to create such multi-stage pipelines. Internally, Samza creates a physical stream, called an “intermediate stream”, based on the system configured as in `job.default.system`. Samza repartitions the output of the previous stage by sending it to the intermediate stream with the appropriate partition count and partition key. It then feeds it to the next stage of the pipeline. The lifecycle of intermediate streams is completely managed by Samza so from the user perspective the data shuffling is automatic.
+Complex data pipelines usually consist of multiple stages, with data shuffled (repartitioned) between stages to enable key-based operations such as windowing, aggregation, and join. Samza's [High Level Streams API](/startup/preview/index.html) provides an operator named `partitionBy` to create such multi-stage pipelines. Internally, Samza creates a physical stream, called an “intermediate stream”, based on the system configured in `job.default.system`. Samza repartitions the output of the previous stage by sending it to the intermediate stream with the appropriate partition count and partition key, and then feeds it to the next stage of the pipeline. The lifecycle of intermediate streams is completely managed by Samza, so from the user's perspective the data shuffling is automatic.
 
 For a single-stage pipeline, dealing with bounded data sets is straightforward: the system consumer “knows” the end of a particular partition, and it will emit end-of-stream token once a partition is complete. Samza will shut down the container when all its input partitions are complete.
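For illustration, here is a hedged sketch of the `partitionBy` operator described above; the `PageView` type, its `getMemberId()` accessor, and the serdes are assumptions, and imports are omitted.

{% highlight java %}
// Assume pageViews is a MessageStream<PageView> obtained from an input descriptor.
MessageStream<KV<String, PageView>> repartitioned =
    pageViews.partitionBy(
        pv -> pv.getMemberId(),                      // new partition key
        pv -> pv,                                    // value to carry forward
        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)),
        "by-member");                                // id for the intermediate stream

// Downstream stages (e.g. windowing or aggregation by member) now see data
// shuffled through the Samza-managed intermediate stream.
{% endhighlight %}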
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index 893e428..2d5446b 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -26,19 +26,15 @@ title: Documentation
 <h4>API</h4>
 
 <ul class="documentation-list">
-  <li><a href="api/low-level-api.html">Low-level API</a></li>
-  <li><a href="api/high-level-api.html">Streams DSL</a></li>
+  <li><a href="api/programming-model.html">API overview</a></li>
+  <li><a href="api/high-level-api.html">High Level Streams API</a></li>
+  <li><a href="api/low-level-api.html">Low Level Task API</a></li>
   <li><a href="api/table-api.html">Table API</a></li>
   <li><a href="api/samza-sql.html">Samza SQL</a></li>
   <li><a href="https://beam.apache.org/documentation/runners/samza/">Apache BEAM</a></li>
-<!-- TODO comparisons pages
-  <li><a href="comparisons/aurora.html">Aurora</a></li>
-  <li><a href="comparisons/jms.html">JMS</a></li>
-  <li><a href="comparisons/s4.html">S4</a></li>
--->
 </ul>
 
-<h4>Deployment</h4>
+<h4>DEPLOYMENT</h4>
 
 <ul class="documentation-list">
   <li><a href="deployment/deployment-model.html">Deployment options</a></li>
@@ -46,7 +42,7 @@ title: Documentation
   <li><a href="deployment/standalone.html">Run as an embedded library</a></li>
 </ul>
 
-<h4>Connectors</h4>
+<h4>CONNECTORS</h4>
 
 <ul class="documentation-list">
   <li><a href="connectors/overview.html">Connectors overview</a></li>
@@ -56,7 +52,7 @@ title: Documentation
   <li><a href="connectors/kinesis.html">AWS Kinesis</a></li>
 </ul>
 
-<h4>Operations</h4>
+<h4>OPERATIONS</h4>
 
 <ul class="documentation-list">
   <li><a href="operations/monitoring.html">Monitoring</a></li>

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/jobs/samza-configurations.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 5828969..e55446a 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -249,7 +249,7 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 
 |Name|Default|Description|
 |--- |--- |--- |
-|stores.**_store-name_**.factory| |You can give a store any **_store-name_** except `default` (`default` is reserved for defining default store parameters), and use that name to get a reference to the store in your Samza application (call [TaskContext.getStore()](../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)) in your task's [`init()`](../api/javadocs/org/apache/samza/task/InitableTask.html#init) method for the low-level API and in your application function's [`init()`](..api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init) method for the high level API. The value of this property is the fully-qualified name of a Java class that implements [`StorageEngineFactory`](../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html). Samza currently ships with two storage engine implementations: <br><br>`org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory` <br>An on-disk storage engine with a key-value interface, 
 implemented using [RocksDB](http://rocksdb.org/). It supports fast random-access reads and writes, as well as range queries on keys. RocksDB can be configured with various additional tuning parameters.<br><br> `org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory`<br> In memory implementation of a key-value store. Uses `util.concurrent.ConcurrentSkipListMap` to store the keys in order.|
+|stores.**_store-name_**.factory| |You can give a store any **_store-name_** except `default` (`default` is reserved for defining default store parameters), and use that name to get a reference to the store in your Samza application (call [TaskContext.getStore()](../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)) in your task's [`init()`](../api/javadocs/org/apache/samza/task/InitableTask.html#init) method for the Low Level Task API and in your application function's [`init()`](../api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init) method for the High Level Streams API). The value of this property is the fully-qualified name of a Java class that implements [`StorageEngineFactory`](../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html). Samza currently ships with two storage engine implementations: <br><br>`org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory` <br>An on-disk storage engine with a key-value interface, implemented using [RocksDB](http://rocksdb.org/). It supports fast random-access reads and writes, as well as range queries on keys. RocksDB can be configured with various additional tuning parameters.<br><br> `org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory`<br> In memory implementation of a key-value store. Uses `util.concurrent.ConcurrentSkipListMap` to store the keys in order.|
 |stores.**_store-name_**.key.serde| |If the storage engine expects keys in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as key. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, keys are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.msg.serde| |If the storage engine expects values in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as value. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, values are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.changelog| |Samza stores are local to a container. If the container fails, the contents of the store are lost. To prevent loss of data, you need to set this property to configure a changelog stream: Samza then ensures that writes to the store are replicated to this stream, and the store is restored from this stream after a failure. The value of this property is given in the form system-name.stream-name. The "system-name" part is optional. If it is omitted you must specify the system in job.changelog.system config. Any output stream can be used as changelog, but you must ensure that only one job ever writes to a given changelog stream (each instance of a job and each store needs its own changelog stream).|
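Putting the store properties above together, here is a hedged example configuration for a RocksDB-backed store; the store name `my-kv-store`, the serde names, and the changelog topic are placeholders, and the serde names must match serdes registered under `serializers.registry.*.class`.

{% highlight jproperties %}
# Fully-qualified factory class for the storage engine
stores.my-kv-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

# Serde names registered under serializers.registry.*.class
stores.my-kv-store.key.serde=string
stores.my-kv-store.msg.serde=json

# Replicate writes to a changelog stream so the store can be restored after a failure
stores.my-kv-store.changelog=kafka.my-kv-store-changelog
{% endhighlight %}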

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/operations/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index d93e497..ec8430c 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -37,8 +37,8 @@ Like any other production software, it is critical to monitor the health of our
   + [A.3 Creating a Custom MetricsReporter](#customreporter)
 * [B. Metric Types in Samza](#metrictypes)
 * [C. Adding User-Defined Metrics](#userdefinedmetrics)
-  + [Low-level API](#lowlevelapi)
-  + [High-Level API](#highlevelapi)
+  + [Low Level Task API](#lowlevelapi)
+  + [High Level Streams API](#highlevelapi)
 * [D. Key Internal Samza Metrics](#keyinternalsamzametrics)
   + [D.1 Vital Metrics](#vitalmetrics)
   + [D.2 Store Metrics](#storemetrics)
@@ -232,7 +232,7 @@ To add a new metric, you can simply use the _MetricsRegistry_ in the provided Ta
 the init() method to register new metrics. The code snippets below show examples of registering and updating a user-defined
  Counter metric. Timers and gauges can similarly be used from within your task class.
 
-### <a name="lowlevelapi"></a> Low-level API
+### <a name="lowlevelapi"></a> Low Level Task API
 
 Simply have your task implement the InitableTask interface and access the MetricsRegistry from the TaskContext.
 
@@ -252,9 +252,9 @@ public class MyJavaStreamTask implements StreamTask, InitableTask {
 }
 ```
 
-### <a name="highlevelapi"></a> High-Level API
+### <a name="highlevelapi"></a> High Level Streams API
 
-In the high-level API, you can define a ContextManager and access the MetricsRegistry from the TaskContext, using which you can add and update your metrics.
+In the High Level Streams API, you can define a ContextManager and access the MetricsRegistry from the TaskContext to add and update your metrics.
 
 ```
 public class MyJavaStreamApp implements StreamApplication {

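Since the code blocks above are truncated in this diff, here is a hedged, minimal sketch of registering and incrementing a user-defined Counter from a Low Level Task API task, using the MetricsRegistry accessor described above; the group and metric names are placeholders and imports are omitted.

```
public class MessageCountingTask implements StreamTask, InitableTask {
  private Counter messageCount;

  @Override
  public void init(Config config, TaskContext context) {
    // Register a Counter under a custom group name during task initialization.
    this.messageCount = context.getMetricsRegistry()
        .newCounter("my-app-metrics", "message-count");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // Update the metric as messages are processed; it is emitted by the configured reporters.
    messageCount.inc();
  }
}
```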
http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
index 1c06116..a881b51 100644
--- a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
+++ b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
@@ -35,7 +35,7 @@ cd hello-samza
 git checkout latest
 {% endhighlight %}
 
-This project already contains implementations of the wikipedia application using both the low-level task API and the high-level API. The low-level task implementations are in the `samza.examples.wikipedia.task` package. The high-level application implementation is in the `samza.examples.wikipedia.application` package.
+This project already contains implementations of the wikipedia application using both the Low Level Task API and the High Level Streams API. The Low Level Task API implementations are in the `samza.examples.wikipedia.task` package. The High Level Streams API implementation is in the `samza.examples.wikipedia.application` package.
 
 This tutorial will provide step by step instructions to recreate the existing wikipedia application.
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md b/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md
index cfe589a..5127fd6 100644
--- a/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md
+++ b/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md
@@ -18,9 +18,9 @@ title: Hello Samza High Level API - YARN Deployment
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the low level task API as well as the high level API.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the Low Level Task API as well as the High Level Streams API.
 
-This tutorial demonstrates a simple wikipedia application created with the high level API. The [Hello Samza tutorial] (/startup/hello-samza/{{site.version}}/index.html) is the low-level analog to this tutorial. It demonstrates the same logic but is created with the task API. The tutorials are designed to be as similar as possible. The primary differences are that with the high level API we accomplish the equivalent of 3 separate low-level jobs with a single application, we skip the intermediate topics for simplicity, and we can visualize the execution plan after we start the application.
+This tutorial demonstrates a simple wikipedia application created with the High Level Streams API. The [Hello Samza tutorial](/startup/hello-samza/{{site.version}}/index.html) is the Low Level Task API analog to this tutorial. It demonstrates the same logic but is created with the Low Level Task API. The tutorials are designed to be as similar as possible. The primary differences are that with the High Level Streams API we accomplish the equivalent of 3 separate Low Level Task API jobs with a single application, we skip the intermediate topics for simplicity, and we can visualize the execution plan after we start the application.
 
 ### Get the Code
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/tutorials/versioned/samza-event-hubs-standalone.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/samza-event-hubs-standalone.md b/docs/learn/tutorials/versioned/samza-event-hubs-standalone.md
index 53dbec3..b9fe533 100644
--- a/docs/learn/tutorials/versioned/samza-event-hubs-standalone.md
+++ b/docs/learn/tutorials/versioned/samza-event-hubs-standalone.md
@@ -18,7 +18,7 @@ title: Samza Event Hubs Connectors Example
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-The [hello-samza](https://github.com/apache/samza-hello-samza) project has an example that uses the Samza high-level API to consume and produce from [Event Hubs](../../documentation/versioned/connectors/eventhubs.html) using the Zookeeper deployment model.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project has an example that uses the Samza High Level Streams API to consume from and produce to [Event Hubs](../../documentation/versioned/connectors/eventhubs.html) using the Zookeeper deployment model.
 
 #### Get the Code
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/tutorials/versioned/samza-sql.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/samza-sql.md b/docs/learn/tutorials/versioned/samza-sql.md
index f64aa06..71cfcc6 100644
--- a/docs/learn/tutorials/versioned/samza-sql.md
+++ b/docs/learn/tutorials/versioned/samza-sql.md
@@ -68,7 +68,7 @@ Below are some of the sql queries that you can execute using the samza-sql-conso
 
 # Running Samza SQL on YARN
 
-The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the low level task API, high level API as well as Samza SQL.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the Low Level Task API, the High Level Streams API, as well as Samza SQL.
 
 This tutorial demonstrates a simple Samza application that uses SQL to perform stream processing.
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/startup/code-examples/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/code-examples/versioned/index.md b/docs/startup/code-examples/versioned/index.md
index ba1cc3e..384419d 100644
--- a/docs/startup/code-examples/versioned/index.md
+++ b/docs/startup/code-examples/versioned/index.md
@@ -36,7 +36,7 @@ These include:
 
 - The [Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java]) demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks
 
-- The [Stream-Table Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java) demonstrates how the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
+- The [Stream-Table Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java) demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
 
 - The [SessionWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java) and [TumblingWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java) examples illustrate Samza's rich windowing and triggering capabilities.
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java b/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
index 36003b3..148773f 100644
--- a/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
@@ -19,12 +19,19 @@
 
 package org.apache.samza.task;
 
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationTaskContext;
+
 /**
  * A ClosableTask augments {@link org.apache.samza.task.StreamTask}, allowing the method implementer to specify
  * code that will be called when the StreamTask is being shut down by the framework, providing to emit final metrics,
  * clean or close resources, etc.  The close method is not guaranteed to be called in event of crash or hard kill
  * of the process.
+ *
+ * Deprecated: It's recommended to manage the lifecycle of any runtime objects using
+ * {@link ApplicationContainerContext} and {@link ApplicationTaskContext} instead.
  */
+@Deprecated
 public interface ClosableTask {
   void close() throws Exception;
 }


[2/9] samza git commit: Clean up the deployment options section.

Posted by ja...@apache.org.
Clean up the deployment options section.

- Reword for consistency, style, tone

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #761 from vjagadish1989/website-reorg24


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ed616ec8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ed616ec8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ed616ec8

Branch: refs/heads/1.0.0
Commit: ed616ec81849834292943564ee5e25bb5dba55d4
Parents: 4b01232
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Oct 24 15:20:02 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:28:34 2018 -0800

----------------------------------------------------------------------
 .../versioned/deployment/deployment-model.md    | 40 ++++++++------------
 docs/learn/documentation/versioned/index.html   |  2 +-
 2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ed616ec8/docs/learn/documentation/versioned/deployment/deployment-model.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/deployment/deployment-model.md b/docs/learn/documentation/versioned/deployment/deployment-model.md
index 9192278..ba100da 100644
--- a/docs/learn/documentation/versioned/deployment/deployment-model.md
+++ b/docs/learn/documentation/versioned/deployment/deployment-model.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Deployment model
+title: Deployment options
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,40 +19,32 @@ title: Deployment model
    limitations under the License.
 -->
 
-# Overview
-One unique thing about Samza is that it provides multiple ways to deploy an application. Each deployment model comes with its own benefits, so you have flexibility in being able to choose which model best fits your needs. Samza supports “write once, run anywhere”, so application logic is the same regardless of the deployment model that you choose.
+### Overview
+A unique thing about Samza is that it provides multiple ways to run your applications. Each deployment model comes with its own benefits, so you have flexibility in choosing whichever fits your needs. Since Samza supports “Write Once, Run Anywhere”, your application logic does not change depending on where you deploy it.
 
-## YARN
-Apache YARN is a technology that manages resources, deploys applications, and monitors applications for a cluster of machines. Samza submits an application to YARN, and YARN assigns resources from across the cluster to that application. Multiple applications can run on a single YARN cluster.
+### Running Samza on YARN
+Samza integrates with [Apache YARN](/learn/documentation/{{site.version}}/deployment/yarn.html) for running stream-processing as a managed service. We leverage YARN for isolation, multi-tenancy, resource-management and deployment for your applications. In this mode, you write your Samza application and submit it to be scheduled on a YARN cluster. You also specify its resource requirements: the number of containers needed, and the number of cores and amount of memory required per container. Samza then works with YARN to provision resources for your application and run it across a cluster of machines. It also handles failures of individual instances and automatically restarts them.
 
-* Provides central cluster management
-* Each application has an associated application master in YARN to coordinate processing containers
-* Enforces CPU and memory limits
-* Supports multi-tenancy for applications
-* A Samza application is run directly as its own set of processes
-* Automatically restarts containers that have failed
-* Provides centrally managed tools and dashboards
+When multiple applications share the same YARN cluster, they need to be isolated from each other. For this purpose, Samza works with YARN to enforce CPU and memory limits. Any application that uses more than its requested share of memory or CPU is automatically terminated, thereby allowing multi-tenancy. Just like you would for any YARN-based application, you can use YARN's [web UI](/learn/documentation/{{site.version}}/deployment/yarn.html#application-master-ui) to manage your Samza jobs, view their logs, etc.
 
-## Standalone
+### Running Samza in standalone mode
 
-In standalone mode, a Samza application is a library embedded into another application (similar to Kafka Streams). This means that an application owner can control the full lifecycle of the application. Samza will do the coordination between processing containers to ensure that processing is balanced and failures are handled.
+Often you want to embed and integrate Samza as a component within a larger application. To enable this, Samza supports a [standalone mode](/learn/documentation/{{site.version}}/deployment/standalone.html) of deployment, allowing greater control over your application's life-cycle. In this model, Samza can be used just like any library you import within your Java application. This is similar to using a [high-level Kafka consumer](https://kafka.apache.org/) to process your streams.
 
-* Application owner is free to control cluster management, CPU and memory limits, and multi-tenancy
-* Container coordination is done by Zookeeper out of the box, and container coordination can be extended to be done by a technology other than Zookeeper
-* If containers fail, then partitions will be rebalanced across remaining containers
-* Samza logic can run within the same process as non-Samza logic
-* Application owner can run tools and dashboards wherever the application is deployed
+You can increase your application's capacity by spinning up multiple instances. These instances will then dynamically coordinate with each other and distribute work among themselves. If an instance fails for some reason, the tasks running on it will be re-assigned to the remaining ones. By default, Samza uses [Zookeeper](https://zookeeper.apache.org/) for coordination across individual instances. The coordination logic itself is pluggable and hence can integrate with other frameworks.
 
-# Choosing a deployment model
+This mode allows you to bring any cluster manager or hosting environment of your choice (e.g. [Kubernetes](https://kubernetes.io/), [Marathon](https://mesosphere.github.io/marathon/)) to run your application. You are also free to control memory limits and multi-tenancy on your own, since Samza is used as a lightweight library.
 
-Here are some guidelines when choosing your deployment model.
+### Choosing a deployment model
 
-* Would you like your Samza application to be embedded as a component of a larger application?
+A common question we get asked is: "Should I use YARN or standalone?" Here are some guidelines for choosing your deployment model. Since your application logic does not change, it is quite easy to port from one to the other.
+
+* Would you like Samza to be embedded as a component of a larger application?
     * If so, then you should use standalone.
 * Would you like to have out-of-the-box resource management (e.g. CPU/memory limits, restarts on failures)?
     * If so, then you should use YARN.
-* Would you like to have the freedom to deploy and run your application anywhere?
+* Would you like to run your application on another cluster manager, e.g. Kubernetes?
     * If so, then you should use standalone.
 * Would you like to run centrally-managed tools and dashboards?
     * If so, then you should use YARN.
-    * Note: You can still have tools and dashboards when using standalone, but you will need to run them yourself wherever you have actually deployed your application.
+    * Note: You can still have tools and dashboards when using standalone, but you will need to run them yourself wherever your application is deployed.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ed616ec8/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index 50bfd2d..893e428 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -21,7 +21,7 @@ title: Documentation
 
 <h4><a href="core-concepts/core-concepts.html">CORE CONCEPTS</a></h4>
 <h4><a href="architecture/architecture-overview.html">ARCHITECTURE</a></h4>
-<h4><a href="jobs/configuration.html">CONFIGURATIONS</a></h4>
+<h4><a href="jobs/samza-configurations.html">CONFIGURATIONS</a></h4>
 
 <h4>API</h4>
 


[7/9] samza git commit: Cleanup docs for HDFS connector

Posted by ja...@apache.org.
Cleanup docs for HDFS connector

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #793 from vjagadish1989/website-reorg30


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3e397022
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3e397022
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3e397022

Branch: refs/heads/1.0.0
Commit: 3e397022a5a54630d21a1cbbc0c273016592a0c2
Parents: ac5f948
Author: Jagadish <jv...@linkedin.com>
Authored: Fri Nov 2 17:35:20 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:33:26 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/connectors/hdfs.md  | 134 +++++++------------
 1 file changed, 50 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3e397022/docs/learn/documentation/versioned/connectors/hdfs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/hdfs.md b/docs/learn/documentation/versioned/connectors/hdfs.md
index 9692d18..9b79f24 100644
--- a/docs/learn/documentation/versioned/connectors/hdfs.md
+++ b/docs/learn/documentation/versioned/connectors/hdfs.md
@@ -21,133 +21,99 @@ title: HDFS Connector
 
 ## Overview
 
-Samza applications can read and process data stored in HDFS. Likewise, you can also write processed results to HDFS.
-
-### Environment Requirement
-
-Your job needs to run on the same YARN cluster which hosts the HDFS you want to consume from (or write into).
+The HDFS connector allows your Samza jobs to read data stored in HDFS files. Likewise, you can write processed results to HDFS. 
+To interact with HDFS, Samza requires your job to run on the same YARN cluster that hosts the HDFS cluster you read from or write to.
 
 ## Consuming from HDFS
+### Input Partitioning
 
-You can configure your Samza job to read from HDFS files with the [HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java). Avro encoded records are supported out of the box and it is easy to extend to support other formats (plain text, csv, json etc). See Event Format section below.
-
-### Partitioning
+Partitioning works at the level of individual directories and files. Each directory is treated as its own stream and each of its files is treated as a _partition_. For example, Samza creates 5 partitions when it's reading from a directory containing 5 files. There is no way to parallelize the consumption when reading from a single file - you can only have one container to process the file.
 
-Partitioning works at the level of individual directories and files. Each directory is treated as its own stream, while each of its files is treated as a partition. For example, when reading from a directory on HDFS with 10 files, there will be 10 partitions created. This means that you can have up-to 10 containers to process them. If you want to read from a single HDFS file, there is currently no way to break down the consumption - you can only have one container to process the file.
+### Input Event format
+Samza supports Avro natively, and it's easy to extend support to other serialization formats. Each Avro record read from HDFS is wrapped in a message envelope. The [envelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains these three fields:
 
-### Event format
+- The key, which is empty
 
-Samza's HDFS consumer wraps each avro record read from HDFS into a message-envelope. The [Envelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three fields of interest:
+- The value, which is set to the Avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
 
-1. The key, which is empty
-2. The message, which is set to the avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
-3. The stream partition, which is set to the name of the HDFS file
+- The partition, which is set to the name of the HDFS file
 
-To support input formats which are not avro, you can implement the [SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java) interface (example: [AvroFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java))
+To support non-Avro input formats, you can implement the [SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java) interface.
 
-### End of stream support
+### EndOfStream
 
-While streaming sources like Kafka are unbounded, files on HDFS have finite data and have a notion of end-of-file.
+While streaming sources like Kafka are unbounded, files on HDFS have finite data and have a notion of EOF. When reading from HDFS, your Samza job automatically exits after consuming all the data. You can implement [EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to get a callback once EOF has been reached. 
 
-When reading from HDFS, your Samza job automatically exits after consuming all the data. You can choose to implement [EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback when reaching end of stream. 
 
-### Basic Configuration
+### Defining streams
 
-Here is a few of the basic configs which are required to set up HdfsSystemConsumer:
+Samza uses the notion of a _system_ to describe any I/O source it interacts with. To consume from HDFS, you should create a new system that points to `HdfsSystemFactory`. You can then associate multiple streams with this _system_. Each stream should have a _physical name_, which should be set to the name of the directory on HDFS.
 
 {% highlight jproperties %}
-# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
-# so use HdfsSystemFactory as the system factory for your system
 systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
 
-# Define the hdfs stream
 streams.hdfs-clickstream.samza.system=hdfs
-
-# You need to specify the path of files you want to consume
 streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
 
-# You can specify a white list of files you want your job to process (in Java Pattern style)
-systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
-
-# You can specify a black list of files you don't want your job to process (in Java Pattern style),
-# by default it's empty.
-# Note that you can have both white list and black list, in which case both will be applied.
-systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
 {% endhighlight %}
 
-### Security Configuration
+The above example defines a stream called `hdfs-clickstream` that reads data from the `/data/clickstream/2016/09/11` directory. 
 
-The following additional configs are required when accessing HDFS clusters that have kerberos enabled:
+#### Whitelists & Blacklists
+If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the _whitelist_ is applied first to select matching files, and the _blacklist_ is then applied to its results.
 
 {% highlight jproperties %}
-# When the job is running in a secure environment, use the SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos delegation tokens
-job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
-
-# Kerberos principal
-yarn.kerberos.principal=your-principal-name
-
-# Path of the keytab file (local path)
-yarn.kerberos.keytab=/tmp/keytab
-{% endhighlight %}
-
-### Advanced Configuration
-
-Some of the advanced configuration you might need to set up:
-
-{% highlight jproperties %}
-# Specify the group pattern for advanced partitioning.
-systems.hdfs-clickstream.partitioner.defaultPartitioner.groupPattern=part-[id]-.*
-
-# Specify the type of files your job want to process (support avro only for now)
-systems.hdfs-clickstream.consumer.reader=avro
-
-# Max number of retries (per-partition) before the container fails.
-system.hdfs-clickstream.consumer.numMaxRetries=10
+systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
+systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
 {% endhighlight %}
 
-The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro] that you want to organize into three partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a “group identifier”, you can then set this property to be “part-[id]-.” (note that "[id]" is a reserved term here, i.e. you have to literally put it as [id]). The partitioner will apply this pattern to all file names and extract the “group identifier” (“[id]” in the pattern), then use the “group identifier” to group files into partitions.
 
 ## Producing to HDFS
 
-The samza-hdfs module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use HdfsSystemProducer, and two HdfsWriters: One that writes messages of raw bytes to a SequenceFile of BytesWritable keys and values. Another writes out Avro data files including the schema automatically reflected from the POJO objects fed to it.
+#### Output format
 
-### Configuring an HdfsSystemProducer
-
-You can configure an HdfsSystemProducer like any other Samza system: using configuration keys and values set in a job.properties file. You might configure the system producer for use by your StreamTasks like this:
+Samza allows writing your output results to HDFS in Avro format. You can either use Avro's GenericRecord or have Samza automatically infer the schema for your objects using reflection. 
 
 {% highlight jproperties %}
 # set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'
 systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+{% endhighlight %}
+
 
-# Assign the implementation class for this system's HdfsWriter
+If your output is non-Avro, you can describe its format by implementing your own serializer.
+{% highlight jproperties %}
 systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
-#systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
-# define a serializer/deserializer for the hdfs system
-# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema
-systems.hdfs.samza.msg.serde=some-serde-impl
+serializers.registry.my-serde-name.class=MySerdeFactory
+systems.hdfs.samza.msg.serde=my-serde-name
+{% endhighlight %}
 
-# Assign a serde implementation to be used for the stream called "metrics"
-systems.hdfs.streams.metrics.samza.msg.serde=some-metrics-impl
 
-# Set compression type supported by chosen Writer.
-# AvroDataFileHdfsWriter supports snappy, bzip2, deflate or none
-systems.hdfs.producer.hdfs.compression.type=snappy
+#### Output directory structure
 
-# The base dir for HDFS output. Output is structured into buckets. The default Bucketer for SequenceFile HdfsWriters
-# is currently /BASE/JOB_NAME/DATE_PATH/FILES, where BASE is set below
+Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter. 
+{% highlight jproperties %}
 systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
-
-# Assign the implementation class for the HdfsWriter's Bucketer
-systems.hdfs.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
-
-# Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run.
 systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+{% endhighlight %}
 
-# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file.
-# A new file will be cut and output continued on the next write call each time this many bytes
-# (records for AvroDataFileHdfsWriter) have been written.
+You can configure the maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.
+
+{% highlight jproperties %}
 systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
-#systems.hdfs.producer.hdfs.write.batch.size.records=10000
+systems.hdfs.producer.hdfs.write.batch.size.records=10000
 {% endhighlight %}
 
-The above configuration assumes a Metrics and Serde implementation has been properly configured against the some-metrics-impl and some-serde-impl and labels somewhere else in the same job.properties file. Each of these properties has a reasonable default, so you can leave out the ones you don’t need to customize for your job run.
\ No newline at end of file
+## Security 
+
+You can access Kerberos-enabled HDFS clusters by providing your principal and the path to your keytab file. Samza takes care of automatically creating and renewing your Kerberos tokens periodically. 
+
+{% highlight jproperties %}
+job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
+
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
+
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab
+{% endhighlight %}
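
As a footnote to the EndOfStream section above, here is a hedged sketch of a task that consumes the Avro records delivered by this connector and receives the end-of-stream callback; the class name is illustrative, imports are omitted, and the `onEndOfStream` signature is the one assumed from `EndOfStreamListenerTask`.

{% highlight java %}
public class HdfsBatchTask implements StreamTask, EndOfStreamListenerTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // The message is the Avro GenericRecord and the partition is the HDFS file name,
    // as described in the Input Event format section above.
    GenericRecord record = (GenericRecord) envelope.getMessage();
    // ... process the record ...
  }

  @Override
  public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
    // Called once all files (partitions) assigned to this task have been fully read.
    // Flush any remaining results here before the job exits.
  }
}
{% endhighlight %}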


[9/9] samza git commit: Updated API documentation for high and low level APIs.

Posted by ja...@apache.org.
Updated API documentation for high and low level APIs.

vjagadish1989 nickpan47 Please take a look.

Author: Prateek Maheshwari <pm...@apache.org>

Reviewers: Jagadish<ja...@apache.org>

Closes #802 from prateekm/api-docs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d034bbef
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d034bbef
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d034bbef

Branch: refs/heads/1.0.0
Commit: d034bbef8532fed46cff9f9ee63ad83bfadbfb9a
Parents: 3e39702
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Tue Nov 13 18:17:17 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:34:00 2018 -0800

----------------------------------------------------------------------
 .../versioned/api/high-level-api.md             | 411 ++++++++++---------
 .../versioned/api/low-level-api.md              | 351 ++++++++--------
 .../versioned/api/programming-model.md          | 118 +++---
 .../documentation/versioned/api/samza-sql.md    |   2 +-
 .../documentation/versioned/api/table-api.md    |  11 +-
 .../versioned/connectors/eventhubs.md           |   2 +-
 .../documentation/versioned/connectors/kafka.md |   6 +-
 .../versioned/core-concepts/core-concepts.md    |  14 +-
 .../documentation/versioned/hadoop/overview.md  |   2 +-
 docs/learn/documentation/versioned/index.html   |  16 +-
 .../versioned/jobs/samza-configurations.md      |   2 +-
 .../versioned/operations/monitoring.md          |  10 +-
 .../versioned/hello-samza-high-level-code.md    |   2 +-
 .../versioned/hello-samza-high-level-yarn.md    |   4 +-
 .../versioned/samza-event-hubs-standalone.md    |   2 +-
 docs/learn/tutorials/versioned/samza-sql.md     |   2 +-
 docs/startup/code-examples/versioned/index.md   |   2 +-
 .../org/apache/samza/task/ClosableTask.java     |   7 +
 18 files changed, 483 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/high-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/high-level-api.md b/docs/learn/documentation/versioned/api/high-level-api.md
index 9c13f24..43fd727 100644
--- a/docs/learn/documentation/versioned/api/high-level-api.md
+++ b/docs/learn/documentation/versioned/api/high-level-api.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: High-level API
+title: High Level Streams API
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,155 +19,131 @@ title: High-level API
    limitations under the License.
 -->
 
-# Introduction
+### Table Of Contents
+- [Introduction](#introduction)
+- [Code Examples](#code-examples)
+- [Key Concepts](#key-concepts)
+  - [StreamApplication](#streamapplication)
+  - [MessageStream](#messagestream)
+  - [Table](#table)
+- [Operators](#operators)
+  - [Map](#map)
+  - [FlatMap](#flatmap)
+  - [Filter](#filter)
+  - [PartitionBy](#partitionby)
+  - [Merge](#merge)
+  - [Broadcast](#broadcast)
+  - [SendTo (Stream)](#sendto-stream)
+  - [SendTo (Table)](#sendto-table)
+  - [Sink](#sink)
+  - [Join (Stream-Stream)](#join-stream-stream)
+  - [Join (Stream-Table)](#join-stream-table)
+  - [Window](#window)
+      - [Windowing Concepts](#windowing-concepts)
+      - [Window Types](#window-types)
+- [Operator IDs](#operator-ids)
+- [Data Serialization](#data-serialization)
+- [Application Serialization](#application-serialization)
 
-The high level API provides the libraries to define your application logic. The StreamApplication is the central abstraction which your application must implement. You start by declaring your inputs as instances of MessageStream. Then you can apply operators on each MessageStream like map, filter, window, and join to define the whole end-to-end data processing in a single program.
+### Introduction
 
-Since the 0.13.0 release, Samza provides a new high level API that simplifies your applications. This API supports operations like re-partitioning, windowing, and joining on streams. You can now express your application logic concisely in few lines of code and accomplish what previously required multiple jobs.
-# Code Examples
+Samza's flexible High Level Streams API lets you describe your complex stream processing pipeline in the form of a Directed Acyclic Graph (DAG) of operations on [MessageStream](javadocs/org/apache/samza/operators/MessageStream). It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, stream-stream and stream-table joins, and windowing. 
 
-Check out some examples to see the high-level API in action.
-1. PageView AdClick Joiner demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
-2. PageView Repartitioner illustrates re-partitioning the incoming stream of PageViews.
-3. PageView Sessionizer groups the incoming stream of events into sessions based on user activity.
-4. PageView by Region counts the number of views per-region over tumbling time intervals.
+### Code Examples
 
-# Key Concepts
-## StreamApplication
-When writing your stream processing application using the Samza high-level API, you implement a StreamApplication and define your processing logic in the describe method.
+[The Samza Cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook) contains various recipes using the Samza High Level Streams API. These include:
 
-{% highlight java %}
-
-    public void describe(StreamApplicationDescriptor appDesc) { … }
-
-{% endhighlight %}
-
-For example, here is a StreamApplication that validates and decorates page views with viewer’s profile information.
-
-{% highlight java %}
-
-    public class BadPageViewFilter implements StreamApplication {
-      @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
-        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
-        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
-        OutputDescriptor<DecoratedPageView> outputPageViews = kafka.getOutputDescriptor( “decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));    
-        MessageStream<PageView> pageViews = appDesc.getInputStream(pageViewInput);
-        pageViews.filter(this::isValidPageView)
-            .map(this::addProfileInformation)
-            .sendTo(appDesc.getOutputStream(outputPageViews));
-      }
-    }
-    
-{% endhighlight %}
+- The [Filter example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) demonstrates how to perform stateless operations on a stream. 
 
-## MessageStream
-A MessageStream, as the name implies, represents a stream of messages. A StreamApplication is described as a series of transformations on MessageStreams. You can get a MessageStream in two ways:
-1. Using StreamApplicationDescriptor.getInputStream to get the MessageStream for a given input stream (e.g., a Kafka topic).
-2. By transforming an existing MessageStream using operations like map, filter, window, join etc.
-## Table
-A Table represents a dataset that can be accessed by keys, and is one of the building blocks of the Samza high level API; the main motivation behind it is to support stream-table joins. The current K/V store is leveraged to provide backing store for local tables. More variations such as direct access and composite tables will be supported in the future. The usage of a table typically follows three steps:
-1. Create a table
-2. Populate the table using the sendTo() operator
-3. Join a stream with the table using the join() operator
+- The [Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java) demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks.
 
-{% highlight java %}
+- The [Stream-Table Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java) demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
 
-    final StreamApplication app = (streamAppDesc) -> {
-      Table<KV<Integer, Profile>> table = streamAppDesc.getTable(new InMemoryTableDescriptor("t1")
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      ...
-    };
+- The [SessionWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java) and [TumblingWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java) examples illustrate Samza's rich windowing and triggering capabilities.
 
-{% endhighlight %}
-
-Example above creates a TableDescriptor object, which contains all information about a table. The currently supported table types are InMemoryTableDescriptor and RocksDbTableDescriptor. Notice the type of records in a table is KV, and Serdes for both key and value of records needs to be defined (line 4). Additional parameters can be added based on individual table types.
+### Key Concepts
+#### StreamApplication
+A [StreamApplication](javadocs/org/apache/samza/application/StreamApplication) describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza's High Level Streams API.
 
-More details about step 2 and 3 can be found at [operator section](#operators).
+A typical StreamApplication implementation consists of the following stages:
 
-# Anatomy of a typical StreamApplication
-There are 3 simple steps to write your stream processing logic using the Samza high-level API.
-## Step 1: Obtain the input streams
-You can obtain the MessageStream for your input stream ID (“page-views”) using StreamApplicationDescriptor.getInputStream.
+ 1. Configuring the inputs, outputs and state (tables) using the appropriate [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor)s, [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor)s, [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor)s and [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor)s.
+ 2. Obtaining the corresponding [MessageStream](javadocs/org/apache/samza/operators/MessageStream)s, [OutputStream](javadocs/org/apache/samza/operators/OutputStream)s and [Table](javadocs/org/apache/samza/table/Table)s from the provided [StreamApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor)
+ 3. Defining the processing logic using operators and functions on the streams and tables thus obtained.
 
+The following example StreamApplication removes page views older than 1 hour from the input stream:
+ 
 {% highlight java %}
-
-    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka")
-        .withConsumerZkConnect(ImmutableList.of("localhost:2181"))
-        .withProducerBootstrapServers(ImmutableList.of("localhost:9092"));
-
-    KafkaInputDescriptor<KV<String, Integer>> pageViewInput =
-        sd.getInputDescriptor("page-views", KVSerde.of(new StringSerde(), new JsonSerdeV2(PageView.class)));
-    
-    MessageStream<PageView> pageViews = streamAppDesc.getInputStream(pageViewInput);
-
+   
+    public class PageViewFilter implements StreamApplication {
+      public void describe(StreamApplicationDescriptor appDescriptor) {
+        // Step 1: configure the inputs and outputs using descriptors
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("..."))
+            .withProducerBootstrapServers(ImmutableList.of("...", "..."));
+        KafkaInputDescriptor<PageViewEvent> kid = 
+            ksd.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        KafkaOutputDescriptor<PageViewEvent> kod = 
+            ksd.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+  
+        // Step 2: obtain the message streams and output streams 
+        MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(kid);
+        OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(kod);
+  
+        // Step 3: define the processing logic
+        pageViewEvents
+            .filter(m -> m.getCreationTime() > 
+                System.currentTimeMillis() - Duration.ofHours(1).toMillis())
+            .sendTo(recentPageViewEvents);
+      }
+    }
+  
 {% endhighlight %}
 
-The parameter {% highlight java %}pageViewInput{% endhighlight %} is the [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html). Each InputDescriptor includes the full information of an input stream, including the stream ID, the serde to deserialize the input messages, and the system. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system which is specified with the property “job.default.system”. However, the physical name and system properties can be overridden in configuration. For example, the following configuration defines the stream ID “page-views” as an alias for the PageViewEvent topic in a local Kafka cluster.
 
-{% highlight jproperties %}
+#### MessageStream
+A [MessageStream](javadocs/org/apache/samza/operators/MessageStream), as the name implies, represents a stream of messages. A StreamApplication is described as a Directed Acyclic Graph (DAG) of transformations on MessageStreams. You can get a MessageStream in two ways:
 
-    streams.page-views.samza.physical.name=PageViewEvent
+1. Calling StreamApplicationDescriptor#getInputStream() with an [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor) obtained from a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor).
+2. By transforming an existing MessageStream using operators like map, filter, window, join etc.
 
-{% endhighlight %}
+#### Table
+A [Table](javadocs/org/apache/samza/table/Table) is an abstraction for data sources that support random access by key. It is an evolution of the older [KeyValueStore](javadocs/org/apache/samza/storage/kv/KeyValueStore) API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a RemoteTable provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a [ReadableTable](javadocs/org/apache/samza/table/ReadableTable) or a [ReadWriteTable](javadocs/org/apache/samza/table/ReadWriteTable).
+ 
+In the High Level Streams API, you can obtain and use a Table as follows:
 
-## Step 2: Define your transformation logic
-You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.
+1. Use the appropriate TableDescriptor to specify the table properties.
+2. Register the TableDescriptor with the StreamApplicationDescriptor. This returns a Table reference, which can be used to populate the table using the [SendTo (Table)](#sendto-table) operator, or to join a stream with the table using the [Stream-Table Join](#join-stream-table) operator (see the sketch after this list).
+3. Alternatively, you can obtain a Table reference within an operator's [InitableFunction](javadocs/org/apache/samza/operators/functions/InitableFunction) using the provided [TaskContext](javadocs/org/apache/samza/context/TaskContext).
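+
+For illustration, here is a minimal sketch of steps 1 and 2 above. The table id, the key and value types, the Profile class, and the choice of a RocksDB-backed descriptor are assumptions for this sketch; the descriptor construction mirrors the RocksDbTableDescriptor usage shown in the Low Level Task API docs:
+
+{% highlight java %}
+
+    // Step 1: describe a local RocksDB-backed table keyed by member id
+    RocksDbTableDescriptor profileTableDescriptor = 
+        new RocksDbTableDescriptor("profile-table", 
+            KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(Profile.class)));
+
+    // Step 2: register the descriptor to obtain a Table reference
+    Table<KV<Integer, Profile>> profileTable = appDescriptor.getTable(profileTableDescriptor);
+
+{% endhighlight %}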
 
-{% highlight java %}
-
-    MessageStream<DecoratedPageViews> decoratedPageViews = pageViews.filter(this::isValidPageView)
-        .map(this::addProfileInformation);
-
-{% endhighlight %}
+### Operators
+The High Level Streams API provides common operations like map, flatmap, filter, merge, broadcast, joins, and windows on MessageStreams. Most of these operators accept their corresponding Functions as an argument. 
 
-## Step 3: Write to output streams
-
-Finally, you can create an OutputStream using StreamApplicationDescriptor.getOutputStream and send the transformed messages through it.
+#### Map
+Applies the provided 1:1 [MapFunction](javadocs/org/apache/samza/operators/functions/MapFunction) to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
 
 {% highlight java %}
 
-    KafkaOutputDescriptor<DecoratedPageViews> outputPageViews =
-        sd.getInputDescriptor("page-views", new JsonSerdeV2(DecoratedPageViews.class));
-  
-    // Send messages with userId as the key to “decorated-page-views”.
-    decoratedPageViews.sendTo(streamAppDesc.getOutputStream(outputPageViews));
-
-{% endhighlight %}
-
-The parameter {% highlight java %}outputPageViews{% endhighlight %} is the [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html), which includes the stream ID, the serde to serialize the outgoing messages, the physical name and the system. Similarly, the properties for this stream can be overridden just like the stream IDs for input streams. For example:
-
-{% highlight jproperties %}
-
-    streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
-
-{% endhighlight %}
-
-# Operators
-The high level API supports common operators like map, flatmap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions, which are Initable and Closable.
-## Map
-Applies the provided 1:1 MapFunction to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
-
-{% highlight java %}
-    
     MessageStream<Integer> numbers = ...
     MessageStream<Integer> tripled = numbers.map(m -> m * 3);
     MessageStream<String> stringified = numbers.map(m -> String.valueOf(m));
 
 {% endhighlight %}
-## FlatMap
-Applies the provided 1:n FlatMapFunction to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
+
+#### FlatMap
+Applies the provided 1:n [FlatMapFunction](javadocs/org/apache/samza/operators/functions/FlatMapFunction) to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
 
 {% highlight java %}
     
     MessageStream<String> sentence = ...
-    // Parse the sentence into its individual words splitting by space
+    // Parse the sentence into its individual words splitting on space
     MessageStream<String> words = sentence.flatMap(s ->
         Arrays.asList(s.split(" ")));
 
 {% endhighlight %}
-## Filter
-Applies the provided FilterFunction to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
+
+#### Filter
+Applies the provided [FilterFunction](javadocs/org/apache/samza/operators/functions/FilterFunction) to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
 
 {% highlight java %}
     
@@ -178,91 +154,109 @@ Applies the provided FilterFunction to the MessageStream and returns the filtere
     MessageStream<String> shortWords = words.filter(word -> word.length() < 3);
     
 {% endhighlight %}
-## PartitionBy
+
+#### PartitionBy
 Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.
+
 {% highlight java %}
     
     MessageStream<PageView> pageViews = ...
-    // Repartition pageView by userId.
-    MessageStream<KV<String, PageView>> partitionedPageViews = pageViews.partitionBy(
-        pageView -> pageView.getUserId(), // key extractor
-        pageView -> pageView, // value extractor
-        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
-        "partitioned-page-views"); // operator ID    
+    
+    // Repartition PageViews by userId.
+    MessageStream<KV<String, PageView>> partitionedPageViews = 
+        pageViews.partitionBy(
+            pageView -> pageView.getUserId(), // key extractor
+            pageView -> pageView, // value extractor
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+            "partitioned-page-views"); // operator ID    
         
 {% endhighlight %}
 
-The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.
+The operator ID should be unique for each operator within the application and is used to identify the streams and stores created by the operator.
 
-## Merge
+#### Merge
 Merges the MessageStream with all the provided MessageStreams and returns the merged stream.
 {% highlight java %}
     
-    MessageStream<ServiceCall> serviceCall1 = ...
-    MessageStream<ServiceCall> serviceCall2 = ...
-    // Merge individual “ServiceCall” streams and create a new merged MessageStream
-    MessageStream<ServiceCall> serviceCallMerged = serviceCall1.merge(serviceCall2);
+    MessageStream<LogEvent> log1 = ...
+    MessageStream<LogEvent> log2 = ...
+    
+    // Merge individual “LogEvent” streams and create a new merged MessageStream
+    MessageStream<LogEvent> mergedLogs = log1.merge(log2);
+    
+    // Alternatively, use mergeAll to merge multiple streams
+    MessageStream<LogEvent> allMergedLogs = MessageStream.mergeAll(ImmutableList.of(log1, log2, ...));
     
 {% endhighlight %}
 
-The merge transform preserves the order of each MessageStream, so if message {% highlight java %}m1{% endhighlight %} appears before {% highlight java %}m2{% endhighlight %} in any provided stream, then, {% highlight java %}m1{% endhighlight %} also appears before {% highlight java %}m2{% endhighlight %} in the merged stream.
-As an alternative to the merge instance method, you also can use the [MessageStream#mergeAll](javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) static method to merge MessageStreams without operating on an initial stream.
+The merge transform preserves the order of messages within each MessageStream. If message <code>m1</code> appears before <code>m2</code> in any provided stream, then <code>m1</code> will also appear before <code>m2</code> in the merged stream.
+
+#### Broadcast
+Broadcasts the contents of the MessageStream to every *instance* of downstream operators via an intermediate stream.
 
-## Broadcast
-Broadcasts the MessageStream to all instances of down-stream transformation operators via the intermediate stream.
 {% highlight java %}
 
-    MessageStream<VersionChange> verChanges = ...
-    // Broadcast input data version change event to all operator instances.
-    MessageStream<VersionChange> broadcastVersionChanges = 
-        verChanges.broadcast(new JsonSerdeV2<>(VersionChange.class), // serde
-                             "broadcast-version-changes"); // operator ID
+    MessageStream<VersionChange> versionChanges = ...
+    
+    // Broadcast version change event to all downstream operator instances.
+    versionChanges
+        .broadcast(
+            new JsonSerdeV2<>(VersionChange.class), // serde
+            "version-change-broadcast") // operator ID
+        .map(vce -> /* act on the version change event in each instance */ vce);
+         
 {% endhighlight %}
 
-## SendTo(Stream)
-Sends all messages from this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing message.
+#### SendTo (Stream)
+Sends all messages in this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing messages.
 
 {% highlight java %}
     
-    // Output a new message with userId as the key and region as the value to the “user-region” stream.
-    OutputDescriptor<KV<String, String>> outputRegions = 
-        kafka.getOutputDescriptor(“user-region”, KVSerde.of(new StringSerde(), new StringSerde());
+    // Obtain the OutputStream using an OutputDescriptor
+    KafkaOutputDescriptor<KV<String, String>> kod = 
+        ksd.getOutputDescriptor("user-country", KVSerde.of(new StringSerde(), new StringSerde()));
+    OutputStream<KV<String, String>> userCountries = appDescriptor.getOutputStream(kod);
+    
     MessageStream<PageView> pageViews = ...
-    MessageStream<KV<String, PageView>> keyedPageViews = pageViews.map(KV.of(pageView.getUserId(), pageView.getRegion()));
-    keyedPageViews.sendTo(appDesc.getOutputStream(outputRegions));
+    // Send a new message with userId as the key and their country as the value to the “user-country” stream.
+    pageViews
+      .map(pageView -> KV.of(pageView.getUserId(), pageView.getCountry()))
+      .sendTo(userCountries);
 
 {% endhighlight %}
-## SendTo(Table)
-Sends all messages from this MessageStream to the provided table, the expected message type is KV.
+
+#### SendTo (Table)
+Sends all messages in this MessageStream to the provided Table. The expected message type is [KV](javadocs/org/apache/samza/operators/KV).
 
 {% highlight java %}
+
+    MessageStream<Profile> profilesStream = ...
+    Table<KV<Long, Profile>> profilesTable = appDescriptor.getTable(profileTableDescriptor); // a Table obtained from a registered TableDescriptor (see the Table section)
     
-    // Write a new message with memberId as the key and profile as the value to a table.
-    appDesc.getInputStream(kafka.getInputDescriptor("Profile", new NoOpSerde<Profile>()))
-        .map(m -> KV.of(m.getMemberId(), m))
-        .sendTo(table);
+    profilesStream
+        .map(profile -> KV.of(profile.getMemberId(), profile))
+        .sendTo(profilesTable);
         
 {% endhighlight %}
 
-## Sink
+#### Sink
 Allows sending messages from this MessageStream to an output system using the provided [SinkFunction](javadocs/org/apache/samza/operators/functions/SinkFunction.html).
 
-This offers more control than {% highlight java %}sendTo{% endhighlight %} since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For instance, you can choose to manually commit offsets, or shut-down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.)
+This offers more control than [SendTo (Stream)](#sendto-stream) since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For example, you can choose to manually commit offsets, or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.).
 
 {% highlight java %}
     
-    // Repartition pageView by userId.
     MessageStream<PageView> pageViews = ...
-    pageViews.sink( (msg, collector, coordinator) -> {
+    
+    pageViews.sink((msg, collector, coordinator) -> {
         // Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
-        collector.send(new OutgoingMessageEnvelope(new SystemStream(“kafka”,
-                       “TransformedPageViewEvent”), msg));
+        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "TransformedPageViewEvent"), msg));
     } );
         
 {% endhighlight %}
 
-## Join(Stream-Stream)
-The stream-stream Join operator joins messages from two MessageStreams using the provided pairwise [JoinFunction](javadocs/org/apache/samza/operators/functions/JoinFunction.html). Messages are joined when the keys extracted from messages from the first stream match keys extracted from messages in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found.
+#### Join (Stream-Stream)
+The Stream-Stream Join operator joins messages from two MessageStreams using the provided pairwise [JoinFunction](javadocs/org/apache/samza/operators/functions/JoinFunction.html). Messages are joined when the key extracted from a message from the first stream matches the key extracted from a message in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found. Note that the Join operator only retains the latest message for each join key from each input stream.
 
 {% highlight java %}
     
@@ -271,7 +265,9 @@ The stream-stream Join operator joins messages from two MessageStreams using the
     MessageStream<OrderRecord> orders = …
     MessageStream<ShipmentRecord> shipments = …
 
-    MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(),
+    MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(
+        shipments, // other stream
+        new OrderShipmentJoiner(), // join function
         new StringSerde(), // serde for the join key
         new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serde for both streams
         Duration.ofMinutes(20), // join TTL
@@ -297,18 +293,18 @@ The stream-stream Join operator joins messages from two MessageStreams using the
     
 {% endhighlight %}
 
-## Join(Stream-Table)
-The stream-table Join operator joins messages from a MessageStream using the provided [StreamTableJoinFunction](javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html). Messages from the input stream are joined with record in table using key extracted from input messages. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided; the join function can choose to return null (inner join) or an output message (left outer join). For join to function properly, it is important to ensure the input stream and table are partitioned using the same key as this impacts the physical placement of data.
+#### Join (Stream-Table)
+The Stream-Table Join operator joins messages from a MessageStream with messages in a Table using the provided [StreamTableJoinFunction](javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html). Messages are joined when the key extracted from a message in the stream matches the key for a record in the table. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided. The join function can choose to return null for an inner join, or an output message for a left outer join. For join correctness, it is important to ensure the input stream and table are partitioned using the same key (e.g., using the partitionBy operator) as this impacts the physical placement of data.
 
 {% highlight java %}
-   
-    streamAppDesc.getInputStream(kafk.getInputDescriptor("PageView", new NoOpSerde<PageView>()))
-        .partitionBy(PageView::getMemberId, v -> v, "p1")
-        .join(table, new PageViewToProfileJoinFunction())
+
+    pageViews
+        .partitionBy(pv -> pv.getMemberId(), pv -> pv, // key and value extractors
+            KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+            "page-views-by-memberid") // operator ID
+        .join(profiles, new PageViewToProfileTableJoiner())
         ...
     
-    public class PageViewToProfileJoinFunction implements StreamTableJoinFunction
-        <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+    public class PageViewToProfileTableJoiner implements 
+        StreamTableJoinFunction<Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
       
       @Override
       public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) {
@@ -325,10 +321,11 @@ The stream-table Join operator joins messages from a MessageStream using the pro
         return record.getKey();
       }
     }
+    
 {% endhighlight %}
 
-## Window
-### Windowing Concepts
+### Window
+#### Windowing Concepts
 **Windows, Triggers, and WindowPanes**: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.
 
 A window can have one or more associated triggers which determine when results from the window are emitted. Triggers can be either [early triggers](javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-) that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.
@@ -341,8 +338,8 @@ A discarding window clears all state for the window at every emission. Each emis
 
 An accumulating window retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.
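+
+For illustration, here is a minimal sketch of attaching an early trigger to a window, using the trigger API linked above. The window definition mirrors the keyed tumbling window example below, and firing every 100 messages is an arbitrary choice for this sketch:
+
+{% highlight java %}
+
+    // emit speculative WindowPanes every 100 messages, in addition to the emission when the window closes
+    pageViews.window(
+        Windows.keyedTumblingWindow(
+            pageView -> pageView.getUserId(), // key extractor
+            Duration.ofSeconds(30), // window duration
+            new StringSerde(), new JsonSerdeV2<>(PageView.class))
+        .setEarlyTrigger(Triggers.repeat(Triggers.count(100))));
+
+{% endhighlight %}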
 
-### Window Types
-The Samza high-level API currently supports tumbling and session windows.
+#### Window Types
+The Samza High Level Streams API currently supports tumbling and session windows.
 
 **Tumbling Window**: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.
 
@@ -350,20 +347,26 @@ Examples:
 
 {% highlight java %}
     
-    // Group the pageView stream into 3 second tumbling windows keyed by the userId.
+    // Group the pageView stream into 30 second tumbling windows keyed by the userId.
     MessageStream<PageView> pageViews = ...
-    MessageStream<WindowPane<String, Collection<PageView>>> =
-        pageViews.window(
-            Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), // key extractor
+    MessageStream<WindowPane<String, Collection<PageView>>> windowedPageViews = pageViews.window(
+        Windows.keyedTumblingWindow(
+            pageView -> pageView.getUserId(), // key extractor
             Duration.ofSeconds(30), // window duration
             new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
-    // Compute the maximum value over tumbling windows of 3 seconds.
+    // Compute the maximum value over tumbling windows of 30 seconds.
     MessageStream<Integer> integers = …
     Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
-    FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
-    MessageStream<WindowPane<Void, Integer>> windowedStream =
-        integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction, new IntegerSerde()));
+    FoldLeftFunction<Integer, Integer> aggregateFunction = 
+        (msg, oldValue) -> Math.max(msg, oldValue);
+    
+    MessageStream<WindowPane<Void, Integer>> windowedStream = integers.window(
+       Windows.tumblingWindow(
+            Duration.ofSeconds(30), 
+            initialValue, 
+            aggregateFunction, 
+            new IntegerSerde()));
    
 {% endhighlight %}
 
@@ -378,39 +381,53 @@ Examples:
     Supplier<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
     Duration sessionGap = Duration.ofMinutes(3);
-    MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
-        pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator,
+    
+    MessageStream<WindowPane<String, Integer>> sessionCounts = pageViews.window(
+        Windows.keyedSessionWindow(
+            pageView -> pageView.getUserId(), 
+            sessionGap, 
+            initialValue, 
+            countAggregator,
             new StringSerde(), new IntegerSerde()));
 
-    // Compute the maximum value over tumbling windows of 3 seconds.
+    // Compute the maximum value over tumbling windows of 30 seconds.
     MessageStream<Integer> integers = …
     Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
-
     FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
-    MessageStream<WindowPane<Void, Integer>> windowedStream =
-       integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction,
-           new IntegerSerde()));
+    
+    MessageStream<WindowPane<Void, Integer>> windowedStream = integers.window(
+        Windows.tumblingWindow(
+            Duration.ofSeconds(30), 
+            initialValue, 
+            aggregateFunction,
+            new IntegerSerde()));
          
 {% endhighlight %}
 
-# Operator IDs
-Each operator in your application is associated with a globally unique identifier. By default, each operator is assigned an ID based on its usage in the application. Some operators that create and use external resources (e.g., intermediate streams for partitionBy and broadcast, stores and changelogs for joins and windows, etc.) require you to provide an explicit ID for them. It's highly recommended to provide meaningful IDs for such operators. These IDs help you control the underlying resources when you make changes to the application logic that change the position of the operator within the DAG, and
+### Operator IDs
+Each operator in the StreamApplication is associated with a globally unique identifier. By default, each operator is assigned an ID by the framework based on its position in the operator DAG for the application. Some operators that create and use external resources require you to provide an explicit ID for them. Examples of such operators are partitionBy and broadcast with their intermediate streams, and window and join with their local stores and changelogs. It's strongly recommended to provide meaningful IDs for such operators. 
+
+These IDs help you manage the underlying resources when you make changes to the application logic that change the position of the operator within the DAG and:
+
 1. You wish to retain the previous state for the operator, since the changes to the DAG don't affect the operator semantics. For example, you added a map operator before a partitionBy operator to log the incoming message. In this case, you can retain the previous operator ID.
+
 2. You wish to discard the previous state for the operator, since the changes to the DAG change the operator semantics. For example, you added a filter operator before a partitionBy operator that discards some of the messages. In this case, you should change the operator ID. Note that by doing so you will lose any previously checkpointed messages that haven't been completely processed by the downstream operators yet.
 
 An operator ID is of the format: **jobName-jobId-opCode-opId**
-- **jobName** is the name of your job, as specified using the configuration "job.name"
-- **jobId** is the name of your job, as specified using the configuration "job.id"
-- **opCode** is an identifier for the type of the operator, e.g. map/filter/join
+
+- **jobName** is the name of your application, as specified using the configuration "app.name"
+- **jobId** is the id of your application, as specified using the configuration "app.id"
+- **opCode** is a pre-defined identifier for the type of the operator, e.g. map/filter/join
 - **opId** is either auto-generated by the framework based on the position of the operator within the DAG, or can be provided by you for operators that manage external resources.
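+
+As an illustrative example (the application and operator names here are hypothetical, and the exact opCode string is determined by the framework): a partitionBy operator given the opId "partitioned-page-views" in an application with app.name=pageview-filter and app.id=1 would get an operator ID of the form <code>pageview-filter-1-partition_by-partitioned-page-views</code>.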
 
-# Application Serialization
-Samza relies on Java Serialization to distribute your application logic to the processors. For this to work, all of your custom application logic needs to be Serializable. For example, all the Function interfaces implement Serializable, and your implementations need to be serializable as well. It's recommended to use the Context APIs to set up any non-serializable context that your Application needs at Runtime.
+### Data Serialization
+Producing data to and consuming data from streams and tables require serializing and de-serializing it. In addition, some stateful operators like joins and windows store data locally for durability across restarts. Such operations require you to provide a [Serde](javadocs/org/apache/samza/serializers/Serde) implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be checked for type safety at compile time. Samza provides the following Serde implementations that you can use out of the box:
 
-# Data Serialization
-Producing and consuming from streams and tables require serializing and deserializing data. In addition, some operators like joins and windows store data in a local store for durability across restarts. Such operations require you to provide a Serde implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be type safe. Samza provides the following Serde implementations that you can use out of the box:
+- Common Types: Serdes for common Java data types, such as ByteBuffer, Double, Long, Integer, Byte, String.
+- [SerializableSerde](javadocs/org/apache/samza/serializers/SerializableSerde): A Serde for Java classes that implement the java.io.Serializable interface. 
+- [JsonSerdeV2](javadocs/org/apache/samza/serializers/JsonSerdeV2): A Jackson-based, type-safe JSON Serde that serializes a POJO to JSON and deserializes JSON back into the specified POJO type.
+- [KVSerde](javadocs/org/apache/samza/serializers/KVSerde): A pair of Serdes, first for the keys, and the second for the values in the incoming/outgoing message, a table record, or a [KV](javadocs/org/apache/samza/operators/KV) object.
+- [NoOpSerde](javadocs/org/apache/samza/serializers/NoOpSerde): A marker serde that indicates that the framework should not attempt any serialization/deserialization of the data. This is useful in some cases where the SystemProducer or SystemConsumer handles serialization and deserialization of the data itself.
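+
+For illustration, a minimal sketch of two common cases from the list above. The KafkaSystemDescriptor variable <code>ksd</code> and the PageView class are assumptions carried over from the earlier examples on this page:
+
+{% highlight java %}
+
+    // a KVSerde composing a String key serde with a JSON value serde
+    KVSerde<String, PageView> pageViewKVSerde = 
+        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
+
+    // a NoOpSerde when the SystemConsumer/SystemProducer already handles (de)serialization
+    KafkaInputDescriptor<PageView> rawPageViews = 
+        ksd.getInputDescriptor("page-views", new NoOpSerde<PageView>());
+
+{% endhighlight %}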
 
-- KVSerde: A pair of Serdes, first for the keys, and the second for the values in the incoming/outgoing message or a table record.
-- NoOpSerde: A serde implementation that indicates that the framework should not attempt any serialization/deserialization of the data. Useful in some cases when the SystemProducer/SystemConsumer handle serialization/deserialization themselves.
-- JsonSerdeV2: a type-specific Json serde that allows directly deserializing the Json bytes into to specific POJO type.
-- Serdes for primitive types: serdes for primitive types, such as ByteBuffer, Double, Long, Integer, Byte, String, etc.
+### Application Serialization
+Samza uses Java Serialization to distribute an application's processing logic to the processors. For this to work, all application logic, including any Function implementations passed to operators, needs to be serializable. If you need to use any non-serializable objects at runtime, you can use the [ApplicationContainerContext](javadocs/org/apache/samza/context/ApplicationContainerContext) and [ApplicationTaskContext](javadocs/org/apache/samza/context/ApplicationTaskContext) APIs to manage their lifecycle.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/low-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/low-level-api.md b/docs/learn/documentation/versioned/api/low-level-api.md
index e91f74e..34d2b06 100644
--- a/docs/learn/documentation/versioned/api/low-level-api.md
+++ b/docs/learn/documentation/versioned/api/low-level-api.md
@@ -19,291 +19,268 @@ title: Low level Task API
    limitations under the License.
 -->
 
+### Table Of Contents
+- [Introduction](#introduction)
+- [Code Examples](#code-examples)
+- [Key Concepts](#key-concepts)
+  - [TaskApplication](#taskapplication)
+  - [TaskFactory](#taskfactory)
+  - [Task Interfaces](#task-interfaces)
+      - [StreamTask](#streamtask)
+      - [AsyncStreamTask](#asyncstreamtask)
+      - [Additional Task Interfaces](#additional-task-interfaces)
+          - [InitableTask](#initabletask)
+          - [ClosableTask](#closabletask)
+          - [WindowableTask](#windowabletask)
+          - [EndOfStreamListenerTask](#endofstreamlistenertask) 
+- [Common Operations](#common-operations)
+  - [Receiving Messages from Input Streams](#receiving-messages-from-input-streams)
+  - [Sending Messages to Output Streams](#sending-messages-to-output-streams)
+  - [Accessing Tables](#accessing-tables)
+- [Legacy Applications](#legacy-applications)
 
-# Introduction
-Task APIs (i.e. [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html)) are bare-metal interfaces that exposes the system implementation details in Samza. When using Task APIs, you will implement your application as a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html). The main difference between a TaskApplication and a StreamApplication is the APIs used to describe the processing logic. In TaskApplication, the processing logic is defined via StreamTask and AsyncStreamTask.
+### Introduction
+Samza's powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication). The processing logic is defined as either a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask).
 
-# Key Concepts
 
-## TaskApplication
-Here is an example of a user implemented TaskApplication:
+### Code Examples
 
-{% highlight java %}
-    
-    package com.example.samza;
+The [Hello Samza](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task/application) Wikipedia applications demonstrate how to use Samza's Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculate several statistics about them.
 
-    public class BadPageViewFilter implements TaskApplication {
-      @Override
-      public void describe(TaskApplicationDescriptor appDesc) {
-        // Add input, output streams and tables
-        KafkaSystemDescriptor<String, PageViewEvent> kafkaSystem = 
-            new KafkaSystemDescriptor(“kafka”)
-              .withConsumerZkConnect(myZkServers)
-              .withProducerBootstrapServers(myBrokers);
-        KVSerde<String, PageViewEvent> serde = 
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<PageViewEvent>());
-        // Add input, output streams and tables
-        appDesc.withInputStream(kafkaSystem.getInputDescriptor(“pageViewEvent”, serde))
-            .withOutputStream(kafkaSystem.getOutputDescriptor(“goodPageViewEvent”, serde))
-            .withTable(new RocksDBTableDescriptor(
-                “badPageUrlTable”, KVSerde.of(new StringSerde(), new IntegerSerde())
-            .withTaskFactory(new BadPageViewTaskFactory());
-      }
-    }
+- The [WikipediaFeedTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java) demonstrates how to consume multiple Wikipedia event streams and merge them to an Apache Kafka topic. 
 
-{% endhighlight %}
+- The [WikipediaParserTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.
 
-In the above example, user defines the input stream, the output stream, and a RocksDB table for the application, and then provide the processing logic defined in BadPageViewTaskFactory. All descriptors (i.e. input/output streams and tables) and the [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) are registered to the [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor.html).
+- The [WikipediaStatsTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java) demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.
 
-## TaskFactory
-You will need to implement a [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) to create task instances to execute user defined processing logic. Correspondingly, StreamTaskFactory and AsyncStreamTaskFactory are used to create StreamTask and AsyncStreamTask respectively. The [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory.html) for the above example is shown below:
+### Key Concepts
 
-{% highlight java %}
+#### TaskApplication
+
+A [TaskApplication](javadocs/org/apache/samza/application/TaskApplication) describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza's Low Level Task API.
+
+A typical TaskApplication implementation consists of the following stages:
 
-    package com.example.samza;
+ 1. Configuring the inputs, outputs and state (tables) using the appropriate [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor)s, [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor)s, [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor)s and [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor)s.
+ 2. Adding the descriptors above to the provided [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor)
+ 3. Defining the processing logic in a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask) implementation, and adding its corresponding [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory) or [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory) to the TaskApplicationDescriptor.
 
-    public class BadPageViewTaskFactory implements StreamTaskFactory {
+The following example TaskApplication removes page views with "bad URLs" from the input stream:
+ 
+{% highlight java %}
+    
+    public class PageViewFilter implements TaskApplication {
       @Override
-      public StreamTask createInstance() {
-        // Add input, output streams and tables
-        return new BadPageViewFilterTask();
+      public void describe(TaskApplicationDescriptor appDescriptor) {
+        // Step 1: configure the inputs and outputs using descriptors
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("..."))
+            .withProducerBootstrapServers(ImmutableList.of("...", "..."));
+        KafkaInputDescriptor<PageViewEvent> kid = 
+            ksd.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        KafkaOutputDescriptor<PageViewEvent> kod = 
+            ksd.getOutputDescriptor("goodPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        RocksDbTableDescriptor badUrls = 
+            new RocksDbTableDescriptor("badUrls", KVSerde.of(new StringSerde(), new IntegerSerde()));
+            
+        // Step 2: Add input, output streams and tables
+        appDescriptor
+            .withInputStream(kid)
+            .withOutputStream(kod)
+            .withTable(badUrls);
+        
+        // Step 3: define the processing logic
+        appDescriptor.withTaskFactory(new PageViewFilterTaskFactory());
       }
     }
-    
+
 {% endhighlight %}
 
-Similarly, here is an example of [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory.html):
+#### TaskFactory
+Your [TaskFactory](javadocs/org/apache/samza/task/TaskFactory) will be used to create instances of your Task in each of Samza's processors. If you're implementing a StreamTask, you can provide a [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory). Similarly, if you're implementing an AsyncStreamTask, you can provide an [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory). For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewAsyncTaskFactory implements AsyncStreamTaskFactory {
+    public class PageViewFilterTaskFactory implements StreamTaskFactory {
       @Override
-      public AsyncStreamTask createInstance() {
-        // Add input, output streams and tables
-        return new BadPageViewAsyncFilterTask();
+      public StreamTask createInstance() {
+        return new PageViewFilterTask();
       }
     }
+    
 {% endhighlight %}
 
-## Task classes
+#### Task Interfaces
 
-The actual processing logic is implemented in [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) classes.
+Your processing logic can be implemented in a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask).
 
-### StreamTask
-You should implement [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) for synchronous process, where the message processing is complete after the process method returns. An example of StreamTask is a computation that does not involve remote calls:
+##### StreamTask
+You can implement a [StreamTask](javadocs/org/apache/samza/task/StreamTask) for synchronous message processing. Samza delivers messages to the task one at a time, and considers each message to be processed when the process method call returns. For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewFilterTask implements StreamTask {
+    public class PageViewFilterTask implements StreamTask {
       @Override
-      public void process(IncomingMessageEnvelope envelope,
-                          MessageCollector collector,
-                          TaskCoordinator coordinator) {
-        // process message synchronously
+      public void process(
+          IncomingMessageEnvelope envelope, 
+          MessageCollector collector, 
+          TaskCoordinator coordinator) {
+          
+          // process the message in the envelope synchronously
       }
     }
+
 {% endhighlight %}
 
-### AsyncStreamTask
-The [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface, on the other hand, supports asynchronous process, where the message processing may not be complete after the processAsync method returns. Various concurrent libraries like Java NIO, ParSeq and Akka can be used here to make asynchronous calls, and the completion is marked by invoking the [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will continue to process next message or shut down the container based on the callback status. An example of AsyncStreamTask is a computation that make remote calls but don’t block on the call completion:
+Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container. 
+
+##### AsyncStreamTask
+You can implement an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask) for asynchronous message processing. This can be useful when you need to perform long-running I/O operations to process a message, e.g., making an HTTP request. For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewAsyncFilterTask implements AsyncStreamTask {
+    public class AsyncPageViewFilterTask implements AsyncStreamTask {
       @Override
       public void processAsync(IncomingMessageEnvelope envelope,
-                               MessageCollector collector,
-                               TaskCoordinator coordinator,
-                               TaskCallback callback) {
-        // process message with asynchronous calls
-        // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
+          MessageCollector collector,
+          TaskCoordinator coordinator,
+          TaskCallback callback) {
+          
+          // process message asynchronously
+          // invoke callback.complete or callback.failure upon completion
       }
     }
+
 {% endhighlight %}
 
-# Runtime Objects
+Samza delivers the incoming message and a [TaskCallback](javadocs/org/apache/samza/task/TaskCallback) with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or if neither callback.complete() nor callback.failure() is invoked within <code>task.callback.ms</code> milliseconds, Samza will shut down the running Container. 
+
+If configured, Samza will allow up to <code>task.max.concurrency</code> messages to be processed asynchronously at a time within each Task Instance. Note that while message delivery (i.e., the processAsync invocation) is guaranteed to be in order within a stream partition, message processing may complete out of order when <code>task.max.concurrency</code> > 1. 
+
+For more details on asynchronous and concurrent processing, see the [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide).
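+
+For illustration, here is a minimal sketch of the callback protocol described above. The <code>profileClient</code> field, its <code>fetchProfileAsync</code> method returning a CompletableFuture, and the output topic name are assumptions of this sketch, not part of Samza:
+
+{% highlight java %}
+
+    public class AsyncPageViewJoinTask implements AsyncStreamTask {
+      // hypothetical non-blocking client, e.g. initialized in an InitableTask#init implementation
+      private ProfileClient profileClient;
+
+      @Override
+      public void processAsync(IncomingMessageEnvelope envelope,
+          MessageCollector collector,
+          TaskCoordinator coordinator,
+          TaskCallback callback) {
+        PageViewEvent pageView = (PageViewEvent) envelope.getMessage();
+        profileClient.fetchProfileAsync(pageView.getUserId())
+            .thenAccept(profile -> {
+              collector.send(new OutgoingMessageEnvelope(
+                  new SystemStream("kafka", "decoratedPageViewEvent"), profile));
+              callback.complete(); // mark this message as successfully processed
+            })
+            .exceptionally(error -> {
+              callback.failure(error); // mark this message as failed
+              return null;
+            });
+      }
+    }
+
+{% endhighlight %}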
 
-## Task Instances in Runtime
-When you run your job, Samza will create many instances of your task class (potentially on multiple machines). These task instances process the messages from the input streams.
+##### Additional Task Interfaces
 
-## Messages from Input Streams
+There are a few other interfaces you can implement in your StreamTask or AsyncStreamTask that provide additional functionality.
 
-For each message that Samza receives from the task’s input streams, the [process](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) or [processAsync](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method is called. The [envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three things of importance: the message, the key, and the stream that the message came from.
+###### InitableTask
+You can implement the [InitableTask](javadocs/org/apache/samza/task/InitableTask) interface to access the [Context](javadocs/org/apache/samza/context/Context). The Context provides access to any runtime objects you need in your task, whether they're provided by the framework or defined by your application.
 
 {% highlight java %}
     
-    /** Every message that is delivered to a StreamTask is wrapped
-     * in an IncomingMessageEnvelope, which contains metadata about
-     * the origin of the message. */
-    public class IncomingMessageEnvelope {
-      /** A deserialized message. */
-      Object getMessage() { ... }
-
-      /** A deserialized key. */
-      Object getKey() { ... }
-
-      /** The stream and partition that this message came from. */
-      SystemStreamPartition getSystemStreamPartition() { ... }
+    public interface InitableTask {
+      void init(Context context) throws Exception;
     }
+    
 {% endhighlight %}
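+
+For illustration, here is a minimal sketch of a task that uses init() to look up a local store. The store name, its key and value types, and the use of TaskContext#getStore are assumptions of this sketch; the store would need to be configured for your application:
+
+{% highlight java %}
+
+    public class PageViewFilterTask implements StreamTask, InitableTask {
+      private KeyValueStore<String, Integer> badUrlsStore;
+
+      @Override
+      public void init(Context context) {
+        // look up a local store named "badUrls" configured for this task
+        this.badUrlsStore = (KeyValueStore<String, Integer>) context.getTaskContext().getStore("badUrls");
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, 
+          MessageCollector collector, TaskCoordinator coordinator) {
+        // use badUrlsStore while processing each message
+      }
+    }
+
+{% endhighlight %}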
 
-The key and value are declared as Object, and need to be cast to the correct type. The serializer/deserializer are defined via InputDescriptor, as described [here](high-level-api.md#data-serialization). A deserializer can convert these bytes into any other type, for example the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
-
-The [getSystemStreamPartition()](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html#getSystemStreamPartition--) method returns a [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html) object, which tells you where the message came from. It consists of three parts:
-1. The *system*: the name of the system from which the message came, as defined as SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
-2. The *stream name*: the name of the stream (topic, queue) within the source system. This is also defined as InputDescriptor in the TaskApplication.
-3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is normally split into several partitions, and each partition is assigned to one task instance by Samza.
-
-The API looks like this:
+###### ClosableTask
+You can implement the [ClosableTask](javadocs/org/apache/samza/task/ClosableTask) interface to clean up any runtime state during shutdown. This interface is deprecated. It's recommended to use the [ApplicationContainerContext](javadocs/org/apache/samza/context/ApplicationContainerContext) and [ApplicationTaskContext](javadocs/org/apache/samza/context/ApplicationTaskContext) APIs to manage the lifecycle of any runtime objects.
 
 {% highlight java %}
-    
-    /** A triple of system name, stream name and partition. */
-    public class SystemStreamPartition extends SystemStream {
 
-      /** The name of the system which provides this stream. It is
-          defined in the Samza job's configuration. */
-      public String getSystem() { ... }
-
-      /** The name of the stream/topic/queue within the system. */
-      public String getStream() { ... }
-
-      /** The partition within the stream. */
-      public Partition getPartition() { ... }
+    public interface ClosableTask {
+      void close() throws Exception;
     }
+    
 {% endhighlight %}
 
-In the example user-implemented TaskApplication above, the system name is “kafka”, the stream name is “pageViewEvent”. (The name “kafka” isn’t special — you can give your system any name you want.) If you have several input streams feeding into your StreamTask or AsyncStreamTask, you can use the SystemStreamPartition to determine what kind of message you’ve received.
-
-## Messages to Output Streams
-What about sending messages? If you take a look at the [process()](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) method in StreamTask, you’ll see that you get a [MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html). Similarly, you will get it in [processAsync()](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method in AsyncStreamTask as well.
+###### WindowableTask
+You can implement the [WindowableTask](javadocs/org/apache/samza/task/WindowableTask) interface to implement processing logic that is invoked periodically by the framework.
 
 {% highlight java %}
     
-    /** When a task wishes to send a message, it uses this interface. */
-    public interface MessageCollector {
-      void send(OutgoingMessageEnvelope envelope);
+    public interface WindowableTask {
+      void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
     }
-    
-{% endhighlight %}
-
-To send a message, you create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) object and pass it to the MessageCollector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for details.
 
-**NOTE**: Please only use the MessageCollector object within the process() or processAsync() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
+{% endhighlight %}
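+
+For example, here is a minimal sketch of a task that maintains a running count and emits it at every window boundary (the output stream name is hypothetical; the window interval is controlled by the `task.window.ms` configuration):
+
+{% highlight java %}
+
+    public class PageViewCounterTask implements StreamTask, WindowableTask {
+      // Hypothetical output stream for the periodic counts.
+      private static final SystemStream COUNT_STREAM = new SystemStream("kafka", "pageViewCounts");
+      private int count = 0;
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        count++;
+      }
+
+      @Override
+      public void window(MessageCollector collector, TaskCoordinator coordinator) {
+        // Emit the current count and reset it for the next window.
+        collector.send(new OutgoingMessageEnvelope(COUNT_STREAM, count));
+        count = 0;
+      }
+    }
+
+{% endhighlight %}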
 
-For example, here’s a simple example to send out “Good PageViewEvents” in the BadPageViewFilterTask:
+###### EndOfStreamListenerTask
+You can implement the [EndOfStreamListenerTask](javadocs/org/apache/samza/task/EndOfStreamListenerTask) interface to implement processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it's consuming. This is typically relevant when running Samza as a batch job.
 
 {% highlight java %}
-    
-    public class BadPageViewFilterTask implements StreamTask {
 
-      // Send outgoing messages to a stream called "words"
-      // in the "kafka" system.
-      private final SystemStream OUTPUT_STREAM =
-        new SystemStream("kafka", "goodPageViewEvent");
-      @Override
-      public void process(IncomingMessageEnvelope envelope,
-                          MessageCollector collector,
-                          TaskCoordinator coordinator) {
-        if (isBadPageView(envelope)) {
-          // skip the message, increment the counter, do not send it
-          return;
-        }
-        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, envelope.getKey(), envelope.getValue()));
-      }
+    public interface EndOfStreamListenerTask {
+      void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
     }
     
 {% endhighlight %}
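+
+For example, here is a minimal sketch of a batch-style task that emits a final summary once all of its input partitions have been fully consumed (the output stream name is hypothetical):
+
+{% highlight java %}
+
+    public class BatchSummaryTask implements StreamTask, EndOfStreamListenerTask {
+      // Hypothetical output stream for the end-of-stream summary.
+      private static final SystemStream SUMMARY_STREAM = new SystemStream("kafka", "batchSummary");
+      private long recordCount = 0;
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        recordCount++;
+      }
+
+      @Override
+      public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+        // All input partitions assigned to this task are fully consumed; emit a final summary.
+        collector.send(new OutgoingMessageEnvelope(SUMMARY_STREAM, recordCount));
+      }
+    }
+
+{% endhighlight %}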
 
-## Accessing Tables
-There are many cases that you will need to lookup a table when processing an incoming message. Samza allows access to tables by a unique name through [TaskContext.getTable()](javadocs/org/apache/samza/task/TaskContext.html#getTable-java.lang.String-) method. [TaskContext](javadocs/org/apache/samza/task/TaskContext.html) is accessed via [Context.getTaskContext()](javadocs/org/apache/samza/context/Context.html#getTaskContext--) in the [InitiableTask’s init()]((javadocs/org/apache/samza/task/InitableTask.html#init-org.apache.samza.context.Context-)) method. A user code example to access a table in the above TaskApplication example is here:
+### Common Operations
 
-{% highlight java %}
-
-    public class BadPageViewFilter implements StreamTask, InitableTask {
-      private final SystemStream OUTPUT_STREAM = new SystemStream(“kafka”, “goodPageViewEvent”);
-      private ReadWriteTable<String, Integer> badPageUrlTable;
-      @Override
-      public void init(Context context) {
-        badPageUrlTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badPageUrlTable");
-      }
+#### Receiving Messages from Input Streams
 
-      @Override
-      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-        String key = (String)message.getKey();
-        if (badPageUrlTable.containsKey(key)) {
-          // skip the message, increment the counter, do not send it
-          badPageUrlTable.put(key, badPageUrlTable.get(key) + 1);
-          return;
-        }
-        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, key, message.getValue()));
-      }
-    }
+Samza calls your Task instance's [process](javadocs/org/apache/samza/task/StreamTask#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) or [processAsync](javadocs/org/apache/samza/task/AsyncStreamTask#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method with each incoming message on your input streams. The [IncomingMessageEnvelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope) can be used to obtain the following information: the de-serialized key, the de-serialized message, and the [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition) that the message came from.
 
-{% endhighlight %}
+The key and message objects need to be cast to the correct type in your Task implementation, based on the [Serde](javadocs/org/apache/samza/serializers/Serde.html) provided in the InputDescriptor for the input stream.
 
-For more detailed AsyncStreamTask example, follow the tutorial in [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For more details on APIs, please refer to [Configuration](../jobs/configuration.md) and [Javadocs](javadocs).
+The [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition) object tells you where the message came from. It consists of three parts:
+1. The *system*: the name of the system the message came from, as specified for the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
+2. The *stream name*: the name of the stream (e.g., topic, queue) within the input system. This is the physical name of the stream, as specified for the InputDescriptor in your TaskApplication.
+3. The [*partition*](javadocs/org/apache/samza/Partition): A stream is normally split into several partitions, and each partition is assigned to one task instance by Samza. 
 
-# Other Task Interfaces
+If you have several input streams for your TaskApplication, you can use the SystemStreamPartition to determine what kind of message you’ve received.
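+
+For example, a task consuming two input streams might branch on the stream name as follows (the stream names and message types here are hypothetical):
+
+{% highlight java %}
+
+    public class MultiStreamTask implements StreamTask {
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String stream = envelope.getSystemStreamPartition().getStream();
+        if ("pageViewEvent".equals(stream)) {
+          // Cast to the type produced by the Serde configured for this input stream.
+          PageViewEvent pageView = (PageViewEvent) envelope.getMessage();
+          // ... handle page views ...
+        } else if ("adClickEvent".equals(stream)) {
+          AdClickEvent adClick = (AdClickEvent) envelope.getMessage();
+          // ... handle ad clicks ...
+        }
+      }
+    }
+
+{% endhighlight %}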
 
-There are other task interfaces to allow additional processing logic to be applied, besides the main per-message processing logic defined in StreamTask and AsyncStreamTask. You will need to implement those task interfaces in addition to StreamTask or AsyncStreamTask.
+#### Sending Messages to Output Streams
+To send a message to a stream, you first create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope). At a minimum, you need to provide the message you want to send, and the system and stream to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope) for details.
 
-## InitiableTask
-This task interface allows users to initialize objects that are accessed within a task instance.
+You can then send the OutgoingMessageEnvelope using the [MessageCollector](javadocs/org/apache/samza/task/MessageCollector) provided with the process() or processAsync() call. You **must** use the MessageCollector delivered for the message you're currently processing; if you hold on to a MessageCollector instance and reuse it later, your messages may not be sent correctly.
 
 {% highlight java %}
     
-    public interface InitableTask {
-      void init(Context context) throws Exception;
+    /** When a task wishes to send a message, it uses this interface. */
+    public interface MessageCollector {
+      void send(OutgoingMessageEnvelope envelope);
     }
-{% endhighlight %}
-
-## WindowableTask
-This task interface allows users to define a processing logic that is invoked periodically within a task instance.
-
-{% highlight java %}
     
-    public interface WindowableTask {
-      void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
-    }
-
 {% endhighlight %}
 
-## ClosableTask
-This task interface defines the additional logic when closing a task. Usually, it is in pair with InitableTask to release system resources allocated for this task instance.
+#### Accessing Tables
 
-{% highlight java %}
+A [Table](javadocs/org/apache/samza/table/Table) is an abstraction for data sources that support random access by key. It is an evolution of the older [KeyValueStore](javadocs/org/apache/samza/storage/kv/KeyValueStore) API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a RemoteTable provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a [ReadableTable](javadocs/org/apache/samza/table/ReadableTable) or a [ReadWriteTable](javadocs/org/apache/samza/table/ReadWriteTable).
+ 
+In the Low Level API, you can obtain and use a Table as follows:
 
-    public interface ClosableTask {
-      void close() throws Exception;
-    }
-    
-{% endhighlight %}
+1. Use the appropriate TableDescriptor to specify the table properties.
+2. Register the TableDescriptor with the TaskApplicationDescriptor.
+3. Obtain a Table reference within the task implementation using [TaskContext.getTable()](javadocs/org/apache/samza/task/TaskContext#getTable-java.lang.String-). [TaskContext](javadocs/org/apache/samza/task/TaskContext) is available via [Context.getTaskContext()](javadocs/org/apache/samza/context/Context#getTaskContext--), which in turn is available in [InitableTask.init()](javadocs/org/apache/samza/task/InitableTask#init-org.apache.samza.context.Context-).
 
-## EndOfStreamListenerTask
-This task interface defines the additional logic when a task instance has reached the end of all input SystemStreamPartitions (see Samza as a batch job).
+For example:
 
 {% highlight java %}
 
-    public interface EndOfStreamListenerTask {
-      void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
+    public class PageViewFilterTask implements StreamTask, InitableTask {
+      private final SystemStream outputStream = new SystemStream("kafka", "goodPageViewEvent");
+
+      private ReadWriteTable<String, Integer> badUrlsTable;
+
+      @Override
+      public void init(Context context) {
+        badUrlsTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badUrls");
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String key = (String) envelope.getKey();
+        if (badUrlsTable.containsKey(key)) {
+          // the URL is bad: increment its counter and do not forward the message
+          badUrlsTable.put(key, badUrlsTable.get(key) + 1);
+        } else {
+          collector.send(new OutgoingMessageEnvelope(outputStream, key, envelope.getMessage()));
+        }
+      }
     }
-    
+
 {% endhighlight %}
 
-# Legacy Task Application
+### Legacy Applications
 
-For legacy task application which do not implement TaskApplication interface, you may specify the system, stream, and local stores in your job’s configuration, in addition to task.class. An incomplete example of configuration for legacy task application could look like this (see the [configuration](../jobs/configuration.md) documentation for more detail):
+For legacy Low Level API applications, you can continue specifying your system, stream, and store properties along with your `task.class` in configuration. An incomplete configuration example for a legacy task application looks like this (see the [configuration](../jobs/configuration.md) documentation for more detail):
 
 {% highlight jproperties %}
 
-    # This is the class above, which Samza will instantiate when the job is run
+    # This is the Task class that Samza will instantiate when the job is run
     task.class=com.example.samza.PageViewFilterTask
 
     # Define a system called "kafka" (you can give it any name, and you can define


[4/9] samza git commit: Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors

Posted by ja...@apache.org.
Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #778 from vjagadish1989/website-reorg26


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4b60e73
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4b60e73
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4b60e73

Branch: refs/heads/1.0.0
Commit: e4b60e732a063c50736599fb0c6e0cba31a6e45c
Parents: 93a2542
Author: Jagadish <jv...@linkedin.com>
Authored: Sun Oct 28 11:29:07 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:32:00 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/connectors/kafka.md | 168 +++++++++++--------
 .../versioned/connectors/overview.md            |  38 ++---
 2 files changed, 112 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e4b60e73/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index b6e78e4..447bfdc 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -19,108 +19,132 @@ title: Kafka Connector
    limitations under the License.
 -->
 
-## Overview
-Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process it and emit results to other Kafka topics or external systems.
+### Kafka I/O : QuickStart
+Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.
 
-## Consuming from Kafka
+The `hello-samza` project includes multiple examples of interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run it and view the results.
 
-### <a name="kafka-basic-configuration"></a>Basic Configuration
+- [High-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)
 
-The example below provides a basic example for configuring a system called `kafka-cluster-1` that uses the provided KafkaSystemFactory.
+- [Low-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)
 
-{% highlight jproperties %}
-# Set the SystemFactory implementation to instantiate KafkaSystemConsumer, KafkaSystemProducer and KafkaSystemAdmin
-systems.kafka-cluster-1.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 
-# Define the default key and message SerDe.
-systems.kafka-cluster-1.default.stream.samza.key.serde=string
-systems.kafka-cluster-1.default.stream.samza.msg.serde=json
+### Concepts
 
-# Zookeeper nodes of the Kafka cluster
-systems.kafka-cluster-1.consumer.zookeeper.connect=localhost:2181
+#### KafkaSystemDescriptor
 
-# List of network endpoints where Kafka brokers are running. Also needed by consumers for querying metadata.
-systems.kafka-cluster-1.producer.bootstrap.servers=localhost:9092,localhost:9093
+Samza refers to any IO source (e.g., Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`. The `KafkaSystemDescriptor` allows you to describe the Kafka cluster you are interacting with and specify its properties.
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+            .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+            .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
 {% endhighlight %}
 
-Samza provides a built-in KafkaSystemDescriptor to consume from and produce to Kafka from the StreamApplication (High-level API) or the TaskApplication (Low-level API).
 
-Below is an example of how to use the descriptors in the describe method of a StreamApplication.
+#### KafkaInputDescriptor
 
+A Kafka cluster usually has multiple topics (a.k.a. _streams_). The `KafkaInputDescriptor` allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of `KafkaInputDescriptor`
+by providing a topic name and a serializer.
 {% highlight java %}
-public class PageViewFilter implements StreamApplication {
-  @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor("myinput", new JsonSerdeV2<>(PageView.class));
-    KafkaOutputDescriptor<DecoratedPageView> osd = ksd.getOutputDescriptor("myout", new JsonSerdeV2<>(DecordatedPageView.class));
-
-    MessageStream<PageView> ms = appDesc.getInputStream(isd);
-    OutputStream<DecoratedPageView> os = appDesc.getOutputStream(osd);
-
-    ms.filter(this::isValidPageView)
-      .map(this::addProfileInformation)
-      .sendTo(os);
-  }
-}
+    KafkaInputDescriptor<PageView> pageViewStreamDescriptor = kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class));
 {% endhighlight %}
 
-Below is an example of how to use the descriptors in the describe method of a TaskApplication
+The above example describes an input Kafka stream from the "page-view-topic", which Samza de-serializes into a JSON payload. Samza provides default serializers for common data types such as string, Avro, bytes, and integer.
+ 
+#### KafkaOutputDescriptor
+
+Similarly, the `KafkaOutputDescriptor` allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of `KafkaOutputDescriptor`.
 
 {% highlight java %}
-public class PageViewFilterTask implements TaskApplication {
-  @Override
-  public void describe(TaskApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
-    KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
-
-    appDesc.addInputStream(isd);
-    appDesc.addOutputStream(osd);
-    appDesc.addTable(td);
-
-    appDesc.withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-  }
-}
+    KafkaOutputDescriptor<DecoratedPageView> decoratedPageView = kafkaSystemDescriptor.getOutputDescriptor("my-output-topic", new JsonSerdeV2<>(DecoratedPageView.class));
 {% endhighlight %}
 
-### Advanced Configuration
 
-Prefix the configuration with `systems.system-name.consumer.` followed by any of the Kafka consumer configurations. See [Kafka Consumer Configuration Documentation](http://kafka.apache.org/documentation.html#consumerconfigs)
+### Configuration
 
-{% highlight jproperties %}
-systems.kafka-cluster-1.consumer.security.protocol=SSL
-systems.kafka-cluster-1.consumer.max.partition.fetch.bytes=524288
+##### Configuring Kafka producer and consumer
+ 
+The `KafkaSystemDescriptor` allows you to specify any [Kafka producer](https://kafka.apache.org/documentation/#producerconfigs) or [Kafka consumer](https://kafka.apache.org/documentation/#consumerconfigs) property, which is passed directly to the underlying Kafka client. This allows for
+precise control over the KafkaProducer and KafkaConsumer used by Samza.
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withConsumerConfigs(..)
+            .withProducerConfigs(..)
 {% endhighlight %}
 
-## Producing to Kafka
 
-### Basic Configuration
+#### Accessing an offset which is out-of-range
+The `withConsumerAutoOffsetReset` setting determines the behavior when a consumer attempts to read an offset that is outside the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
 
-The basic configuration is the same as [Consuming from Kafka](#kafka-basic-configuration).
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withConsumerAutoOffsetReset("largest")
+{% endhighlight %}
 
-### Advanced Configuration
 
-#### Changelog to Kafka for State Stores
+##### Ignoring checkpointed offsets
+Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with `KafkaInputDescriptor#shouldResetOffset()`.
+When there are no checkpoints for a stream, `#withOffsetDefault(..)` determines whether consumption starts from the oldest or newest available offset.
 
-For Samza processors that have local state and is configured with a changelog for durability, if the changelog is configured to use Kafka, there are Kafka specific configuration parameters.
-See section on `TODO: link to state management section` State Management `\TODO` for more details.
+{% highlight java %}
+KafkaInputDescriptor<PageView> pageViewStreamDescriptor = 
+    kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class)) 
+        .shouldResetOffset()
+        .withOffsetDefault(OffsetType.OLDEST);
 
-{% highlight jproperties %}
-stores.store-name.changelog=kafka-cluster-2.changelog-topic-name
-stores.store-name.changelog.replication.factor=3
-stores.store-name.changelog.kafka.cleanup.policy=compact
 {% endhighlight %}
 
-#### Performance Tuning
+The above example configures Samza to ignore checkpointed offsets for `page-view-topic` and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using `KafkaSystemDescriptor#withDefaultStreamOffsetDefault`.
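+
+For example, here is a sketch of configuring the cluster-wide default (this assumes `withDefaultStreamOffsetDefault` accepts the same `OffsetType` used above):
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            // Apply the oldest-offset default to every topic in this Kafka cluster
+            .withDefaultStreamOffsetDefault(OffsetType.OLDEST);
+{% endhighlight %}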
+
+ 
 
-Increasing the consumer fetch buffer thresholds may improve throughput at the expense of memory by buffering more messages. Run some performance analysis to find the optimal values.
+### Code walkthrough
+
+In this section, we walk through a complete example.
+
+#### High-level API
+{% highlight java %}
+// Define coordinates of the Kafka cluster using the KafkaSystemDescriptor
+1    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+2        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+3        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+
+// Create an KafkaInputDescriptor for your input topic and a KafkaOutputDescriptor for the output topic 
+4    KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
+5    KafkaInputDescriptor<KV<String, PageView>> inputDescriptor =
+6        kafkaSystemDescriptor.getInputDescriptor("page-views", serde);
+7    KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor =
+8        kafkaSystemDescriptor.getOutputDescriptor("filtered-page-views", serde);
+
+
+// Obtain a message stream for the input topic
+9    MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor);
+
+// Obtain an output stream for the topic    
+10    OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor);
+
+// write results to the output topic
+11    pageViews
+12       .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+13       .sendTo(filteredPageViews);
 
-{% highlight jproperties %}
-# Max number of messages to buffer across all Kafka input topic partitions per container. Default is 50000 messages.
-systems.kafka-cluster-1.samza.fetch.threshold=10000
-# Max buffer size by bytes. This configuration takes precedence over the above configuration if value is not -1. Default is -1.
-systems.kafka-cluster-1.samza.fetch.threshold.bytes=-1
 {% endhighlight %}
+
+- Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster
+
+- Lines 4-6 define a KafkaInputDescriptor for our input topic - `page-views`
+
+- Lines 7-8 define a KafkaOutputDescriptor for our output topic - `filtered-page-views`
+
+- Line 9 creates a MessageStream for the input topic so that you can chain operations on it later
+
+- Line 10 creates an OutputStream for the output topic
+
+- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e4b60e73/docs/learn/documentation/versioned/connectors/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/overview.md b/docs/learn/documentation/versioned/connectors/overview.md
index 5b6ba39..6697b9c 100644
--- a/docs/learn/documentation/versioned/connectors/overview.md
+++ b/docs/learn/documentation/versioned/connectors/overview.md
@@ -20,33 +20,27 @@ title: Connectors overview
 -->
 
 Stream processing applications often read data from external sources like Kafka or HDFS. Likewise, they require processed
-results to be written to external system or data stores. As of the 1.0 release, Samza integrates with the following systems
-out-of-the-box:
+results to be written to external systems or data stores. Samza is pluggable and designed to support a variety of [producers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemProducer.html) and [consumers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemConsumer.html) for your data. You can
+integrate Samza with any streaming system by implementing the [SystemFactory](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemFactory.html) interface. 
 
-- [Apache Kafka](kafka) (consumer/producer)
-- [Microsoft Azure Eventhubs](eventhubs) (consumer/producer)
-- [Amazon AWS Kinesis Streams](kinesis) (consumer)
-- [Hadoop Filesystem](hdfs) (consumer/producer)
-- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java) (producer)
+The following integrations are supported out-of-the-box:
 
-Instructions on how to use these connectors can be found in the corresponding subsections. Please note that the
-connector API is different from [Samza Table API](../api/table-api), where the data could be read from and written to
-data stores.
+Consumers:
 
-Samza is pluggable and designed to support a variety of producers and consumers. You can provide your own producer or
-consumer by implementing the SystemFactory interface.
+- [Apache Kafka](kafka) 
 
-To associate a system with a Samza Connector, the user needs to set the following config:
+- [Microsoft Azure Eventhubs](eventhubs) 
 
-{% highlight jproperties %}
-systems.<system-name>.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-{% endhighlight %}
+- [Amazon AWS Kinesis Streams](kinesis) 
 
-Any system specific configs, could be defined as below:
+- [Hadoop Filesystem](hdfs) 
 
-{% highlight jproperties %}
-systems.<system-name>.param1=value1
-systems.<system-name>.consumer.param2=value2
-systems.<system-name>.producer.param3=value3
-{% endhighlight %}
+Producers:
 
+- [Apache Kafka](kafka) 
+
+- [Microsoft Azure Eventhubs](eventhubs) 
+
+- [Hadoop Filesystem](hdfs) 
+
+- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java)


[5/9] samza git commit: Cleanup the EventHubs connector section. Use System/Stream Descriptors

Posted by ja...@apache.org.
Cleanup the EventHubs connector section. Use System/Stream Descriptors

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #780 from vjagadish1989/website-reorg27


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/299c0317
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/299c0317
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/299c0317

Branch: refs/heads/1.0.0
Commit: 299c031795eaaaa3cf255e02d5fc636a43570e7d
Parents: e4b60e7
Author: Jagadish <jv...@linkedin.com>
Authored: Mon Oct 29 16:31:11 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:32:22 2018 -0800

----------------------------------------------------------------------
 .../versioned/connectors/eventhubs.md           | 209 +++++++------------
 .../documentation/versioned/connectors/kafka.md |   7 +-
 .../documentation/versioned/jobs/logging.md     |   2 +
 3 files changed, 75 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/connectors/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/eventhubs.md b/docs/learn/documentation/versioned/connectors/eventhubs.md
index 16120f3..0be288b 100644
--- a/docs/learn/documentation/versioned/connectors/eventhubs.md
+++ b/docs/learn/documentation/versioned/connectors/eventhubs.md
@@ -19,135 +19,59 @@ title: Event Hubs Connector
    limitations under the License.
 -->
 
-## Overview
+## EventHubs I/O: QuickStart
 
-The Samza Event Hubs connector provides access to [Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data). 
+The Samza EventHubs connector provides access to [Azure EventHubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
 
-You may find an [example](../../../tutorials/versioned/samza-event-hubs-standalone.md) using this connector in the [hello-samza](https://github.com/apache/samza-hello-samza) project.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project includes an [example](../../../tutorials/versioned/samza-event-hubs-standalone.md) of reading and writing to EventHubs.
 
-## Consuming from Event Hubs
+### Concepts
 
-Samza’s [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). Samza's Event Hubs consumer wraps each message from Event Hubs into an EventHubMessageEnvelope. The envelope has two fields of interest - the key, which is set to the event's String partition key and the message, which is set to the actual data in the event.
-
-You can describe your Samza jobs to process data from Azure Event Hubs. To set Samza to consume from Event Hubs streams:
+#### EventHubsSystemDescriptor
 
+Samza refers to any IO source (e.g., Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`. The `EventHubsSystemDescriptor` allows you to configure various properties for the `EventHubsClient` used by Samza.
 {% highlight java %}
- 1  public void describe(StreamApplicationDescriptor appDescriptor) {
- 2  // Define your system here
- 3  EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- 4  
- 5  // Choose your serializer/deserializer for the EventData payload
- 6  StringSerde serde = new StringSerde();
- 7  
- 8  // Define the input descriptors with respective descriptors
- 9  EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
-10    systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
-11        .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
-12        .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);
-13  
-14  // Define the input streams with descriptors
-15  MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-16  
-17  //...
-18  }
-{% endhighlight %}
-
-In the code snippet above, we create the input and output streams that can consume and produce from the configured Event Hubs entities.
-
-1. Line 3: A `EventHubsSystemDescriptor` is created with the name "eventhubs". You may set different system descriptors here. 
-2. Line 6: Event Hubs messages are consumed as key value pairs. The [serde](../../documentation/versioned/container/serialization.html) is defined for the value of the incoming payload of the Event Hubs' EventData. You may use any of the serdes that samza ships with out of the box or define your own.
-The serde for the key is not set since it will always the String from the EventData [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__).
-3. Line 8-12: An `EventHubsInputDescriptor` is created with the required descriptors to gain access of the Event Hubs entity (`STREAM_ID`, `EVENTHUBS_NAMESPACE`, `EVENTHUBS_ENTITY`, `EVENTHUBS_SAS_KEY_NAME`, `EVENTHUBS_SAS_KEY_TOKEN`).
-These must be set to the credentials of the entities you wish to connect to.
-4. Line 15: creates an `InputStream` with the previously defined `EventHubsInputDescriptor`.
-
-Alternatively, you can set these properties in the `.properties` file of the application.
-Note: the keys set in the `.properties` file will override the ones set in code with descriptors.
-Refer to the [Event Hubs configuration reference](../../documentation/versioned/jobs/samza-configurations.html#eventhubs) for the complete list of configurations.
-
-{% highlight jproperties %}
-# define an event hub system factory with your identifier. eg: eh-system
-systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-
-# define your streams
-systems.eh-system.stream.list=eh-input-stream
-streams.eh-stream.samza.system=eh-system
-
-# define required properties for your streams
-streams.eh-input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-streams.eh-input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-streams.eh-input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-streams.eh-input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+ 1  EventHubsSystemDescriptor eventHubsSystemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);
 {% endhighlight %}
 
-It is required to provide values for YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME, YOUR-SAS-KEY-TOKEN to read or write to the stream.
-
-## Producing to Event Hubs
+#### EventHubsInputDescriptor
 
-Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) from Samza is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to Event Hubs.
-Similarly, you can also configure your Samza job to write to Event Hubs. Follow the same descriptors defined in the Consuming from Event Hubs section to write to Event Hubs:
+The `EventHubsInputDescriptor` allows you to specify the properties of each EventHubs stream your application should read from. For each of your input streams, you should create a corresponding instance of `EventHubsInputDescriptor` by providing a stream name and a serializer.
 
 {% highlight java %}
- 1  public void describe(StreamApplicationDescriptor appDescriptor) {
- 2  // Define your system here
- 3  EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- 4  
- 5  // Choose your serializer/deserializer for the EventData payload
- 6  StringSerde serde = new StringSerde();
- 7  
- 8  // Define the input and output descriptors with respective descriptors
- 9  EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
-10    systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
-11        .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
-12        .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);
-13  
-14  // Define the output streams with descriptors
-15  OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
-16  
-17  //...
-18  }
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = 
+        systemDescriptor.getInputDescriptor(streamId, "eventhubs-namespace", "eventhubs-name", new StringSerde())
+          .withSasKeyName("secretkey")
+          .withSasKey("sasToken-123")
+          .withConsumerGroup("$notdefault");
 {% endhighlight %}
 
-In the code snippet above, we create the input and output streams that can consume and produce from the configured Event Hubs entities.
+By default, messages are sent and received as byte arrays. Samza then de-serializes them to typed objects using your provided Serde. For example, the above uses a `StringSerde` to de-serialize messages.
 
-1. Line 3: A `EventHubsSystemDescriptor` is created with the name "eventhubs". You may set different system descriptors here. 
-2. Line 6: Event Hubs messages are produced as key value pairs. The [serde](../../documentation/versioned/container/serialization.html) is defined for the value of the payload of the outgoing Event Hubs' EventData. You may use any of the serdes that samza ships with out of the box or define your own.
-The serde for the key is not set since it will always the String from the EventData [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__).
-3. Line 9-12: An `EventHubsOutputDescriptor` is created with the required descriptors to gain access of the Event Hubs entity (`STREAM_ID`, `EVENTHUBS_NAMESPACE`, `EVENTHUBS_ENTITY`, `EVENTHUBS_SAS_KEY_NAME`, `EVENTHUBS_SAS_KEY_TOKEN`).
-These must be set to the credentials of the entities you wish to connect to.
-4. Line 15: creates an `OutputStream` with the previously defined `EventHubsOutputDescriptor`.
 
-Alternatively, you can set these properties in the `.properties` file of the application.
-Note: the keys set in the `.properties` file will override the ones set in code with descriptors.
-Refer to the [Event Hubs configuration reference](../../documentation/versioned/jobs/samza-configurations.html#eventhubs) for the complete list of configurations.
+#### EventHubsOutputDescriptor
 
-{% highlight jproperties %}
-# define an event hub system factory with your identifier. eg: eh-system
-systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+Similarly, the `EventHubsOutputDescriptor` allows you to specify the output streams for your application. For each output stream you write to in EventHubs, you should create an instance of `EventHubsOutputDescriptor`.
 
-# define your streams
-systems.eh-system.stream.list=eh-output-stream
-streams.eh-stream.samza.system=eh-system
-
-streams.eh-output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-streams.eh-output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-streams.eh-output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-streams.eh-output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% highlight java %}
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, new StringSerde())
+            .withSasKeyName(..)
+            .withSasKey(..);
 {% endhighlight %}
 
-Then you can consume and produce a message to Event Hubs in your code as below:
+#### Security Model
+Each EventHubs stream is scoped to a container called a _namespace_, which uniquely identifies an EventHubs in a region. EventHubs's [security model](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview) is based on Shared Access Signatures (SAS).
+Hence, you should also provide your SAS keys and tokens to access the stream. You can generate your SAS tokens using the Azure portal.
 
-{% highlight java %}
-MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
-eventhubInput.sendTo(eventhubOutput)
-{% endhighlight %}
+#### Data Model
+Each event produced and consumed from an EventHubs stream is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data), which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an `EventData` payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde. 
 
-## Advanced configuration
+## Configuration
 
 ###Producer partitioning
 
-The partition.method property determines how outgoing messages are partitioned. Valid values for this config are EVENT\_HUB\_HASHING, PARTITION\_KEY\_AS_PARTITION or ROUND\_ROBIN.
+You can use `#withPartitioningMethod` to control how outgoing messages are partitioned. The following partitioning schemes are supported:
 
 1. EVENT\_HUB\_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.
 
@@ -155,22 +79,16 @@ The partition.method property determines how outgoing messages are partitioned.
 
 3. ROUND\_ROBIN: In this method, outgoing messages are distributed in a round-robin across all partitions. The key and the partition key in the message are ignored.
 
-##### Using descriptors
 {% highlight java %}
 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
         .withPartitioningMethod(PartitioningMethod.EVENT_HUB_HASHING);
 {% endhighlight %}
 
-##### Using config properties
-{% highlight jproperties %}
-systems.eh-system.partition.method = EVENT_HUB_HASHING
-{% endhighlight %}
 
 ### Consumer groups
 
-Event Hubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the consumer group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job by configuring an Event Hubs.consumer.group
+Event Hubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job using `withConsumerGroup`.
 
-##### Using descriptors
 {% highlight java %}
 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
 EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
@@ -178,41 +96,54 @@ EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
             .withConsumerGroup("my-group");
 {% endhighlight %}
 
-##### Using config properties
-{% highlight jproperties %}
-streams.eh-input-stream.eventhubs.consumer.group = my-group
-{% endhighlight %}
 
-### Serde
+### Consumer buffer size
 
-By default, the messages from Event Hubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream.
+When the consumer reads messages from EventHubs, it appends them to a shared producer-consumer queue corresponding to its partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves a higher throughput at the expense of increased on-heap memory.
 
-##### Using descriptors
 {% highlight java %}
-JsonSerde inputSerde = new JsonSerde();
-EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
-    systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, inputSerde);
-JsonSerde outputSerde = new JsonSerde();
-EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
-     systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, outputSerde);
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
+        .withReceiveQueueSize(10);
 {% endhighlight %}
 
-##### Using config properties
-{% highlight jproperties %}
-streams.input0.samza.msg.serde = json
-streams.output0.samza.msg.serde = json
-{% endhighlight %}
+### Code walkthrough
 
-### Consumer buffer size
+In this section, we will walk through a simple pipeline that reads from one EventHubs stream and copies each message to another output stream. 
 
-When the consumer reads a message from event hubs, it appends them to a shared producer-consumer queue corresponding to its partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves a higher throughput at the expense of increased on-heap memory.
-##### Using descriptors
 {% highlight java %}
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
-        .withReceiveQueueSize(10);
+1    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);
+
+2    EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, new StringSerde())
+            .withSasKeyName(..)
+            .withSasKey(..);
+
+3    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, new StringSerde())
+            .withSasKeyName(..)
+            .withSasKey(..);
+
+4    MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+5    OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+
+    // Define the execution flow with the high-level API
+6    eventhubInput
+7        .map((message) -> {
+8          System.out.println("Received Key: " + message.getKey());
+9          System.out.println("Received Message: " + message.getValue());
+10          return message;
+11        })
+12        .sendTo(eventhubOutput);
 {% endhighlight %}
 
-##### Using config properties
-{% highlight jproperties %}
-systems.eh-system.eventhubs.receive.queue.size = 10
-{% endhighlight %}
\ No newline at end of file
+- Line 1 instantiates an `EventHubsSystemDescriptor` configuring an EventHubsClient with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors.
+
+- Line 2 creates an `EventHubsInputDescriptor` with a String serde for its values. Recall that Samza follows a KV data-model for input messages. In the case of EventHubs, the key is a string which is set to the [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__) in the message. Hence, no separate key serde is required.
+
+- Line 3 creates an `EventHubsOutputDescriptor` to write to an EventHubs stream with the given credentials.
+
+- Line 4 obtains a `MessageStream` from the input descriptor that you can later chain operations on.
+
+- Line 5 creates an `OutputStream` with the previously defined `EventHubsOutputDescriptor` that you can send messages to.
+
+- Lines 6-12 define a simple pipeline that copies each message from one EventHubs stream to another
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index 447bfdc..b71c736 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -105,11 +105,10 @@ The above example configures Samza to ignore checkpointed offsets for `page-view
 
  
 
-### Code walkthrough
+### Code walkthrough: High-level API
 
-In this section, we walk through a complete example.
+In this section, we walk through a complete example that reads from a Kafka topic, filters a few messages and writes them to another topic.
 
-#### High-level API
 {% highlight java %}
 // Define coordinates of the Kafka cluster using the KafkaSystemDescriptor
 1    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
@@ -147,4 +146,4 @@ In this section, we walk through a complete example.
 
 - Line 10 creates an OuputStream for the output topic
 
-- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
\ No newline at end of file
+- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream

http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 8717c28..9dde172 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -163,7 +163,9 @@ Rest all of the system properties will be set exactly like in the case of log4j,
 
 If you are already using log4j and want to upgrade to using log4j2, following are the changes you will need to make in your job:
 -	Clean your lib directory. This will be rebuilt with new dependency JARs and xml files.
+
 -	Replace log4j’s dependencies with log4j2’s in your pom.xml/build.gradle as mentioned above. Please ensure that none of log4j’s dependencies remain in pom.xml/build.gradle
+
 -	Create a log4j2.xml to match your existing log4j.xml file. 
 -	Rebuild your application
 


[3/9] samza git commit: DOCS: Clean-up the section on YARN deployments

Posted by ja...@apache.org.
DOCS: Clean-up the section on YARN deployments

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #777 from vjagadish1989/website-reorg25


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93a25420
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93a25420
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93a25420

Branch: refs/heads/1.0.0
Commit: 93a25420f5b00609e99be294128316dff40da820
Parents: ed616ec
Author: Jagadish <jv...@linkedin.com>
Authored: Sat Oct 27 11:18:08 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:29:25 2018 -0800

----------------------------------------------------------------------
 .../yarn/coordinator-internals.png              | Bin 30163 -> 39061 bytes
 .../documentation/versioned/deployment/yarn.md  | 225 ++++++++-----------
 2 files changed, 93 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/93a25420/docs/img/versioned/learn/documentation/yarn/coordinator-internals.png
----------------------------------------------------------------------
diff --git a/docs/img/versioned/learn/documentation/yarn/coordinator-internals.png b/docs/img/versioned/learn/documentation/yarn/coordinator-internals.png
index 7f4f161..9a19552 100755
Binary files a/docs/img/versioned/learn/documentation/yarn/coordinator-internals.png and b/docs/img/versioned/learn/documentation/yarn/coordinator-internals.png differ

http://git-wip-us.apache.org/repos/asf/samza/blob/93a25420/docs/learn/documentation/versioned/deployment/yarn.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/deployment/yarn.md b/docs/learn/documentation/versioned/deployment/yarn.md
index c30346b..b32ba68 100644
--- a/docs/learn/documentation/versioned/deployment/yarn.md
+++ b/docs/learn/documentation/versioned/deployment/yarn.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Run on YARN.
+title: Run on YARN
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -45,44 +45,42 @@ title: Run on YARN.
 - [Coordinator Internals](#coordinator-internals)
 
 
-# Introduction
+## Introduction
 
-YARN (Yet Another Resource Negotiator) is part of the Hadoop project and provides the ability to run distributed applications on a cluster. A YARN cluster minimally consists of a Resource Manager (RM) and multiple Node Managers (NM). The RM is responsible for coordinating allocations and tracks resources available on the NMs. The NM is an agent that executes on each node in the cluster and is responsible for running containers (user processes), monitoring their resource usage and reporting the same to the ResourceManager. Applications are run on the cluster by implementing a coordinator called an ApplicationMaster (AM). The AM is responsible for requesting resources (CPU, Memory etc) from the Resource Manager (RM) on behalf of the application. The RM allocates the requested resources on one or more NMs that   can accomodate the request made.
+Apache YARN is part of the Hadoop project and provides the ability to run distributed applications on a cluster. A YARN cluster minimally consists of a Resource Manager (RM) and multiple Node Managers (NM). The RM is responsible for managing the resources in the cluster and allocating them to applications. Every node in the cluster has an NM (Node Manager), which is responsible for managing containers on that node - starting them, monitoring their resource usage and reporting it back to the RM.
 
-Samza provides an implementation of the AM in order to run a jobs alongside other application deployed on YARN. The AM makes decisions such as requesting allocation of containers, which machines a Samza job’s containers should run on, what to do when a container fails etc.
+Applications are run on the cluster by implementing a coordinator called an ApplicationMaster (AM). The AM is responsible for requesting resources (CPU, memory, etc.) from the Resource Manager (RM) on behalf of the application. Samza provides its own implementation of the AM for each job.
 
+## Running on YARN: Quickstart
 
-# Starting your application on YARN
+We will demonstrate running a Samza application on YARN by using the `hello-samza` example. Let's first check out the repository.
 
-## Setting up a single node YARN cluster (optional)
-
-If you already have a YARN cluster setup to deploy jobs, please jump to [Submitting the application to YARN](#submitting-the-application-to-yarn). If not the following section will help set up a single node cluster to test a Samza job deploy.
+```bash
+git clone https://github.com/apache/samza-hello-samza.git
+cd samza-hello-samza
+git checkout latest
+```
 
-We can use the `grid` script which is part of the [hello-samza](https://github.com/apache/samza-hello-samza/) repository to setup a single node YARN cluster (and optionally a Zookeeper and Kafka cluster as well).
+### Set up a single node YARN cluster
 
-Run the following to setup a single node YARN cluster:
+You can use the `grid` script included as part of the [hello-samza](https://github.com/apache/samza-hello-samza/) repository to set up a single-node cluster. The script also starts Zookeeper and Kafka locally.
 
-```bash
-./grid install yarn
-./grid start yarn
+```bash
+./bin/grid bootstrap
 ```
 
-## Submitting the application to YARN
+### Submitting the application to YARN
 
-Assuming you have a YARN cluster setup, let us take a look at building your application and deploying it to YARN. Samza provides shell scripts as part of the `samza-shell` module that help in submitting the application to YARN and you should include it as part of your dependencies jobs dependencies.
+Now that we have a YARN cluster ready, let's build our application. The command below runs a Maven build and generates an archive in the `./target` folder.
 
-```xml
-<dependency>
-    <groupId>org.apache.samza</groupId>
-    <artifactId>samza-shell</artifactId>
-    <version>${samza.version}</version>
-</dependency>
+```bash
+./bin/build-package.sh
 ```
 
-Samza jobs are usually deployed in a tarball and archive should contain the following as top-level directories.
+You can inspect the structure of the generated archive. To run on YARN, Samza jobs should be packaged with the following structure.
 
 ```bash
-samza-job-artifact-folder
+samza-job-name-folder
 ├── bin
 │   ├── run-app.sh
 │   ├── run-class.sh
@@ -96,94 +94,81 @@ samza-job-artifact-folder
     ├── samza-yarn_2.11-0.14.0.jar
     └── ...
 ```
-The scripts in the `samza-shell` module make the assumption that the built artifact (tarball) has the exact directory structure as seen above. The scripts in the samza-shell module should be copied to a bin directory and all jars need to be part of lib as seen above. The hello-samza project is a good example on setting the structure of your application’s build.
 
-Once the job is built, the `run-app.sh` script can be used to submit the application to the Resource Manager. The script takes 2 CLI parameters - the config factory and the config file for the application. It can be invoked as follows:
+Once the archive is built, the `run-app.sh` script can be used to submit the application to YARN's resource manager. The script takes two CLI parameters: the config factory and the config file for the application. As an example, let's run our [FilterExample](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) on YARN as follows:
 
 ```bash
-$ /path/to/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://path/to/config/application.properties
+$ ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path ./deploy/samza/config/filter-example.properties
 ```
 
-Make sure that the following configurations are set in your configs.
-
-```properties
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-yarn.package.path=https://url/to/artifact/artifact-version-dist.tar.gz
-```
+Congratulations, you've successfully submitted your first job to YARN! You can check its status on the YARN Web UI.
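+
+If you prefer the command line, the standard YARN CLI (not a Samza-specific tool) can also list applications and their states:
+
+```bash
+# List applications known to the Resource Manager along with their states
+yarn application -list
+```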
 
-# Application Master UI
 
-The AM implementation in Samza exposes metadata about the job via both a JSON REST interface and a Web UI.
-This Web UI can be accessed by clicking the Tracking UI (*ApplicationMaster*) link on the YARN RM dashboard.
+## Application Master UI
 
+The YARN RM provides a Web UI to view the status of applications in the cluster, their containers and logs. By default, it can be accessed from `localhost:8088` on the RM host. 
 ![diagram-medium](/img/{{site.version}}/learn/documentation/yarn/yarn-am-ui.png)
 
-The Application Master UI provides you the ability to view:
+In addition to YARN's UI, Samza also offers a REST endpoint and a web interface for its ApplicationMaster. To access it, click on the Tracking UI link corresponding to your application.
+Samza's Application Master UI lets you view:
 
- - Job level runtime metadata
+ - Job-level runtime metadata, e.g., JMX endpoints and the running JVM version
 ![diagram-small](/img/{{site.version}}/learn/documentation/yarn/am-runtime-metadata.png)
 
 
- - Container information
+ - Information about individual containers, e.g., their uptime, status and logs
 ![diagram-small](/img/{{site.version}}/learn/documentation/yarn/am-container-info.png)
 
- - Job model (SystemStreamPartition to Task and Container mapping)
+ - Task groups, e.g., information on individual tasks, which hosts they run on and which partitions they consume
 ![diagram-small](/img/{{site.version}}/learn/documentation/yarn/am-job-model.png)
 
 
-- Runtime configs
+ - Runtime configs for your application
 ![diagram-small](/img/{{site.version}}/learn/documentation/yarn/am-runtime-configs.png)
 
 
-# Viewing logs
+### Configurations
 
-Each container produces logs and they can be easily accessed via the Container information page in ApplicationMaster UI described in the previous section. Clicking on the container name under the Running or Failed container section will take you to the logs page that corresponds to that specific container.
+In this section, we'll look at configuring your jobs when running on YARN.
 
-If there is a need to analyze logs across containers, it is recommended to set up a centralized logging system like ELK (Elasticsearch, Logstash and Kibana). Samza provides a StreamAppender that supports emitting your logs to Kafka for this purpose. The logs written to the stream can then be ingested by service like Logstash and indexed in Elasticsearch.
+#### Configuring parallelism
 
+[Recall](/learn/documentation/{{site.version}}/architecture/architecture-overview.html#container) that Samza scales your applications by breaking them into multiple tasks. On YARN, these tasks are executed on one or more containers, each of which is a Java process. You can control the number of containers allocated to your application by configuring `cluster-manager.container.count`. For example, if we are consuming from an input topic with 5 partitions, Samza will create 5 tasks, each of which processes one partition. Tasks are equally distributed among available containers. The number of containers can be at most the number of tasks, since we cannot have idle containers without any tasks assigned to them.
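+
+As a minimal sketch, assuming the 5-partition input topic above, the container count is set in the job's properties file (the value is illustrative):
+
+```properties
+# Run the job with 2 containers; Samza distributes the 5 tasks across them
+cluster-manager.container.count=2
+```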
 
-# Configuration
+#### Configuring resources
 
-In the following section let's take a look at the different tunables that exist as part of Samza to control your deployment on YARN
+Samza jobs on YARN run on a multi-tenant cluster and should be isolated from each other. YARN implements isolation by enforcing limits on memory and CPU each application can use.
 
-## Configuring parallelism
+##### Memory
 
-As a refresher, Samza scales your applications by breaking them into multiple tasks. On YARN, these tasks are executed on one or more containers, each of which is a Java process. You can control the number of containers allocated to your job by configuring `cluster-manager.container.count`. For example If we had 2 input topics with 10 partitions each processor would consist of 10 Tasks, each processing 2 partitions each. Setting `cluster-manager.container.count` to 1 would run all 10 tasks in one JVM process, setting it to 2 will distribute the tasks equally among 2 JVM processes and so on.
+You can configure the per-container memory limit using `cluster-manager.container.memory.mb` and the memory limit for the AM using `yarn.am.container.memory.mb`. If a container process exceeds its configured memory limit, it is automatically killed by YARN.
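+
+A minimal sketch of these settings in a job's properties file (the values are illustrative, not recommendations):
+
+```properties
+# Limit each Samza container to 4 GB and the ApplicationMaster to 2 GB
+cluster-manager.container.memory.mb=4096
+yarn.am.container.memory.mb=2048
+```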
 
-Please note that it is not possible to distribute 10 tasks across more than 10 containers, therefore the upper bound for `cluster-manager.container.count` is less than or equal to the number of Tasks in your job (or more generally the max of number of partitions among all input streams).
 
+##### CPU
 
-## Configuring resources
+Similar to configuring memory limits, you can configure the maximum number of vCores (virtual cores) each container can use by setting `cluster-manager.container.cpu.cores`. A _vCore_ is YARN's abstraction over a physical core on a NodeManager, which allows for over-provisioning. YARN supports [isolation](http://riccomini.name/posts/hadoop/2013-06-14-yarn-with-cgroups/) of CPU cores using Linux CGroups.
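+
+For example, the following illustrative setting caps each container at two vCores:
+
+```properties
+# Allow each Samza container to use up to 2 virtual cores
+cluster-manager.container.cpu.cores=2
+```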
 
-When running Samza jobs in a shared environment, the stream processors can have an impact on each other’s performance. YARN prevents these issues by providing isolation when running applications on the cluster by enforcing strict limits on resources that each application is allowed to use. YARN (2.7.*) currently supports resource management for memory and CPU.
 
-### Memory
+#### Configuring retries
 
-All containers requests by the Application Master will have a max-memory size defined when they’re created. Samza supports configuring these memory limits using `cluster-manager.container.memory.mb` and `yarn.am.container.memory.mb`. If your container exceeds the configured memory-limits, it is automatically killed by YARN. Keep in mind that this is the maximum memory YARN will allow a Samza Container or ApplicationMaster to have and you will still need to configure your heap settings appropriately using `task.opts`, when using the JVM.
+Failures are common when running any distributed system and should be handled gracefully. The Samza AM automatically restarts containers when they fail. The following properties govern this behavior.
 
-As a cluster administrator if you are running other processes on the same box as the Node Managers (eg: samza-rest) you will want to reserve appropriate amount of memory by configuring `yarn.nodemanager.resource.system-reserved-memory-mb`. Another behaviour to keep in mind is that the Resource Manager allocates resource on the cluster in increments of `yarn.scheduler.minimum-allocation-mb` and `yarn.scheduler.minimum-allocation-vcores`, therefore requesting allocations that are not multiples of the above configs can lead to resource fragmentation.
+`cluster-manager.container.retry.count`: This property determines the maximum number of times Samza will attempt to restart a failed container within a time window. If this property is set to 0, any failed container immediately causes the whole job to fail. If it is set to a negative number, there is no limit on the number of retries.
 
 
-### CPU
-Similar to memory configurations all containers also are CPU bound to a max number of vCores (Virtual cores) on a NM that they are configured to use. YARN has the concept of a virtual core which is generally set to the number of physical cores on the NMs, but can be bump to a higher number if you want to over-provision the NMs with respect to the CPU. Samza supports configuring the vCore of each container by setting `cluster-manager.container.cpu.cores`.
+`cluster-manager.container.retry.window.ms`: This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than `cluster-manager.container.retry.count` times and the time between failures is less than this property, then Samza terminates the job. There is no limit to the number of times a container is restarted if the time between failures is greater than `cluster-manager.container.retry.window.ms`.
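+
+As an illustrative sketch, the following settings restart a failed container up to 3 times within a 5-minute (300000 ms) window; a further failure inside that window fails the job:
+
+```properties
+# Restart a failed container up to 3 times within a 5-minute window before giving up on the job
+cluster-manager.container.retry.count=3
+cluster-manager.container.retry.window.ms=300000
+```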
 
-Unlike memory, which YARN can enforce limits by itself (by looking at the /proc folder), YARN can’t enforce CPU isolation, since this must be done at the Linux kernel level. One of YARN’s features is its support for Linux CGroups (used to control process utilization at the kernel level in Linux). If YARN is setup to use CGroups, then it will guarantee that a container will get at least the amount of CPU that it requires. Currently, by default YARN will give you more CPU to the container, if it’s available. If enforcing “at most” CPU usage for more predictable performance by your container at the cost of underutilization you can set `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage` to `true`, see [this article](https://hortonworks.com/blog/apache-hadoop-yarn-in-hdp-2-2-isolation-of-cpu-resources-in-your-hadoop-yarn-clusters/) for more details. For an indepth look at using YARN with CGroups take a look at [this blog post](http://riccomini.name/posts/hadoop/2013-06-14-yarn-with-cgroups/).
 
-## Configuring retries
+## YARN: Operations Best Practices
 
-Failures are common when running a distributed system and the AM is used to handle Samza Container failures gracefully by automatically restarting containers on failure.
-It should also be noted that if a Samza Container keeps failing constantly it could indicate a deeper problem and we should kill the job rather than having the AM restart it indefinitely. `cluster-manager.container.retry.count` can be used to set the maximum number of times a failed container will be restarted within a time window (configured with `cluster-manager.container.retry.window.ms`), before shutting down the job.
-YARN also provides us a way to automatically restart the job if the AM process fails due to external issues (network partitions, hardware failures etc). By configuring the value of `yarn.resourcemanager.am.max-attempts` YARN will automatically restart the AM process for a fixed number of times before requiring manual intervention to start the job again.
+Although this section is not Samza specific, it describes some best practices for running a YARN cluster in production.
 
-## Configuring RM high-availability and NM work-preserving recovery
-
-Although this section is not Samza specific, it talks about some of the best practices for running a YARN cluster in production specifically around running a highly-available Resource Manager and NodeManager work preserving recovery.
 
 ### Resource Manager high-availability
 
-The Resource Manager (RM) component of a YARN cluster is the source of truth regarding resource utilization and resource scheduling in the cluster. Losing the host running the RM process would kill every single application running on the cluster - making it a single point of failure. The High Availability feature introduced in Hadoop 2.4 adds redundancy in the form of standby Resource Managers to remove this single point of failure.
+The Resource Manager (RM) provides services like scheduling, heartbeats and liveness monitoring to all applications running in the YARN cluster. Losing the host running the RM would kill every application on the cluster, making it a single point of failure. The High Availability feature introduced in Hadoop 2.4 adds redundancy by allowing multiple standby RMs.
 
-In order to configure YARN to run the highly available Resource Manager process set your yarn-site.xml file with the following configs:
+To configure YARN's Resource Manager to be highly available, add the following configs to your yarn-site.xml file:
 
 ```xml
 <property>
@@ -220,108 +205,84 @@ In order to configure YARN to run the highly available Resource Manager process
 </property>
 ```
 
-### NodeManager work-preserving recovery
-
-Turning on work-preserving recovery for the NM gives you the ability to perform maintenance on the cluster (kill NM process for a short duration) without having the containers that run on the node also get killed. You can turn on this feature by setting `yarn.nodemanager.recovery.enabled` to `true` in `yarn-site.xml`
-
-It is also recommended that you change the value of `yarn.nodemanager.recovery.dir` as by default this directory is set to `${hadoop.tmp.dir}/yarn-nm-recovery` where `hadoop.tmp.dir` is set to `/tmp/hadoop-${user.name}` and usually the contents of the `/tmp` directory are not preserved across a reboots.
-
+### Reserving memory for other services
 
-## Configuring host-affinity
-
-When a stateful Samza job is deployed in YARN, the state stores for the tasks are co-located in the current working directory of YARN’s application attempt.
-
-```properties
-container_working_dir=${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}/
-# Data Stores
-ls ${container_working_dir}/state/${store-name}/${task_name}/
-```
+Often, other services, including monitoring daemons like Samza-REST, run on the same nodes in the YARN cluster. You can configure `yarn.nodemanager.resource.system-reserved-memory-mb` to control the amount of physical memory reserved for non-YARN processes.
 
-This allows the Node Manager’s (NM) DeletionService to clean-up the working directory once the application completes or fails. In order to re-use local state store, the state store needs to be persisted outside the scope of NM’s deletion service. The cluster administrator should set this location as an environment variable in YARN  ( `LOGGED_STORE_BASE_DIR`).
+Another behaviour to keep in mind is that the Resource Manager allocates memory and CPU on the cluster in increments of `yarn.scheduler.minimum-allocation-mb` and `yarn.scheduler.minimum-allocation-vcores`. Hence, requesting allocations that are not multiples of these configs will cause internal fragmentation.
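+
+A yarn-site.xml sketch of these settings (the values are illustrative and depend on your hardware):
+
+```xml
+<!-- Reserve 2 GB of physical memory on each node for non-YARN processes -->
+<property>
+    <name>yarn.nodemanager.resource.system-reserved-memory-mb</name>
+    <value>2048</value>
+</property>
+
+<!-- Allocate resources in increments of 1 GB and 1 vCore -->
+<property>
+    <name>yarn.scheduler.minimum-allocation-mb</name>
+    <value>1024</value>
+</property>
+<property>
+    <name>yarn.scheduler.minimum-allocation-vcores</name>
+    <value>1</value>
+</property>
+```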
 
-Since we store the state stores outside of the container’s working directory it is necessary to periodically clean-up unused or orphaned state stores on the machines to manage disk-space. This can be done by running a clean up periodically from the samza-rest service (*LocalStoreMonitor*) that is meant to be deployed on all Node Manager hosts.
 
+### NodeManager work-preserving recovery
 
-## Configuring security
-
-You can run a Samza job on a secure YARN cluster. YARN uses Kerberos as its authentication and authorization mechanism. Take a look at the official YARN [documentation](https://hadoop.apache.org/docs/r2.7.4/hadoop-project-dist/hadoop-common/SecureMode.html) page for more details.
-
-### Delegation token management strategy
-
-One of the challenges for long-lived applications running on a secure YARN cluster is its token renewal strategy. Samza handles this by having the AM periodically re-authenticate itself with the given principal and keytab. It periodically creates new delegation tokens and stores them in a job specific staging directory on HDFS accessible only by the AM and its Containers. In this process each running container will get new delegation tokens from the credentials file on HDFS before the current ones expire. The AM and Containers don’t need to communicate with each other in this process and each side proceeds independently by accessing the tokens on HDFS.
+Often, NMs have to be bounced in the cluster for upgrades or maintenance reasons. By default, bouncing a Node Manager kills all containers running on its host. Work-preserving NM Restart enables NodeManagers to be restarted without losing active containers running on the node. You can turn on this feature by setting `yarn.nodemanager.recovery.enabled` to `true` in `yarn-site.xml`. You should also set `yarn.nodemanager.recovery.dir` to a directory where the NM should store its state needed for recovery.
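+
+For reference, a yarn-site.xml sketch (the recovery directory shown is an assumed example path):
+
+```xml
+<!-- Enable work-preserving NodeManager restart -->
+<property>
+    <name>yarn.nodemanager.recovery.enabled</name>
+    <value>true</value>
+</property>
+<!-- Directory where the NM persists the state it needs for recovery -->
+<property>
+    <name>yarn.nodemanager.recovery.dir</name>
+    <value>/var/hadoop/yarn/nm-recovery</value>
+</property>
+```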
 
-By default, any HDFS delegation token has a maximum life of 7 days (configured by `dfs.namenode.delegation.token.max-lifetime` in `hdfs-site.xml`) and the token is normally renewed every 24 hours (configured by `dfs.namenode.delegation.token.renew-interval` in `hdfs-site.xml`). What if the Application Master dies and needs restarts after 7 days? The original HDFS delegation token stored in the launcher context will be invalid no matter what. Luckily, Samza can rely on Resource Manager to handle this scenario. See the Configuration section below for details.
+### Configuring state-store directories
 
-### Security Components
+When a stateful Samza job is deployed in YARN, the state stores for the tasks are located in the current working directory of YARN's application attempt. YARN's DeletionService cleans up the working directories after an application exits. To ensure durability of Samza's state, its stores need to be persisted outside the scope of YARN's DeletionService. You can set this location by configuring an environment variable named `LOGGED_STORE_BASE_DIR` across the cluster.
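+
+For example, the variable could be exported in the container-launch environment on every NodeManager host (the path below is only an assumed example):
+
+```bash
+# Persist Samza state stores outside YARN's working directories
+export LOGGED_STORE_BASE_DIR=/var/samza/state
+```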
 
-#### SecurityManager
+To manage disk space and clean up state stores that are no longer necessary, Samza-REST supports periodic, long-running tasks named [monitors](/learn/documentation/{{site.version}}/rest/monitors.html).
 
-When ApplicationMaster starts, it spawns `SamzaAppMasterSecurityManager`, which runs on its separate thread. The `SamzaAppMasterSecurityManager` is responsible for periodically logging in through the given Kerberos keytab and regenerates the HDFS delegation tokens regularly. After each run, it writes new tokens on a pre-defined job specific directory on HDFS. The frequency of this process is determined by `yarn.token.renewal.interval.seconds`.
+### Configuring security
 
-Each container, upon start, runs a `SamzaContainerSecurityManager`. It reads from the credentials file on HDFS and refreshes its delegation tokens at the same interval.
+You can run Samza jobs on a secure YARN cluster. YARN uses Kerberos as its authentication and authorization mechanism. See [this article](https://www.cloudera.com/documentation/enterprise/5-7-x/topics/cdh_sg_yarn_security.html) for details on operating Hadoop in secure mode.
 
-### Security configuration
 
-For the Samza job, the following job configurations are required on a YARN cluster with security enabled.
+#### Management of Kerberos tokens
 
-#### Job
+One challenge for long-running applications on YARN is how they periodically renew their Kerberos tokens. Samza handles this by having the AM periodically create tokens and refresh them in a staging directory on HDFS. This directory is accessible only by the containers of your job. You can set your Kerberos principal and Kerberos keytab file as follows:
 
 ```properties
+# Use the SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos delegation tokens when the job is running in a secure environment.
 job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
-```
 
-#### YARN
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
 
-```properties
-yarn.kerberos.principal=user/localhost
-yarn.kerberos.keytab=/etc/krb5.keytab.user
-yarn.token.renewal.interval.seconds=86400
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab
 ```
-Configure the Hadoop cluster to enable Resource Manager to recreate and renew the delegation token on behalf of the application user. This will address the following 2 scenarios.
-
-- When Application Master dies unexpectedly and needs a restart after 7 days (the default maximum lifespan a delegation token can be renewed).
-- When the Samza job terminates and log aggregation is turned on for the job. Node managers need to be able to upload all the local application logs to HDFS.
 
-Enable the resource manager as a privileged user in yarn-site.xml.
+By default, Kerberos tokens on YARN have a maximum lifetime of 7 days, beyond which they auto-expire. Streaming applications are often long-running and don't terminate within this lifetime. To get around this, you can configure YARN's Resource Manager to automatically re-create tokens on your behalf by setting these configs in your `yarn-site.xml` file.
 
 ```xml
 <property>
-    <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
-    <value>true</value>
+    <name>hadoop.proxyuser.yarn.hosts</name>
+    <value>*</value>
 </property>
-```
-
-Make `yarn` as a proxy user, in `core-site.xml`
 
-```xml
-<property>
-    <name>hadoop.proxyuser.yarn.hosts</name>
-    <value>*</value>
-</property>
 <property>
-    <name>hadoop.proxyuser.yarn.groups</name>
-    <value>*</value>
+    <name>hadoop.proxyuser.yarn.groups</name>
+    <value>*</value>
 </property>
 ```
+## Samza Coordinator Internals
 
-# Coordinator Internals
+In this section, we will discuss some of the implementation internals of the Samza ApplicationMaster (AM).
 
-The `ClusterBasedJobCoordinator` is used as the control hub for a running Samza job in a cluster like YARN. Among other things it is responsible for bootstrapping configs from the Coordinator Stream on job startup, constructing the JobModel, managing container allocations and handling callbacks from the cluster manager (in YARN’s case the Resource Manager). Just like most other components in the framework, Samza has a plugable interface for managing container allocations and is configured using the key `samza.cluster-manager.factory`.
+The Samza AM is the control hub for a Samza application running on a YARN cluster. It is responsible for coordinating work assignment across individual containers. It includes the following components:
 
+- YARNClusterResourceManager, which handles interactions with YARN and provides APIs for requesting resources and starting containers.
+- ContainerProcessManager, which uses the above APIs to manage Samza containers - including restarting them on failure and ensuring they stay in a healthy state.
 
-The `ClusterBasedJobCoordinator` contains a component called the `ContainerProcessManager` to handle metadata regarding container allocations. It uses the information (eg: host affinity) obtained from configs and the `CoordinatorStream` in order to make container allocation requests to the cluster manager (RM). In the case of YARN the config for `samza.cluster-manager.factory` which encapsulates the Application Master, is configured to `org.apache.samza.job.yarn.YarnResourceManagerFactory` and the `ContainerProcessManager` uses `YarnResourceManager` to interact with the RM.
 
 ![diagram-small](/img/{{site.version}}/learn/documentation/yarn/coordinator-internals.png)
 
+Here's the life-cycle of a Samza job submitted to YARN:
+
+- The `run-app.sh` script is invoked with the location of your application's binaries and its config file. The script instantiates an ApplicationRunner, which is the main entry-point responsible for running your application.
+
+- The ApplicationRunner parses your configs and writes them to a special Kafka topic called the coordinator stream, which is used to distribute them. It then submits a request to YARN to launch your application.
+
+- The first step in launching any YARN application is starting its Application Master (AM).
+
+- The ResourceManager allocates an available host and starts the Samza AM. 
+
+- The Samza AM is then responsible for managing the overall application. It reads configs from the Coordinator Stream and computes work-assignments for individual containers. 
+
+- It also determines the hosts each container should run on, taking data-locality into account, and proceeds to request resources on those nodes using the `YARNClusterResourceManager` APIs.
 
-The following is a walkthrough of the different actions taken when the `run-job.sh` script is run:
-- When the job is submitted using `run-app.sh` the JobRunner invoked as part of this script first writes all the configs to the coordinator stream.
-- The JobRunner then uses the configured StreamJob (YarnJob) to submit the request to start the AM to the RM.
-- The ResourceManager allocates the AM on an available NM and starts the ClusterBasedJobCoordinator.
-- The ClusterBasedJobCoordinator bootstraps the configs written to the Coordinator Stream in step (1) and constructs the JobModel, check for host-affinity if configured and instantiates the ClusterResourceManager (YarnClusterResourceManager).
-- The YarnClusterResourceManager is then used to make requests to the RM to start job.container.count number of containers. The RM then issues callbacks to the process when the containers are allocated.
-- When the containers are returned by the RM, the YarnClusterResourceManager allocates a SamzaContainer ID to the YARN containers to indicate which subset of tasks in the JobModel the YARN container should process on startup.
-- When the containers start up, they read the configs and the JobModel from the configs and use their own SamzaContainer ID and the JobModel to pick specific tasks and start message processing.
+- Once resources have been allocated, it proceeds to start the containers on the allocated hosts.
 
+- When it is started, each container first queries the Samza AM to determine its work-assignments and configs. It then proceeds to execute its assigned tasks. 
 
-During the course of processing message all container failures will result in a callback from the RM to the YarnClusterResourceManager. These callbacks can then be used to request for a new container and restart processing from the last checkpoint, thus making YARN deployments resilient to container failures.
+- The Samza AM periodically monitors each container using heartbeats and ensures it stays alive.
\ No newline at end of file