Posted to commits@samza.apache.org by ja...@apache.org on 2018/11/14 03:40:52 UTC

[9/9] samza git commit: Updated API documentation for high and low level APIs.

Updated API documentation for high and low level APIs.

vjagadish1989 nickpan47 Please take a look.

Author: Prateek Maheshwari <pm...@apache.org>

Reviewers: Jagadish<ja...@apache.org>

Closes #802 from prateekm/api-docs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d034bbef
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d034bbef
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d034bbef

Branch: refs/heads/1.0.0
Commit: d034bbef8532fed46cff9f9ee63ad83bfadbfb9a
Parents: 3e39702
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Tue Nov 13 18:17:17 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:34:00 2018 -0800

----------------------------------------------------------------------
 .../versioned/api/high-level-api.md             | 411 ++++++++++---------
 .../versioned/api/low-level-api.md              | 351 ++++++++--------
 .../versioned/api/programming-model.md          | 118 +++---
 .../documentation/versioned/api/samza-sql.md    |   2 +-
 .../documentation/versioned/api/table-api.md    |  11 +-
 .../versioned/connectors/eventhubs.md           |   2 +-
 .../documentation/versioned/connectors/kafka.md |   6 +-
 .../versioned/core-concepts/core-concepts.md    |  14 +-
 .../documentation/versioned/hadoop/overview.md  |   2 +-
 docs/learn/documentation/versioned/index.html   |  16 +-
 .../versioned/jobs/samza-configurations.md      |   2 +-
 .../versioned/operations/monitoring.md          |  10 +-
 .../versioned/hello-samza-high-level-code.md    |   2 +-
 .../versioned/hello-samza-high-level-yarn.md    |   4 +-
 .../versioned/samza-event-hubs-standalone.md    |   2 +-
 docs/learn/tutorials/versioned/samza-sql.md     |   2 +-
 docs/startup/code-examples/versioned/index.md   |   2 +-
 .../org/apache/samza/task/ClosableTask.java     |   7 +
 18 files changed, 483 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/high-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/high-level-api.md b/docs/learn/documentation/versioned/api/high-level-api.md
index 9c13f24..43fd727 100644
--- a/docs/learn/documentation/versioned/api/high-level-api.md
+++ b/docs/learn/documentation/versioned/api/high-level-api.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: High-level API
+title: High Level Streams API
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,155 +19,131 @@ title: High-level API
    limitations under the License.
 -->
 
-# Introduction
+### Table Of Contents
+- [Introduction](#introduction)
+- [Code Examples](#code-examples)
+- [Key Concepts](#key-concepts)
+  - [StreamApplication](#streamapplication)
+  - [MessageStream](#messagestream)
+  - [Table](#table)
+- [Operators](#operators)
+  - [Map](#map)
+  - [FlatMap](#flatmap)
+  - [Filter](#filter)
+  - [PartitionBy](#partitionby)
+  - [Merge](#merge)
+  - [Broadcast](#broadcast)
+  - [SendTo (Stream)](#sendto-stream)
+  - [SendTo (Table)](#sendto-table)
+  - [Sink](#sink)
+  - [Join (Stream-Stream)](#join-stream-stream)
+  - [Join (Stream-Table)](#join-stream-table)
+  - [Window](#window)
+      - [Windowing Concepts](#windowing-concepts)
+      - [Window Types](#window-types)
+- [Operator IDs](#operator-ids)
+- [Data Serialization](#data-serialization)
+- [Application Serialization](#application-serialization)
 
-The high level API provides the libraries to define your application logic. The StreamApplication is the central abstraction which your application must implement. You start by declaring your inputs as instances of MessageStream. Then you can apply operators on each MessageStream like map, filter, window, and join to define the whole end-to-end data processing in a single program.
+### Introduction
 
-Since the 0.13.0 release, Samza provides a new high level API that simplifies your applications. This API supports operations like re-partitioning, windowing, and joining on streams. You can now express your application logic concisely in few lines of code and accomplish what previously required multiple jobs.
-# Code Examples
+Samza's flexible High Level Streams API lets you describe your complex stream processing pipeline as a Directed Acyclic Graph (DAG) of operations on [MessageStream](javadocs/org/apache/samza/operators/MessageStream). It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, stream-stream and stream-table joins, and windowing. 
 
-Check out some examples to see the high-level API in action.
-1. PageView AdClick Joiner demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
-2. PageView Repartitioner illustrates re-partitioning the incoming stream of PageViews.
-3. PageView Sessionizer groups the incoming stream of events into sessions based on user activity.
-4. PageView by Region counts the number of views per-region over tumbling time intervals.
+### Code Examples
 
-# Key Concepts
-## StreamApplication
-When writing your stream processing application using the Samza high-level API, you implement a StreamApplication and define your processing logic in the describe method.
+[The Samza Cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook) contains various recipes using the Samza High Level Streams API. These include:
 
-{% highlight java %}
-
-    public void describe(StreamApplicationDescriptor appDesc) { … }
-
-{% endhighlight %}
-
-For example, here is a StreamApplication that validates and decorates page views with viewer’s profile information.
-
-{% highlight java %}
-
-    public class BadPageViewFilter implements StreamApplication {
-      @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
-        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor();
-        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor(“page-views”, new JsonSerdeV2<>(PageView.class));
-        OutputDescriptor<DecoratedPageView> outputPageViews = kafka.getOutputDescriptor( “decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class));    
-        MessageStream<PageView> pageViews = appDesc.getInputStream(pageViewInput);
-        pageViews.filter(this::isValidPageView)
-            .map(this::addProfileInformation)
-            .sendTo(appDesc.getOutputStream(outputPageViews));
-      }
-    }
-    
-{% endhighlight %}
+- The [Filter example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) demonstrates how to perform stateless operations on a stream. 
 
-## MessageStream
-A MessageStream, as the name implies, represents a stream of messages. A StreamApplication is described as a series of transformations on MessageStreams. You can get a MessageStream in two ways:
-1. Using StreamApplicationDescriptor.getInputStream to get the MessageStream for a given input stream (e.g., a Kafka topic).
-2. By transforming an existing MessageStream using operations like map, filter, window, join etc.
-## Table
-A Table represents a dataset that can be accessed by keys, and is one of the building blocks of the Samza high level API; the main motivation behind it is to support stream-table joins. The current K/V store is leveraged to provide backing store for local tables. More variations such as direct access and composite tables will be supported in the future. The usage of a table typically follows three steps:
-1. Create a table
-2. Populate the table using the sendTo() operator
-3. Join a stream with the table using the join() operator
+- The [Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java) demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks.
 
-{% highlight java %}
+- The [Stream-Table Join example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java) demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
 
-    final StreamApplication app = (streamAppDesc) -> {
-      Table<KV<Integer, Profile>> table = streamAppDesc.getTable(new InMemoryTableDescriptor("t1")
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      ...
-    };
+- The [SessionWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java) and [TumblingWindow](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java) examples illustrate Samza's rich windowing and triggering capabilities.
 
-{% endhighlight %}
-
-Example above creates a TableDescriptor object, which contains all information about a table. The currently supported table types are InMemoryTableDescriptor and RocksDbTableDescriptor. Notice the type of records in a table is KV, and Serdes for both key and value of records needs to be defined (line 4). Additional parameters can be added based on individual table types.
+### Key Concepts
+#### StreamApplication
+A [StreamApplication](javadocs/org/apache/samza/application/StreamApplication) describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza's High Level Streams API.
 
-More details about step 2 and 3 can be found at [operator section](#operators).
+A typical StreamApplication implementation consists of the following stages:
 
-# Anatomy of a typical StreamApplication
-There are 3 simple steps to write your stream processing logic using the Samza high-level API.
-## Step 1: Obtain the input streams
-You can obtain the MessageStream for your input stream ID (“page-views”) using StreamApplicationDescriptor.getInputStream.
+ 1. Configuring the inputs, outputs and state (tables) using the appropriate [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor)s, [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor)s, [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor)s and [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor)s.
+ 2. Obtaining the corresponding [MessageStream](javadocs/org/apache/samza/operators/MessageStream)s, [OutputStream](javadocs/org/apache/samza/operators/OutputStream)s and [Table](javadocs/org/apache/samza/table/Table)s from the provided [StreamApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor).
+ 3. Defining the processing logic using operators and functions on the streams and tables thus obtained.
 
+The following example StreamApplication removes page views older than 1 hour from the input stream:
+ 
 {% highlight java %}
-
-    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka")
-        .withConsumerZkConnect(ImmutableList.of("localhost:2181"))
-        .withProducerBootstrapServers(ImmutableList.of("localhost:9092"));
-
-    KafkaInputDescriptor<KV<String, Integer>> pageViewInput =
-        sd.getInputDescriptor("page-views", KVSerde.of(new StringSerde(), new JsonSerdeV2(PageView.class)));
-    
-    MessageStream<PageView> pageViews = streamAppDesc.getInputStream(pageViewInput);
-
+   
+    public class PageViewFilter implements StreamApplication {
+      public void describe(StreamApplicationDescriptor appDescriptor) {
+        // Step 1: configure the inputs and outputs using descriptors
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("..."))
+            .withProducerBootstrapServers(ImmutableList.of("...", "..."));
+        KafkaInputDescriptor<PageViewEvent> kid = 
+            ksd.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        KafkaOutputDescriptor<PageViewEvent> kod = 
+            ksd.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+  
+        // Step 2: obtain the message streams and output streams 
+        MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(kid);
+        OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(kod);
+  
+        // Step 3: define the processing logic
+        pageViewEvents
+            .filter(m -> m.getCreationTime() > 
+                System.currentTimeMillis() - Duration.ofHours(1).toMillis())
+            .sendTo(recentPageViewEvents);
+      }
+    }
+  
 {% endhighlight %}
 
-The parameter {% highlight java %}pageViewInput{% endhighlight %} is the [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html). Each InputDescriptor includes the full information of an input stream, including the stream ID, the serde to deserialize the input messages, and the system. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system which is specified with the property “job.default.system”. However, the physical name and system properties can be overridden in configuration. For example, the following configuration defines the stream ID “page-views” as an alias for the PageViewEvent topic in a local Kafka cluster.
 
-{% highlight jproperties %}
+#### MessageStream
+A [MessageStream](javadocs/org/apache/samza/operators/MessageStream), as the name implies, represents a stream of messages. A StreamApplication is described as a Directed Acyclic Graph (DAG) of transformations on MessageStreams. You can get a MessageStream in two ways:
 
-    streams.page-views.samza.physical.name=PageViewEvent
+1. Calling StreamApplicationDescriptor#getInputStream() with an [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor) obtained from a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor).
+2. By transforming an existing MessageStream using operators like map, filter, window, join etc.
 
-{% endhighlight %}
+#### Table
+A [Table](javadocs/org/apache/samza/table/Table) is an abstraction for data sources that support random access by key. It is an evolution of the older [KeyValueStore](javadocs/org/apache/samza/storage/kv/KeyValueStore) API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a [RemoteTable](javadocs/org/apache/samza/table/remote/RemoteTable) provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a [ReadableTable](javadocs/org/apache/samza/table/ReadableTable) or a [ReadWriteTable](javadocs/org/apache/samza/table/ReadWriteTable).
+ 
+In the High Level Streams API, you can obtain and use a Table as follows:
 
-## Step 2: Define your transformation logic
-You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.
+1. Use the appropriate TableDescriptor to specify the table properties.
+2. Register the TableDescriptor with the StreamApplicationDescriptor. This returns a Table reference, which can be used to populate the table using the [Send To Table](#sendto-table) operator, or to join a stream with the table using the [Stream-Table Join](#join-stream-table) operator, as shown in the sketch below.
+3. Alternatively, you can obtain a Table reference within an operator's [InitableFunction](javadocs/org/apache/samza/operators/functions/InitableFunction) using the provided [TaskContext](javadocs/org/apache/samza/context/TaskContext).
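+
+For example, a minimal sketch of step 2, creating a local table and obtaining a reference to it (ProfileJsonSerde is a hypothetical serde implementation):
+
+{% highlight java %}
+
+    Table<KV<Long, Profile>> profilesTable = appDescriptor.getTable(
+        new RocksDbTableDescriptor<>("profiles",
+            KVSerde.of(new LongSerde(), new ProfileJsonSerde())));
+
+{% endhighlight %}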
 
-{% highlight java %}
-
-    MessageStream<DecoratedPageViews> decoratedPageViews = pageViews.filter(this::isValidPageView)
-        .map(this::addProfileInformation);
-
-{% endhighlight %}
+### Operators
+The High Level Streams API provides common operations like map, flatmap, filter, merge, broadcast, joins, and windows on MessageStreams. Most of these operators accept their corresponding Functions as an argument. 
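+
+These functions can be provided inline as lambdas, as in most of the examples below, or as separate classes, which is useful when a function needs to implement additional interfaces like [InitableFunction](javadocs/org/apache/samza/operators/functions/InitableFunction). A minimal sketch (PageView and its getUserId() accessor are the example types used throughout this page):
+
+{% highlight java %}
+
+    public class UserIdExtractor implements MapFunction<PageView, String> {
+      @Override
+      public String apply(PageView pageView) {
+        return pageView.getUserId();
+      }
+    }
+
+    MessageStream<String> userIds = pageViews.map(new UserIdExtractor());
+
+{% endhighlight %}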
 
-## Step 3: Write to output streams
-
-Finally, you can create an OutputStream using StreamApplicationDescriptor.getOutputStream and send the transformed messages through it.
+#### Map
+Applies the provided 1:1 [MapFunction](javadocs/org/apache/samza/operators/functions/MapFunction) to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
 
 {% highlight java %}
 
-    KafkaOutputDescriptor<DecoratedPageViews> outputPageViews =
-        sd.getInputDescriptor("page-views", new JsonSerdeV2(DecoratedPageViews.class));
-  
-    // Send messages with userId as the key to “decorated-page-views”.
-    decoratedPageViews.sendTo(streamAppDesc.getOutputStream(outputPageViews));
-
-{% endhighlight %}
-
-The parameter {% highlight java %}outputPageViews{% endhighlight %} is the [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html), which includes the stream ID, the serde to serialize the outgoing messages, the physical name and the system. Similarly, the properties for this stream can be overridden just like the stream IDs for input streams. For example:
-
-{% highlight jproperties %}
-
-    streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
-
-{% endhighlight %}
-
-# Operators
-The high level API supports common operators like map, flatmap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions, which are Initable and Closable.
-## Map
-Applies the provided 1:1 MapFunction to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
-
-{% highlight java %}
-    
     MessageStream<Integer> numbers = ...
     MessageStream<Integer> tripled = numbers.map(m -> m * 3);
     MessageStream<String> stringified = numbers.map(m -> String.valueOf(m));
 
 {% endhighlight %}
-## FlatMap
-Applies the provided 1:n FlatMapFunction to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
+
+#### FlatMap
+Applies the provided 1:n [FlatMapFunction](javadocs/org/apache/samza/operators/functions/FlatMapFunction) to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
 
 {% highlight java %}
     
     MessageStream<String> sentence = ...
-    // Parse the sentence into its individual words splitting by space
+    // Parse the sentence into its individual words splitting on space
     MessageStream<String> words = sentence.flatMap(s ->
         Arrays.asList(s.split(" ")));
 
 {% endhighlight %}
-## Filter
-Applies the provided FilterFunction to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
+
+#### Filter
+Applies the provided [FilterFunction](javadocs/org/apache/samza/operators/functions/FilterFunction) to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
 
 {% highlight java %}
     
@@ -178,91 +154,109 @@ Applies the provided FilterFunction to the MessageStream and returns the filtere
     MessageStream<String> shortWords = words.filter(word -> word.length() < 3);
     
 {% endhighlight %}
-## PartitionBy
+
+#### PartitionBy
 Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.
+
 {% highlight java %}
     
     MessageStream<PageView> pageViews = ...
-    // Repartition pageView by userId.
-    MessageStream<KV<String, PageView>> partitionedPageViews = pageViews.partitionBy(
-        pageView -> pageView.getUserId(), // key extractor
-        pageView -> pageView, // value extractor
-        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
-        "partitioned-page-views"); // operator ID    
+    
+    // Repartition PageViews by userId.
+    MessageStream<KV<String, PageView>> partitionedPageViews = 
+        pageViews.partitionBy(
+            pageView -> pageView.getUserId(), // key extractor
+            pageView -> pageView, // value extractor
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+            "partitioned-page-views"); // operator ID    
         
 {% endhighlight %}
 
-The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.
+The operator ID should be unique for each operator within the application and is used to identify the streams and stores created by the operator.
 
-## Merge
+#### Merge
 Merges the MessageStream with all the provided MessageStreams and returns the merged stream.
 {% highlight java %}
     
-    MessageStream<ServiceCall> serviceCall1 = ...
-    MessageStream<ServiceCall> serviceCall2 = ...
-    // Merge individual “ServiceCall” streams and create a new merged MessageStream
-    MessageStream<ServiceCall> serviceCallMerged = serviceCall1.merge(serviceCall2);
+    MessageStream<LogEvent> log1 = ...
+    MessageStream<LogEvent> log2 = ...
+    
+    // Merge individual “LogEvent” streams and create a new merged MessageStream
+    MessageStream<LogEvent> mergedLogs = log1.merge(log2);
+    
+    // Alternatively, use mergeAll to merge multiple streams
+    MessageStream<LogEvent> allLogs = MessageStream.mergeAll(ImmutableList.of(log1, log2, ...));
     
 {% endhighlight %}
 
-The merge transform preserves the order of each MessageStream, so if message {% highlight java %}m1{% endhighlight %} appears before {% highlight java %}m2{% endhighlight %} in any provided stream, then, {% highlight java %}m1{% endhighlight %} also appears before {% highlight java %}m2{% endhighlight %} in the merged stream.
-As an alternative to the merge instance method, you also can use the [MessageStream#mergeAll](javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) static method to merge MessageStreams without operating on an initial stream.
+The merge transform preserves the order of messages within each MessageStream. If message <code>m1</code> appears before <code>m2</code> in any provided stream, then <code>m1</code> also appears before <code>m2</code> in the merged stream.
+
+#### Broadcast
+Broadcasts the contents of the MessageStream to every *instance* of downstream operators via an intermediate stream.
 
-## Broadcast
-Broadcasts the MessageStream to all instances of down-stream transformation operators via the intermediate stream.
 {% highlight java %}
 
-    MessageStream<VersionChange> verChanges = ...
-    // Broadcast input data version change event to all operator instances.
-    MessageStream<VersionChange> broadcastVersionChanges = 
-        verChanges.broadcast(new JsonSerdeV2<>(VersionChange.class), // serde
-                             "broadcast-version-changes"); // operator ID
+    MessageStream<VersionChange> versionChanges = ...
+    
+    // Broadcast version change event to all downstream operator instances.
+    versionChanges
+        .broadcast(
+            new JsonSerdeV2<>(VersionChange.class), // serde
+            "version-change-broadcast") // operator ID
+        .map(versionChange -> {
+            // act on the version change event in each downstream operator instance
+            return versionChange;
+        });
+         
 {% endhighlight %}
 
-## SendTo(Stream)
-Sends all messages from this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing message.
+#### SendTo (Stream)
+Sends all messages in this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing messages.
 
 {% highlight java %}
     
-    // Output a new message with userId as the key and region as the value to the “user-region” stream.
-    OutputDescriptor<KV<String, String>> outputRegions = 
-        kafka.getOutputDescriptor(“user-region”, KVSerde.of(new StringSerde(), new StringSerde());
+    // Obtain the OutputStream using an OutputDescriptor
+    KafkaOutputDescriptor<KV<String, String>> kod = 
+        ksd.getOutputDescriptor("user-country", KVSerde.of(new StringSerde(), new StringSerde()));
+    OutputStream<KV<String, String>> userCountries = appDescriptor.getOutputStream(kod);
+    
     MessageStream<PageView> pageViews = ...
-    MessageStream<KV<String, PageView>> keyedPageViews = pageViews.map(KV.of(pageView.getUserId(), pageView.getRegion()));
-    keyedPageViews.sendTo(appDesc.getOutputStream(outputRegions));
+    // Send a new message with userId as the key and their country as the value to the “user-country” stream.
+    pageViews
+      .map(pageView -> KV.of(pageView.getUserId(), pageView.getCountry()))
+      .sendTo(userCountries);
 
 {% endhighlight %}
-## SendTo(Table)
-Sends all messages from this MessageStream to the provided table, the expected message type is KV.
+
+#### SendTo (Table)
+Sends all messages in this MessageStream to the provided Table. The expected message type is [KV](javadocs/org/apache/samza/operators/KV).
 
 {% highlight java %}
+
+    MessageStream<Profile> profilesStream = ...
+    Table<KV<Long, Profile>> profilesTable = ...
     
-    // Write a new message with memberId as the key and profile as the value to a table.
-    appDesc.getInputStream(kafka.getInputDescriptor("Profile", new NoOpSerde<Profile>()))
-        .map(m -> KV.of(m.getMemberId(), m))
-        .sendTo(table);
+    profilesStream
+        .map(profile -> KV.of(profile.getMemberId(), profile))
+        .sendTo(profilesTable);
         
 {% endhighlight %}
 
-## Sink
+#### Sink
 Allows sending messages from this MessageStream to an output system using the provided [SinkFunction](javadocs/org/apache/samza/operators/functions/SinkFunction.html).
 
-This offers more control than {% highlight java %}sendTo{% endhighlight %} since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For instance, you can choose to manually commit offsets, or shut-down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.)
+This offers more control than [SendTo (Stream)](#sendto-stream) since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For example, you can choose to manually commit offsets, or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.)
 
 {% highlight java %}
     
-    // Repartition pageView by userId.
     MessageStream<PageView> pageViews = ...
-    pageViews.sink( (msg, collector, coordinator) -> {
+    
+    pageViews.sink((msg, collector, coordinator) -> {
         // Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
-        collector.send(new OutgoingMessageEnvelope(new SystemStream(“kafka”,
-                       “TransformedPageViewEvent”), msg));
+        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "TransformedPageViewEvent"), msg));
     } );
         
 {% endhighlight %}
 
-## Join(Stream-Stream)
-The stream-stream Join operator joins messages from two MessageStreams using the provided pairwise [JoinFunction](javadocs/org/apache/samza/operators/functions/JoinFunction.html). Messages are joined when the keys extracted from messages from the first stream match keys extracted from messages in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found.
+#### Join (Stream-Stream)
+The Stream-Stream Join operator joins messages from two MessageStreams using the provided pairwise [JoinFunction](javadocs/org/apache/samza/operators/functions/JoinFunction.html). Messages are joined when the key extracted from a message from the first stream matches the key extracted from a message in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found. Note that the join only retains the latest message for each key in each input stream.
 
 {% highlight java %}
     
@@ -271,7 +265,9 @@ The stream-stream Join operator joins messages from two MessageStreams using the
     MessageStream<OrderRecord> orders = …
     MessageStream<ShipmentRecord> shipments = …
 
-    MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(),
+    MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(
+        shipments, // other stream
+        new OrderShipmentJoiner(), // join function
         new StringSerde(), // serde for the join key
         new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serde for both streams
         Duration.ofMinutes(20), // join TTL
@@ -297,18 +293,18 @@ The stream-stream Join operator joins messages from two MessageStreams using the
     
 {% endhighlight %}
 
-## Join(Stream-Table)
-The stream-table Join operator joins messages from a MessageStream using the provided [StreamTableJoinFunction](javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html). Messages from the input stream are joined with record in table using key extracted from input messages. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided; the join function can choose to return null (inner join) or an output message (left outer join). For join to function properly, it is important to ensure the input stream and table are partitioned using the same key as this impacts the physical placement of data.
+#### Join (Stream-Table)
+The Stream-Table Join operator joins messages from a MessageStream with messages in a Table using the provided [StreamTableJoinFunction](javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html). Messages are joined when the key extracted from a message in the stream matches the key for a record in the table. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided. The join function can choose to return null for an inner join, or an output message for a left outer join. For join correctness, it is important to ensure the input stream and table are partitioned using the same key (e.g., using the partitionBy operator) as this impacts the physical placement of data.
 
 {% highlight java %}
-   
-    streamAppDesc.getInputStream(kafk.getInputDescriptor("PageView", new NoOpSerde<PageView>()))
-        .partitionBy(PageView::getMemberId, v -> v, "p1")
-        .join(table, new PageViewToProfileJoinFunction())
+
+    pageViews
+        .partitionBy(pv -> pv.getMemberId(), pv -> pv, 
+            KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+            "page-views-by-memberid") // operator ID
+        .join(profiles, new PageViewToProfileTableJoiner())
         ...
     
-    public class PageViewToProfileJoinFunction implements StreamTableJoinFunction
-        <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+    public class PageViewToProfileTableJoiner implements 
+        StreamTableJoinFunction<Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
       
       @Override
       public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) {
@@ -325,10 +321,11 @@ The stream-table Join operator joins messages from a MessageStream using the pro
         return record.getKey();
       }
     }
+    
 {% endhighlight %}
 
-## Window
-### Windowing Concepts
+#### Window
+##### Windowing Concepts
 **Windows, Triggers, and WindowPanes**: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.
 
 A window can have one or more associated triggers which determine when results from the window are emitted. Triggers can be either [early triggers](javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-) that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.
@@ -341,8 +338,8 @@ A discarding window clears all state for the window at every emission. Each emis
 
 An accumulating window retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.
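+
+For example, an early trigger and an accumulation mode can be set on a window as follows (a sketch, reusing the keyed tumbling window from the examples below):
+
+{% highlight java %}
+
+    MessageStream<WindowPane<String, Collection<PageView>>> windowedPageViews = 
+        pageViews.window(
+            Windows.keyedTumblingWindow(
+                    pageView -> pageView.getUserId(), // key extractor
+                    Duration.ofSeconds(30), // window duration
+                    new StringSerde(), new JsonSerdeV2<>(PageView.class))
+                .setEarlyTrigger(Triggers.count(5)) // fire an early result after 5 messages
+                .setAccumulationMode(AccumulationMode.DISCARDING));
+
+{% endhighlight %}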
 
-### Window Types
-The Samza high-level API currently supports tumbling and session windows.
+##### Window Types
+The Samza High Level Streams API currently supports tumbling and session windows.
 
 **Tumbling Window**: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.
 
@@ -350,20 +347,26 @@ Examples:
 
 {% highlight java %}
     
-    // Group the pageView stream into 3 second tumbling windows keyed by the userId.
+    // Group the pageView stream into 30 second tumbling windows keyed by the userId.
     MessageStream<PageView> pageViews = ...
-    MessageStream<WindowPane<String, Collection<PageView>>> =
-        pageViews.window(
-            Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), // key extractor
+    MessageStream<WindowPane<String, Collection<PageView>>> windowedPageViews = pageViews.window(
+        Windows.keyedTumblingWindow(
+            pageView -> pageView.getUserId(), // key extractor
             Duration.ofSeconds(30), // window duration
             new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
-    // Compute the maximum value over tumbling windows of 3 seconds.
+    // Compute the maximum value over tumbling windows of 30 seconds.
     MessageStream<Integer> integers = …
     Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
-    FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
-    MessageStream<WindowPane<Void, Integer>> windowedStream =
-        integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction, new IntegerSerde()));
+    FoldLeftFunction<Integer, Integer> aggregateFunction = 
+        (msg, oldValue) -> Math.max(msg, oldValue);
+    
+    MessageStream<WindowPane<Void, Integer>> windowedStream = integers.window(
+       Windows.tumblingWindow(
+            Duration.ofSeconds(30), 
+            initialValue, 
+            aggregateFunction, 
+            new IntegerSerde()));
    
 {% endhighlight %}
 
@@ -378,39 +381,53 @@ Examples:
     Supplier<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
     Duration sessionGap = Duration.ofMinutes(3);
-    MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
-        pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator,
+    
+    MessageStream<WindowPane<String, Integer>> sessionCounts = pageViews.window(
+        Windows.keyedSessionWindow(
+            pageView -> pageView.getUserId(), 
+            sessionGap, 
+            initialValue, 
+            countAggregator,
             new StringSerde(), new IntegerSerde()));
 
-    // Compute the maximum value over tumbling windows of 3 seconds.
+    // Compute the maximum value over tumbling windows of 30 seconds.
     MessageStream<Integer> integers = …
     Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
-
     FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
-    MessageStream<WindowPane<Void, Integer>> windowedStream =
-       integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction,
-           new IntegerSerde()));
+    
+    MessageStream<WindowPane<Void, Integer>> windowedStream = integers.window(
+        Windows.tumblingWindow(
+            Duration.ofSeconds(30), 
+            initialValue, 
+            aggregateFunction,
+            new IntegerSerde()));
          
 {% endhighlight %}
 
-# Operator IDs
-Each operator in your application is associated with a globally unique identifier. By default, each operator is assigned an ID based on its usage in the application. Some operators that create and use external resources (e.g., intermediate streams for partitionBy and broadcast, stores and changelogs for joins and windows, etc.) require you to provide an explicit ID for them. It's highly recommended to provide meaningful IDs for such operators. These IDs help you control the underlying resources when you make changes to the application logic that change the position of the operator within the DAG, and
+### Operator IDs
+Each operator in the StreamApplication is associated with a globally unique identifier. By default, each operator is assigned an ID by the framework based on its position in the operator DAG for the application. Some operators that create and use external resources require you to provide an explicit ID for them. Examples of such operators are partitionBy and broadcast with their intermediate streams, and window and join with their local stores and changelogs. It's strongly recommended to provide meaningful IDs for such operators. 
+
+These IDs help you manage the underlying resources when you make changes to the application logic that change the position of the operator within the DAG, and either:
+
 1. You wish to retain the previous state for the operator, since the changes to the DAG don't affect the operator semantics. For example, you added a map operator before a partitionBy operator to log the incoming message. In this case, you can retain the previous operator ID.
+
 2. You wish to discard the previous state for the operator, since the changes to the DAG change the operator semantics. For example, you added a filter operator before a partitionBy operator that discards some of the messages. In this case, you should change the operator ID. Note that by doing so you will lose any previously checkpointed messages that haven't been completely processed by the downstream operators yet.
 
 An operator ID is of the format: **jobName-jobId-opCode-opId**
-- **jobName** is the name of your job, as specified using the configuration "job.name"
-- **jobId** is the name of your job, as specified using the configuration "job.id"
-- **opCode** is an identifier for the type of the operator, e.g. map/filter/join
+
+- **jobName** is the name of your application, as specified using the configuration "app.name"
+- **jobId** is the id of your application, as specified using the configuration "app.id"
+- **opCode** is a pre-defined identifier for the type of the operator, e.g. map/filter/join
 - **opId** is either auto-generated by the framework based on the position of the operator within the DAG, or can be provided by you for operators that manage external resources.
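+
+For illustration, given the configuration below, the partitionBy operator from the earlier example would generate an ID like <code>page-view-filter-1-partitionBy-partitioned-page-views</code> (the exact opCode spelling is framework-defined; the values here are illustrative):
+
+{% highlight jproperties %}
+
+    app.name=page-view-filter
+    app.id=1
+
+{% endhighlight %}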
 
-# Application Serialization
-Samza relies on Java Serialization to distribute your application logic to the processors. For this to work, all of your custom application logic needs to be Serializable. For example, all the Function interfaces implement Serializable, and your implementations need to be serializable as well. It's recommended to use the Context APIs to set up any non-serializable context that your Application needs at Runtime.
+### Data Serialization
+Producing data to and consuming data from streams and tables requires serializing and deserializing the data. In addition, some stateful operators like joins and windows store data locally for durability across restarts. Such operations require you to provide a [Serde](javadocs/org/apache/samza/serializers/Serde) implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be checked for type safety at compile time. Samza provides the following Serde implementations that you can use out of the box:
 
-# Data Serialization
-Producing and consuming from streams and tables require serializing and deserializing data. In addition, some operators like joins and windows store data in a local store for durability across restarts. Such operations require you to provide a Serde implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be type safe. Samza provides the following Serde implementations that you can use out of the box:
+- Common Types: Serdes for common Java data types, such as ByteBuffer, Double, Long, Integer, Byte, String.
+- [SerializableSerde](javadocs/org/apache/samza/serializers/SerializableSerde): A Serde for Java classes that implement the java.io.Serializable interface. 
+- [JsonSerdeV2](javadocs/org/apache/samza/serializers/JsonSerdeV2): A Jackson-based, type-safe JSON Serde that allows serializing from and deserializing to a POJO.
+- [KVSerde](javadocs/org/apache/samza/serializers/KVSerde): A pair of Serdes, the first for the keys, and the second for the values in the incoming/outgoing message, a table record, or a [KV](javadocs/org/apache/samza/operators/KV) object.
+- [NoOpSerde](javadocs/org/apache/samza/serializers/NoOpSerde): A marker serde that indicates that the framework should not attempt any serialization/deserialization of the data. This is useful in some cases where the SystemProducer or SystemConsumer handles serialization and deserialization of the data itself.
 
-- KVSerde: A pair of Serdes, first for the keys, and the second for the values in the incoming/outgoing message or a table record.
-- NoOpSerde: A serde implementation that indicates that the framework should not attempt any serialization/deserialization of the data. Useful in some cases when the SystemProducer/SystemConsumer handle serialization/deserialization themselves.
-- JsonSerdeV2: a type-specific Json serde that allows directly deserializing the Json bytes into to specific POJO type.
-- Serdes for primitive types: serdes for primitive types, such as ByteBuffer, Double, Long, Integer, Byte, String, etc.
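+
+For example, a serde for keyed page view messages might be declared and passed to an input descriptor as follows (ksd is the KafkaSystemDescriptor from the earlier example):
+
+{% highlight java %}
+
+    KVSerde<String, PageView> pageViewKVSerde = 
+        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
+    KafkaInputDescriptor<KV<String, PageView>> kid = 
+        ksd.getInputDescriptor("page-views", pageViewKVSerde);
+
+{% endhighlight %}
+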
+### Application Serialization
+Samza uses Java Serialization to distribute an application's processing logic to the processors. For this to work, all application logic, including any Function implementations passed to operators, needs to be serializable. If you need to use any non-serializable objects at runtime, you can use the [ApplicationContainerContext](javadocs/org/apache/samza/context/ApplicationContainerContext) and [ApplicationTaskContext](javadocs/org/apache/samza/context/ApplicationTaskContext) APIs to manage their lifecycle.
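+
+One common pattern is to create such objects lazily in an [InitableFunction](javadocs/org/apache/samza/operators/functions/InitableFunction), rather than in the application's constructor. A minimal sketch, assuming a hypothetical non-serializable RestClient:
+
+{% highlight java %}
+
+    public class UrlValidator implements FilterFunction<PageView> {
+      // transient: not serialized and shipped with the application logic
+      private transient RestClient restClient;
+
+      @Override
+      public void init(Context context) {
+        this.restClient = RestClient.create(); // hypothetical factory method
+      }
+
+      @Override
+      public boolean apply(PageView pageView) {
+        return restClient.isValid(pageView.getUrl()); // hypothetical check
+      }
+    }
+
+{% endhighlight %}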
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d034bbef/docs/learn/documentation/versioned/api/low-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/low-level-api.md b/docs/learn/documentation/versioned/api/low-level-api.md
index e91f74e..34d2b06 100644
--- a/docs/learn/documentation/versioned/api/low-level-api.md
+++ b/docs/learn/documentation/versioned/api/low-level-api.md
@@ -19,291 +19,268 @@ title: Low level Task API
    limitations under the License.
 -->
 
+### Table Of Contents
+- [Introduction](#introduction)
+- [Code Examples](#code-examples)
+- [Key Concepts](#key-concepts)
+  - [TaskApplication](#taskapplication)
+  - [TaskFactory](#taskfactory)
+  - [Task Interfaces](#task-interfaces)
+      - [StreamTask](#streamtask)
+      - [AsyncStreamTask](#asyncstreamtask)
+      - [Additional Task Interfaces](#additional-task-interfaces)
+          - [InitableTask](#initabletask)
+          - [ClosableTask](#closabletask)
+          - [WindowableTask](#windowabletask)
+          - [EndOfStreamListenerTask](#endofstreamlistenertask) 
+- [Common Operations](#common-operations)
+  - [Receiving Messages from Input Streams](#receiving-messages-from-input-streams)
+  - [Sending Messages to Output Streams](#sending-messages-to-output-streams)
+  - [Accessing Tables](#accessing-tables)
+- [Legacy Applications](#legacy-applications)
 
-# Introduction
-Task APIs (i.e. [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html)) are bare-metal interfaces that exposes the system implementation details in Samza. When using Task APIs, you will implement your application as a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html). The main difference between a TaskApplication and a StreamApplication is the APIs used to describe the processing logic. In TaskApplication, the processing logic is defined via StreamTask and AsyncStreamTask.
+### Introduction
+Samza's powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication). The processing logic is defined as either a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask).
 
-# Key Concepts
 
-## TaskApplication
-Here is an example of a user implemented TaskApplication:
+### Code Examples
 
-{% highlight java %}
-    
-    package com.example.samza;
+The [Hello Samza](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task/application) Wikipedia applications demonstrate how to use Samza's Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculate several statistics about them.  
 
-    public class BadPageViewFilter implements TaskApplication {
-      @Override
-      public void describe(TaskApplicationDescriptor appDesc) {
-        // Add input, output streams and tables
-        KafkaSystemDescriptor<String, PageViewEvent> kafkaSystem = 
-            new KafkaSystemDescriptor(“kafka”)
-              .withConsumerZkConnect(myZkServers)
-              .withProducerBootstrapServers(myBrokers);
-        KVSerde<String, PageViewEvent> serde = 
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<PageViewEvent>());
-        // Add input, output streams and tables
-        appDesc.withInputStream(kafkaSystem.getInputDescriptor(“pageViewEvent”, serde))
-            .withOutputStream(kafkaSystem.getOutputDescriptor(“goodPageViewEvent”, serde))
-            .withTable(new RocksDBTableDescriptor(
-                “badPageUrlTable”, KVSerde.of(new StringSerde(), new IntegerSerde())
-            .withTaskFactory(new BadPageViewTaskFactory());
-      }
-    }
+- The [WikipediaFeedTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java) demonstrates how to consume multiple Wikipedia event streams and merge them into an Apache Kafka topic. 
 
-{% endhighlight %}
+- The [WikipediaParserTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.
 
-In the above example, user defines the input stream, the output stream, and a RocksDB table for the application, and then provide the processing logic defined in BadPageViewTaskFactory. All descriptors (i.e. input/output streams and tables) and the [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) are registered to the [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor.html).
+- The [WikipediaStatsTaskApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java) demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.
 
-## TaskFactory
-You will need to implement a [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) to create task instances to execute user defined processing logic. Correspondingly, StreamTaskFactory and AsyncStreamTaskFactory are used to create StreamTask and AsyncStreamTask respectively. The [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory.html) for the above example is shown below:
+### Key Concepts
 
-{% highlight java %}
+#### TaskApplication
+
+A [TaskApplication](javadocs/org/apache/samza/application/TaskApplication) describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza's Low Level Task API.
+
+A typical TaskApplication implementation consists of the following stages:
 
-    package com.example.samza;
+ 1. Configuring the inputs, outputs and state (tables) using the appropriate [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor)s, [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor)s, [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor)s and [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor)s.
+ 2. Adding the descriptors above to the provided [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor).
+ 3. Defining the processing logic in a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask) implementation, and adding its corresponding [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory) or [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory) to the TaskApplicationDescriptor.
 
-    public class BadPageViewTaskFactory implements StreamTaskFactory {
+The following example TaskApplication removes page views with "bad URLs" from the input stream:
+ 
+{% highlight java %}
+    
+    public class PageViewFilter implements TaskApplication {
       @Override
-      public StreamTask createInstance() {
-        // Add input, output streams and tables
-        return new BadPageViewFilterTask();
+      public void describe(TaskApplicationDescriptor appDescriptor) {
+        // Step 1: configure the inputs and outputs using descriptors
+        KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("..."))
+            .withProducerBootstrapServers(ImmutableList.of("...", "..."));
+        KafkaInputDescriptor<PageViewEvent> kid = 
+            ksd.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        KafkaOutputDescriptor<PageViewEvent> kod = 
+            ksd.getOutputDescriptor("goodPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+        RocksDbTableDescriptor badUrls = 
+            new RocksDbTableDescriptor("badUrls", KVSerde.of(new StringSerde(), new IntegerSerde()));
+            
+        // Step 2: Add input, output streams and tables
+        appDescriptor
+            .withInputStream(kid)
+            .withOutputStream(kod)
+            .withTable(badUrls);
+        
+        // Step 3: define the processing logic
+        appDescriptor.withTaskFactory(new PageViewFilterTaskFactory());
       }
     }
-    
+
 {% endhighlight %}
 
-Similarly, here is an example of [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory.html):
+#### TaskFactory
+Your [TaskFactory](javadocs/org/apache/samza/task/TaskFactory) will be used to create instances of your Task in each of Samza's processors. If you're implementing a StreamTask, you can provide a [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory). Similarly, if you're implementing an AsyncStreamTask, you can provide an [AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory). For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewAsyncTaskFactory implements AsyncStreamTaskFactory {
+    public class PageViewFilterTaskFactory implements StreamTaskFactory {
       @Override
-      public AsyncStreamTask createInstance() {
-        // Add input, output streams and tables
-        return new BadPageViewAsyncFilterTask();
+      public StreamTask createInstance() {
+        return new PageViewFilterTask();
       }
     }
+    
 {% endhighlight %}
 
-## Task classes
+#### Task Interfaces
 
-The actual processing logic is implemented in [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) classes.
+Your processing logic can be implemented in a [StreamTask](javadocs/org/apache/samza/task/StreamTask) or an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask).
 
-### StreamTask
-You should implement [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) for synchronous process, where the message processing is complete after the process method returns. An example of StreamTask is a computation that does not involve remote calls:
+##### StreamTask
+You can implement a [StreamTask](javadocs/org/apache/samza/task/StreamTask) for synchronous message processing. Samza delivers messages to the task one at a time, and considers each message to be processed when the process method call returns. For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewFilterTask implements StreamTask {
+    public class PageViewFilterTask implements StreamTask {
       @Override
-      public void process(IncomingMessageEnvelope envelope,
-                          MessageCollector collector,
-                          TaskCoordinator coordinator) {
-        // process message synchronously
+      public void process(
+          IncomingMessageEnvelope envelope, 
+          MessageCollector collector, 
+          TaskCoordinator coordinator) {
+          
+          // process the message in the envelope synchronously
       }
     }
+
 {% endhighlight %}
 
-### AsyncStreamTask
-The [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface, on the other hand, supports asynchronous process, where the message processing may not be complete after the processAsync method returns. Various concurrent libraries like Java NIO, ParSeq and Akka can be used here to make asynchronous calls, and the completion is marked by invoking the [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will continue to process next message or shut down the container based on the callback status. An example of AsyncStreamTask is a computation that make remote calls but don’t block on the call completion:
+Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container. 
+
+##### AsyncStreamTask
+You can implement an [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask) for asynchronous message processing. This can be useful when you need to perform long running I/O operations to process a message, e.g., making an HTTP request. For example:
 
 {% highlight java %}
-    
-    package com.example.samza;
 
-    public class BadPageViewAsyncFilterTask implements AsyncStreamTask {
+    public class AsyncPageViewFilterTask implements AsyncStreamTask {
       @Override
       public void processAsync(IncomingMessageEnvelope envelope,
-                               MessageCollector collector,
-                               TaskCoordinator coordinator,
-                               TaskCallback callback) {
-        // process message with asynchronous calls
-        // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
+          MessageCollector collector,
+          TaskCoordinator coordinator,
+          TaskCallback callback) {
+          
+          // process message asynchronously
+          // invoke callback.complete or callback.failure upon completion
       }
     }
+
 {% endhighlight %}
 
-# Runtime Objects
+Samza delivers the incoming message and a [TaskCallback](javadocs/org/apache/samza/task/TaskCallback) with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or if neither callback.complete() nor callback.failure() is invoked within <code>task.callback.timeout.ms</code> milliseconds, Samza will shut down the running Container.
+
+If configured, Samza will process up to <code>task.max.concurrency</code> messages asynchronously at a time within each Task Instance. Note that while message delivery (i.e., the processAsync invocation) is guaranteed to be in-order within a stream partition, message processing may complete out of order when <code>task.max.concurrency</code> > 1.
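+
+As an illustration, here's a minimal sketch of invoking the callback from an asynchronous completion thread. It assumes a hypothetical blocking remote call wrapped in a CompletableFuture (from java.util.concurrent), and is not the only way to structure asynchronous processing:
+
+{% highlight java %}
+
+    public class AsyncPageViewFilterTask implements AsyncStreamTask {
+      // executor for the hypothetical blocking remote call below
+      private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+      @Override
+      public void processAsync(IncomingMessageEnvelope envelope,
+          MessageCollector collector,
+          TaskCoordinator coordinator,
+          TaskCallback callback) {
+        CompletableFuture
+            .runAsync(() -> checkRemoteService(envelope), executor)
+            .whenComplete((ignored, error) -> {
+              // invoked on the asynchronous completion thread
+              if (error != null) {
+                callback.failure(error);
+              } else {
+                callback.complete();
+              }
+            });
+      }
+
+      private void checkRemoteService(IncomingMessageEnvelope envelope) {
+        // hypothetical long-running I/O, e.g., an HTTP request
+      }
+    }
+
+{% endhighlight %}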
+
+For more details on asynchronous and concurrent processing, see the [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide).
 
-## Task Instances in Runtime
-When you run your job, Samza will create many instances of your task class (potentially on multiple machines). These task instances process the messages from the input streams.
+##### Additional Task Interfaces
 
-## Messages from Input Streams
+There are a few other interfaces you can implement in your StreamTask or AsyncStreamTask that provide additional functionality.
 
-For each message that Samza receives from the task’s input streams, the [process](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) or [processAsync](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method is called. The [envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three things of importance: the message, the key, and the stream that the message came from.
+###### InitableTask
+You can implement the [InitableTask](javadocs/org/apache/samza/task/InitableTask) interface to access the [Context](javadocs/org/apache/samza/context/Context). Context provides access to any runtime objects you need in your task, whether they're provided by the framework or defined by your application.
 
 {% highlight java %}
     
-    /** Every message that is delivered to a StreamTask is wrapped
-     * in an IncomingMessageEnvelope, which contains metadata about
-     * the origin of the message. */
-    public class IncomingMessageEnvelope {
-      /** A deserialized message. */
-      Object getMessage() { ... }
-
-      /** A deserialized key. */
-      Object getKey() { ... }
-
-      /** The stream and partition that this message came from. */
-      SystemStreamPartition getSystemStreamPartition() { ... }
+    public interface InitableTask {
+      void init(Context context) throws Exception;
     }
+    
 {% endhighlight %}
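+
+For example, here's a minimal sketch of using init() to read a custom property from the job config, a framework-provided runtime object. The "custom.blacklist.prefix" key is a hypothetical user-defined property, not a Samza configuration:
+
+{% highlight java %}
+
+    public class PageViewFilterTask implements StreamTask, InitableTask {
+      private String blacklistPrefix;
+
+      @Override
+      public void init(Context context) throws Exception {
+        // obtain the job config from the Context
+        Config config = context.getJobContext().getConfig();
+        // "custom.blacklist.prefix" is a hypothetical application-defined property
+        this.blacklistPrefix = config.get("custom.blacklist.prefix", "/bad/");
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        // use blacklistPrefix in your processing logic
+      }
+    }
+
+{% endhighlight %}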
 
-The key and value are declared as Object, and need to be cast to the correct type. The serializer/deserializer are defined via InputDescriptor, as described [here](high-level-api.md#data-serialization). A deserializer can convert these bytes into any other type, for example the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
-
-The [getSystemStreamPartition()](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html#getSystemStreamPartition--) method returns a [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html) object, which tells you where the message came from. It consists of three parts:
-1. The *system*: the name of the system from which the message came, as defined as SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
-2. The *stream name*: the name of the stream (topic, queue) within the source system. This is also defined as InputDescriptor in the TaskApplication.
-3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is normally split into several partitions, and each partition is assigned to one task instance by Samza.
-
-The API looks like this:
+###### ClosableTask
+You can implement the [ClosableTask](javadocs/org/apache/samza/task/ClosableTask) interface to clean up any runtime state during shutdown. This interface is deprecated. It's recommended to use the [ApplicationContainerContext](javadocs/org/apache/samza/context/ApplicationContainerContext) and [ApplicationTaskContext](javadocs/org/apache/samza/context/ApplicationTaskContext) APIs to manage the lifecycle of any runtime objects.
 
 {% highlight java %}
-    
-    /** A triple of system name, stream name and partition. */
-    public class SystemStreamPartition extends SystemStream {
 
-      /** The name of the system which provides this stream. It is
-          defined in the Samza job's configuration. */
-      public String getSystem() { ... }
-
-      /** The name of the stream/topic/queue within the system. */
-      public String getStream() { ... }
-
-      /** The partition within the stream. */
-      public Partition getPartition() { ... }
+    public interface ClosableTask {
+      void close() throws Exception;
     }
+    
 {% endhighlight %}
 
-In the example user-implemented TaskApplication above, the system name is “kafka”, the stream name is “pageViewEvent”. (The name “kafka” isn’t special — you can give your system any name you want.) If you have several input streams feeding into your StreamTask or AsyncStreamTask, you can use the SystemStreamPartition to determine what kind of message you’ve received.
-
-## Messages to Output Streams
-What about sending messages? If you take a look at the [process()](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) method in StreamTask, you’ll see that you get a [MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html). Similarly, you will get it in [processAsync()](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method in AsyncStreamTask as well.
+###### WindowableTask
+You can implement the [WindowableTask](javadocs/org/apache/samza/task/WindowableTask) interface to define processing logic that is invoked periodically by the framework.
 
 {% highlight java %}
     
-    /** When a task wishes to send a message, it uses this interface. */
-    public interface MessageCollector {
-      void send(OutgoingMessageEnvelope envelope);
+    public interface WindowableTask {
+      void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
     }
-    
-{% endhighlight %}
-
-To send a message, you create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) object and pass it to the MessageCollector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for details.
 
-**NOTE**: Please only use the MessageCollector object within the process() or processAsync() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
+{% endhighlight %}
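+
+For example, here's a minimal sketch of a task that counts messages in process() and emits the running count from window(). It assumes <code>task.window.ms</code> is configured, and that the "pageViewCounts" stream and a serde for the count are set up elsewhere:
+
+{% highlight java %}
+
+    public class PageViewCounterTask implements StreamTask, WindowableTask {
+      private final SystemStream countStream = new SystemStream("kafka", "pageViewCounts");
+      private int count = 0;
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        count++;
+      }
+
+      @Override
+      public void window(MessageCollector collector, TaskCoordinator coordinator) {
+        // invoked by the framework every task.window.ms milliseconds
+        collector.send(new OutgoingMessageEnvelope(countStream, null, count));
+        count = 0; // reset for the next window
+      }
+    }
+
+{% endhighlight %}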
 
-For example, here’s a simple example to send out “Good PageViewEvents” in the BadPageViewFilterTask:
+###### EndOfStreamListenerTask
+You can implement the [EndOfStreamListenerTask](javadocs/org/apache/samza/task/EndOfStreamListenerTask) interface to define processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it's consuming. This is typically relevant when running Samza as a batch job.
 
 {% highlight java %}
-    
-    public class BadPageViewFilterTask implements StreamTask {
 
-      // Send outgoing messages to a stream called "words"
-      // in the "kafka" system.
-      private final SystemStream OUTPUT_STREAM =
-        new SystemStream("kafka", "goodPageViewEvent");
-      @Override
-      public void process(IncomingMessageEnvelope envelope,
-                          MessageCollector collector,
-                          TaskCoordinator coordinator) {
-        if (isBadPageView(envelope)) {
-          // skip the message, increment the counter, do not send it
-          return;
-        }
-        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, envelope.getKey(), envelope.getValue()));
-      }
+    public interface EndOfStreamListenerTask {
+      void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
     }
     
 {% endhighlight %}
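+
+For example, here's a minimal sketch of emitting a final summary message once all input has been consumed. The "pageViewSummary" stream is hypothetical, and its serde is assumed to be set up elsewhere:
+
+{% highlight java %}
+
+    public class PageViewSummaryTask implements StreamTask, EndOfStreamListenerTask {
+      private final SystemStream summaryStream = new SystemStream("kafka", "pageViewSummary");
+      private int total = 0;
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        total++;
+      }
+
+      @Override
+      public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+        // all input SystemStreamPartitions have been fully consumed; flush the result
+        collector.send(new OutgoingMessageEnvelope(summaryStream, total));
+      }
+    }
+
+{% endhighlight %}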
 
-## Accessing Tables
-There are many cases that you will need to lookup a table when processing an incoming message. Samza allows access to tables by a unique name through [TaskContext.getTable()](javadocs/org/apache/samza/task/TaskContext.html#getTable-java.lang.String-) method. [TaskContext](javadocs/org/apache/samza/task/TaskContext.html) is accessed via [Context.getTaskContext()](javadocs/org/apache/samza/context/Context.html#getTaskContext--) in the [InitiableTask’s init()]((javadocs/org/apache/samza/task/InitableTask.html#init-org.apache.samza.context.Context-)) method. A user code example to access a table in the above TaskApplication example is here:
+### Common Operations
 
-{% highlight java %}
-
-    public class BadPageViewFilter implements StreamTask, InitableTask {
-      private final SystemStream OUTPUT_STREAM = new SystemStream(“kafka”, “goodPageViewEvent”);
-      private ReadWriteTable<String, Integer> badPageUrlTable;
-      @Override
-      public void init(Context context) {
-        badPageUrlTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badPageUrlTable");
-      }
+#### Receiving Messages from Input Streams
 
-      @Override
-      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-        String key = (String)message.getKey();
-        if (badPageUrlTable.containsKey(key)) {
-          // skip the message, increment the counter, do not send it
-          badPageUrlTable.put(key, badPageUrlTable.get(key) + 1);
-          return;
-        }
-        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, key, message.getValue()));
-      }
-    }
+Samza calls your Task instance's [process](javadocs/org/apache/samza/task/StreamTask#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-) or [processAsync](javadocs/org/apache/samza/task/AsyncStreamTask#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-) method with each incoming message on your input streams. The [IncomingMessageEnvelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope) can be used to obtain the following information: the de-serialized key, the de-serialized message, and the [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition) that the message came from.
 
-{% endhighlight %}
+The key and message objects need to be cast to the correct type in your Task implementation, based on the [Serde](javadocs/org/apache/samza/serializers/Serde) provided in the InputDescriptor for the input stream.
 
-For more detailed AsyncStreamTask example, follow the tutorial in [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For more details on APIs, please refer to [Configuration](../jobs/configuration.md) and [Javadocs](javadocs).
+The [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition) object tells you where the message came from. It consists of three parts:
+1. The *system*: the name of the system the message came from, as specified for the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
+2. The *stream name*: the name of the stream (e.g., topic, queue) within the input system. This is the physical name of the stream, as specified for the InputDescriptor in your TaskApplication.
+3. The [*partition*](javadocs/org/apache/samza/Partition): A stream is normally split into several partitions, and each partition is assigned to one task instance by Samza. 
 
-# Other Task Interfaces
+If you have several input streams for your TaskApplication, you can use the SystemStreamPartition to determine what kind of message you’ve received.
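+
+For example, here's a minimal sketch of dispatching on the stream name; the "pageViewEvent" and "adClickEvent" stream names are hypothetical:
+
+{% highlight java %}
+
+    public class MultiStreamTask implements StreamTask {
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        // determine which input stream this message arrived on
+        String stream = envelope.getSystemStreamPartition().getStream();
+        if ("pageViewEvent".equals(stream)) {
+          // handle a page view event
+        } else if ("adClickEvent".equals(stream)) {
+          // handle an ad click event
+        }
+      }
+    }
+
+{% endhighlight %}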
 
-There are other task interfaces to allow additional processing logic to be applied, besides the main per-message processing logic defined in StreamTask and AsyncStreamTask. You will need to implement those task interfaces in addition to StreamTask or AsyncStreamTask.
+#### Sending Messages to Output Streams
+To send a message to a stream, you first create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope). At a minimum, you need to provide the message you want to send, and the system and stream to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope) for details.
 
-## InitiableTask
-This task interface allows users to initialize objects that are accessed within a task instance.
+You can then send the OutgoingMessageEnvelope using the [MessageCollector](javadocs/org/apache/samza/task/MessageCollector) provided with the process() or processAsync() call. You **must** use the MessageCollector delivered for the message you're currently processing. Holding on to a MessageCollector and reusing it later may cause your messages not to be sent correctly.
 
 {% highlight java %}
     
-    public interface InitableTask {
-      void init(Context context) throws Exception;
+    /** When a task wishes to send a message, it uses this interface. */
+    public interface MessageCollector {
+      void send(OutgoingMessageEnvelope envelope);
     }
-{% endhighlight %}
-
-## WindowableTask
-This task interface allows users to define a processing logic that is invoked periodically within a task instance.
-
-{% highlight java %}
     
-    public interface WindowableTask {
-      void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
-    }
-
 {% endhighlight %}
 
-## ClosableTask
-This task interface defines the additional logic when closing a task. Usually, it is in pair with InitableTask to release system resources allocated for this task instance.
+#### Accessing Tables
 
-{% highlight java %}
+A [Table](javadocs/org/apache/samza/table/Table) is an abstraction for data sources that support random access by key. It is an evolution of the older [KeyValueStore](javadocs/org/apache/samza/storage/kv/KeyValueStore) API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a remote table provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a [ReadableTable](javadocs/org/apache/samza/table/ReadableTable) or a [ReadWriteTable](javadocs/org/apache/samza/table/ReadWriteTable).
+ 
+In the Low Level API, you can obtain and use a Table as follows:
 
-    public interface ClosableTask {
-      void close() throws Exception;
-    }
-    
-{% endhighlight %}
+1. Use the appropriate TableDescriptor to specify the table properties.
+2. Register the TableDescriptor with the TaskApplicationDescriptor.
+3. Obtain a Table reference within the task implementation using [TaskContext.getTable()](javadocs/org/apache/samza/task/TaskContext#getTable-java.lang.String-). [TaskContext](javadocs/org/apache/samza/task/TaskContext) is available via [Context.getTaskContext()](javadocs/org/apache/samza/context/Context#getTaskContext--), which in turn is available by implementing [InitableTask.init()](javadocs/org/apache/samza/task/InitableTask#init-org.apache.samza.context.Context-).
 
-## EndOfStreamListenerTask
-This task interface defines the additional logic when a task instance has reached the end of all input SystemStreamPartitions (see Samza as a batch job).
+For example:
 
 {% highlight java %}
 
-    public interface EndOfStreamListenerTask {
-      void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
+    public class PageViewFilterTask implements StreamTask, InitableTask {
+      private final SystemStream outputStream = new SystemStream("kafka", "goodPageViewEvent");
+
+      private ReadWriteTable<String, Integer> badUrlsTable;
+
+      @Override
+      public void init(Context context) {
+        badUrlsTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badUrls");
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String key = (String) envelope.getKey();
+        if (badUrlsTable.containsKey(key)) {
+          // skip the message, increment the counter, do not send it
+          badUrlsTable.put(key, badUrlsTable.get(key) + 1);
+        } else {
+          collector.send(new OutgoingMessageEnvelope(outputStream, key, envelope.getMessage()));
+        }
+      }
     }
-    
+
 {% endhighlight %}
 
-# Legacy Task Application
+### Legacy Applications
 
-For legacy task application which do not implement TaskApplication interface, you may specify the system, stream, and local stores in your job’s configuration, in addition to task.class. An incomplete example of configuration for legacy task application could look like this (see the [configuration](../jobs/configuration.md) documentation for more detail):
+For legacy Low Level API applications, you can continue specifying your system, stream and store properties along with your task.class in configuration. An incomplete example configuration for a legacy task application looks like this (see the [configuration](../jobs/configuration.md) documentation for more detail):
 
 {% highlight jproperties %}
 
-    # This is the class above, which Samza will instantiate when the job is run
+    # This is the Task class that Samza will instantiate when the job is run
     task.class=com.example.samza.PageViewFilterTask
 
     # Define a system called "kafka" (you can give it any name, and you can define