You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [4/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/document...

Modified: samza/site/learn/documentation/latest/api/high-level-api.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/api/high-level-api.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/api/high-level-api.html (original)
+++ samza/site/learn/documentation/latest/api/high-level-api.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/api/high-level-api">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/api/high-level-api">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/api/high-level-api">1.6.0</a></li>
 
               
@@ -640,334 +654,338 @@
 -->
 
 <h3 id="table-of-contents">Table Of Contents</h3>
-
 <ul>
-<li><a href="#introduction">Introduction</a></li>
-<li><a href="#code-examples">Code Examples</a></li>
-<li><a href="#key-concepts">Key Concepts</a>
-
-<ul>
-<li><a href="#streamapplication">StreamApplication</a></li>
-<li><a href="#messagestream">MessageStream</a></li>
-<li><a href="#table">Table</a></li>
-</ul></li>
-<li><a href="#operators">Operators</a>
-
-<ul>
-<li><a href="#map">Map</a></li>
-<li><a href="#flatmap">FlatMap</a></li>
-<li><a href="#asyncflatmap">AsyncFlatMap</a></li>
-<li><a href="#filter">Filter</a></li>
-<li><a href="#partitionby">PartitionBy</a></li>
-<li><a href="#merge">Merge</a></li>
-<li><a href="#broadcast">Broadcast</a></li>
-<li><a href="#sendto-stream">SendTo (Stream)</a></li>
-<li><a href="#sendto-table">SendTo (Table)</a></li>
-<li><a href="#sink">Sink</a></li>
-<li><a href="#join-stream-stream">Join (Stream-Stream)</a></li>
-<li><a href="#join-stream-table">Join (Stream-Table)</a></li>
-<li><a href="#window">Window</a>
-
-<ul>
-<li><a href="#windowing-concepts">Windowing Concepts</a></li>
-<li><a href="#window-types">Window Types</a></li>
-</ul></li>
-</ul></li>
-<li><a href="#operator-ids">Operator IDs</a></li>
-<li><a href="#data-serialization">Data Serialization</a></li>
-<li><a href="#application-serialization">Application Serialization</a></li>
+  <li><a href="#introduction">Introduction</a></li>
+  <li><a href="#code-examples">Code Examples</a></li>
+  <li><a href="#key-concepts">Key Concepts</a>
+    <ul>
+      <li><a href="#streamapplication">StreamApplication</a></li>
+      <li><a href="#messagestream">MessageStream</a></li>
+      <li><a href="#table">Table</a></li>
+    </ul>
+  </li>
+  <li><a href="#operators">Operators</a>
+    <ul>
+      <li><a href="#map">Map</a></li>
+      <li><a href="#flatmap">FlatMap</a></li>
+      <li><a href="#asyncflatmap">AsyncFlatMap</a></li>
+      <li><a href="#filter">Filter</a></li>
+      <li><a href="#partitionby">PartitionBy</a></li>
+      <li><a href="#merge">Merge</a></li>
+      <li><a href="#broadcast">Broadcast</a></li>
+      <li><a href="#sendto-stream">SendTo (Stream)</a></li>
+      <li><a href="#sendto-table">SendTo (Table)</a></li>
+      <li><a href="#sink">Sink</a></li>
+      <li><a href="#join-stream-stream">Join (Stream-Stream)</a></li>
+      <li><a href="#join-stream-table">Join (Stream-Table)</a></li>
+      <li><a href="#window">Window</a>
+        <ul>
+          <li><a href="#windowing-concepts">Windowing Concepts</a></li>
+          <li><a href="#window-types">Window Types</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#operator-ids">Operator IDs</a></li>
+  <li><a href="#data-serialization">Data Serialization</a></li>
+  <li><a href="#application-serialization">Application Serialization</a></li>
 </ul>
 
 <h3 id="introduction">Introduction</h3>
 
-<p>Samza&rsquo;s flexible High Level Streams API lets you describe your complex stream processing pipeline in the form of a Directional Acyclic Graph (DAG) of operations on <a href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>. It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, stream-stream and stream-table joins, and windowing. </p>
+<p>Samza’s flexible High Level Streams API lets you describe your complex stream processing pipeline in the form of a Directed Acyclic Graph (DAG) of operations on <a href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>. It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, stream-stream and stream-table joins, and windowing.</p>
 
 <h3 id="code-examples">Code Examples</h3>
 
 <p><a href="https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook">The Samza Cookbook</a> contains various recipes using the Samza High Level Streams API. These include:</p>
 
 <ul>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java">Filter example</a> demonstrates how to perform stateless operations on a stream. </p></li>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java">Join example</a> demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks</p></li>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java">Stream-Table Join example</a> demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.</p></li>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java">SessionWindow</a> and <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java">TumblingWindow</a> examples illustrate Samza&rsquo;s rich windowing and triggering capabilities.</p></li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java">Filter example</a> demonstrates how to perform stateless operations on a stream.</p>
+  </li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java">Join example</a> demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks.</p>
+  </li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java">Stream-Table Join example</a> demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.</p>
+  </li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java">SessionWindow</a> and <a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java">TumblingWindow</a> examples illustrate Samza’s rich windowing and triggering capabilities.</p>
+  </li>
 </ul>
 
 <h3 id="key-concepts">Key Concepts</h3>
-
 <h4 id="streamapplication">StreamApplication</h4>
-
-<p>A <a href="javadocs/org/apache/samza/application/StreamApplication">StreamApplication</a> describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza&rsquo;s High Level Streams API.</p>
+<p>A <a href="javadocs/org/apache/samza/application/StreamApplication">StreamApplication</a> describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza’s High Level Streams API.</p>
 
 <p>A typical StreamApplication implementation consists of the following stages:</p>
 
 <ol>
-<li>Configuring the inputs, outputs and state (tables) using the appropriate <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s, <a href="javadocs/org/apache/samza/descriptors/InputDescriptor">InputDescriptor</a>s, <a href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s and <a href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
-<li>Obtaining the corresponding <a href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>s, <a href="javadocs/org/apache/samza/operators/OutputStream">OutputStream</a>s and <a href="javadocs/org/apache/samza/table/Table">Table</a>s from the provided <a href="javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor">StreamApplicationDescriptor</a></li>
-<li>Defining the processing logic using operators and functions on the streams and tables thus obtained.</li>
+  <li>Configuring the inputs, outputs and state (tables) using the appropriate <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s, <a href="javadocs/org/apache/samza/descriptors/InputDescriptor">InputDescriptor</a>s, <a href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s and <a href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
+  <li>Obtaining the corresponding <a href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>s, <a href="javadocs/org/apache/samza/operators/OutputStream">OutputStream</a>s and <a href="javadocs/org/apache/samza/table/Table">Table</a>s from the provided <a href="javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor">StreamApplicationDescriptor</a></li>
+  <li>Defining the processing logic using operators and functions on the streams and tables thus obtained.</li>
 </ol>
 
 <p>The following example StreamApplication removes page views older than 1 hour from the input stream:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>   
-    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilter</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
+    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilter</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
         <span class="c1">// Step 1: configure the inputs and outputs using descriptors</span>
-        <span class="n">KafkaSystemDescriptor</span> <span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">&quot;...&quot;</span><span class="o">))</span>
-            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">&quot;...&quot;</span><span class="o">,</span> <span class="s">&quot;...&quot;</span><span class="o">));</span>
-        <span class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
-        <span class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;recentPageViewEvent&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>
+        <span class="nc">KafkaSystemDescriptor</span> <span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"..."</span><span class="o">))</span>
+            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"..."</span><span class="o">,</span> <span class="s">"..."</span><span class="o">));</span>
+        <span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"pageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"recentPageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
   
         <span class="c1">// Step 2: obtain the message streams and output streams </span>
-        <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;</span> <span class="n">pageViewEvents</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">kid</span><span class="o">);</span>
-        <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;</span> <span class="n">recentPageViewEvents</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">kod</span><span class="o">);</span>
+        <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">pageViewEvents</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">kid</span><span class="o">);</span>
+        <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">recentPageViewEvents</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">kod</span><span class="o">);</span>
   
         <span class="c1">// Step 3: define the processing logic</span>
         <span class="n">pageViewEvents</span>
             <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="n">m</span><span class="o">.</span><span class="na">getCreationTime</span><span class="o">()</span> <span class="o">&gt;</span> 
-                <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">()</span> <span class="o">-</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofHours</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="na">toMillis</span><span class="o">())</span>
+                <span class="nc">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">()</span> <span class="o">-</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofHours</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="na">toMillis</span><span class="o">())</span>
             <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">recentPageViewEvents</span><span class="o">);</span>
       <span class="o">}</span>
     <span class="o">}</span>
   </code></pre></figure>
 
 <h4 id="messagestream">MessageStream</h4>
-
 <p>A <a href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>, as the name implies, represents a stream of messages. A StreamApplication is described as a Directed Acyclic Graph (DAG) of transformations on MessageStreams. You can get a MessageStream in two ways:</p>
 
 <ol>
-<li>Calling StreamApplicationDescriptor#getInputStream() with an <a href="javadocs/org/apache/samza/system/descriptors/InputDescriptor">InputDescriptor</a> obtained from a <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>.</li>
-<li>By transforming an existing MessageStream using operators like map, filter, window, join etc.</li>
+  <li>Calling StreamApplicationDescriptor#getInputStream() with an <a href="javadocs/org/apache/samza/system/descriptors/InputDescriptor">InputDescriptor</a> obtained from a <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>.</li>
+  <li>By transforming an existing MessageStream using operators like map, filter, window, join etc.</li>
 </ol>
 
 <h4 id="table">Table</h4>
-
-<p>A <a href="javadocs/org/apache/samza/table/Table">Table</a> is an abstraction for data sources that support random access by key. It is an evolution of the older <a href="javadocs/org/apache/samza/storage/kv/KeyValueStore">KeyValueStore</a> API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a [RemoteTable] provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a <a href="javadocs/org/apache/samza/table/ReadableTable">ReadableTable</a> or a <a href="javadocs/org/apache/samza/table/ReadWriteTable">ReadWriteTable</a>.</p>
+<p>A <a href="javadocs/org/apache/samza/table/Table">Table</a> is an abstraction for data sources that support random access by key. It is an evolution of the older <a href="javadocs/org/apache/samza/storage/kv/KeyValueStore">KeyValueStore</a> API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a [RemoteTable] provides optimized access with caching, rate-limiting, and retry support.</p>
 
 <p>In the High Level Streams API, you can obtain and use a Table as follows:</p>
 
 <ol>
-<li>Use the appropriate TableDescriptor to specify the table properties.</li>
-<li>Register the TableDescriptor with the StreamApplicationDescriptor. This returns a Table reference, which can be used for populate the table using the <a href="#sendto-table">Send To Table</a> operator, or for joining a stream with the table using the <a href="#join-stream-table">Stream-Table Join</a> operator.</li>
-<li>Alternatively, you can obtain a Table reference within an operator&rsquo;s <a href="javadocs/org/apache/samza/operators/functions/InitableFunction">InitableFunction</a> using the provided <a href="javadocs/org/apache/samza/context/TaskContext">TaskContext</a>.</li>
+  <li>Use the appropriate TableDescriptor to specify the table properties.</li>
+  <li>Register the TableDescriptor with the StreamApplicationDescriptor. This returns a Table reference, which can be used to populate the table using the <a href="#sendto-table">Send To Table</a> operator, or for joining a stream with the table using the <a href="#join-stream-table">Stream-Table Join</a> operator.</li>
+  <li>Alternatively, you can obtain a Table reference within an operator’s <a href="javadocs/org/apache/samza/operators/functions/InitableFunction">InitableFunction</a> using the provided <a href="javadocs/org/apache/samza/context/TaskContext">TaskContext</a>.</li>
 </ol>
 
 <h3 id="operators">Operators</h3>
-
-<p>The High Level Streams API provides common operations like map, flatmap, filter, merge, broadcast, joins, and windows on MessageStreams. Most of these operators accept their corresponding Functions as an argument. </p>
+<p>The High Level Streams API provides common operations like map, flatmap, filter, merge, broadcast, joins, and windows on MessageStreams. Most of these operators accept their corresponding Functions as an argument.</p>
 
 <h4 id="map">Map</h4>
-
 <p>Applies the provided 1:1 <a href="javadocs/org/apache/samza/operators/functions/MapFunction">MapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">tripled</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span class="mi">3</span><span class="o">);</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stringified</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">m</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">tripled</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span class="mi">3</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">stringified</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="nc">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">m</span><span class="o">));</span></code></pre></figure>
 
 <h4 id="flatmap">FlatMap</h4>
-
 <p>Applies the provided 1:n <a href="javadocs/org/apache/samza/operators/functions/FlatMapFunction">FlatMapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Parse the sentence into its individual words splitting on space</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentence</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">sentence</span> <span class="o">-&gt;</span>
-        <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">sentence</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="err">“</span> <span class="err">”</span><span class="o">))</span></code></pre></figure>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentence</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">sentence</span> <span class="o">-&gt;</span>
+        <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">sentence</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)));</span></code></pre></figure>
 
 <h4 id="asyncflatmap">AsyncFlatMap</h4>
-
 <p>Applies the provided 1:n <a href="javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction">AsyncFlatMapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. The AsyncFlatMapFunction takes in a single message and returns a future of zero or more messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">RestClient</span> <span class="n">restClient</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">RestClient</span> <span class="n">restClient</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Transform each incoming word into its meaning using a dictionary look up service</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">meanings</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">asyncFlatMap</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="o">{</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">meanings</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">asyncFlatMap</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="o">{</span>
        <span class="c1">// Builds a look up request to the dictionary service</span>
-       <span class="n">Request</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dictionaryRequest</span> <span class="o">=</span> <span class="n">buildDictionaryRequest</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
-       <span class="n">CompletableFuture</span><span class="o">&lt;</span><span class="n">DictionaryResponse</span><span class="o">&gt;</span> <span class="n">dictionaryResponseFuture</span> <span class="o">=</span> <span class="n">restClient</span><span class="o">.</span><span class="na">sendRequest</span><span class="o">(</span><span class="n">dictionaryRequest</span><span class="o">);</span>
+       <span class="nc">Request</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">dictionaryRequest</span> <span class="o">=</span> <span class="n">buildDictionaryRequest</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
+       <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">DictionaryResponse</span><span class="o">&gt;</span> <span class="n">dictionaryResponseFuture</span> <span class="o">=</span> <span class="n">restClient</span><span class="o">.</span><span class="na">sendRequest</span><span class="o">(</span><span class="n">dictionaryRequest</span><span class="o">);</span>
        <span class="k">return</span> <span class="n">dictionaryResponseFuture</span>
-            <span class="o">.</span><span class="na">thenApply</span><span class="o">(</span><span class="n">response</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">Pair</span><span class="o">&lt;&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="n">response</span><span class="o">.</span><span class="na">getMeaning</span><span class="o">()));</span>
+            <span class="o">.</span><span class="na">thenApply</span><span class="o">(</span><span class="n">response</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">Pair</span><span class="o">&lt;&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="n">response</span><span class="o">.</span><span class="na">getMeaning</span><span class="o">()));</span>
     <span class="o">});</span></code></pre></figure>
 
 <p>For more details on asynchronous processing, see the <a href="../../../tutorials/latest/samza-async-user-guide">Samza Async API and Multithreading User Guide</a>.</p>
 
 <h4 id="filter">Filter</h4>
-
 <p>Applies the provided <a href="javadocs/org/apache/samza/operators/functions/FilterFunction">FilterFunction</a> to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Extract only the long words</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">longWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">longWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
     <span class="c1">// Extract only the short words</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">shortWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">shortWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="o">);</span>
     </code></pre></figure>
 
 <h4 id="partitionby">PartitionBy</h4>
-
 <p>Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="c1">// Repartition PageViews by userId.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;&gt;</span> <span class="n">partitionedPageViews</span> <span class="o">=</span> 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">partitionedPageViews</span> <span class="o">=</span> 
         <span class="n">pageViews</span><span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">,</span> <span class="c1">// value extractor</span>
-            <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)),</span> <span class="c1">// serdes</span>
-            <span class="s">&quot;partitioned-page-views&quot;</span><span class="o">);</span> <span class="c1">// operator ID    </span>
+            <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)),</span> <span class="c1">// serdes</span>
+            <span class="s">"partitioned-page-views"</span><span class="o">);</span> <span class="c1">// operator ID    </span>
         </code></pre></figure>
 
 <p>The operator ID should be unique for each operator within the application and is used to identify the streams and stores created by the operator.</p>
 
 <h4 id="merge">Merge</h4>
-
 <p>Merges the MessageStream with all the provided MessageStreams and returns the merged stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">LogEvent</span><span class="o">&gt;</span> <span class="n">log1</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">LogEvent</span><span class="o">&gt;</span> <span class="n">log2</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">LogEvent</span><span class="o">&gt;</span> <span class="n">log1</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">LogEvent</span><span class="o">&gt;</span> <span class="n">log2</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="c1">// Merge individual “LogEvent” streams and create a new merged MessageStream</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">LogEvent</span><span class="o">&gt;</span> <span class="n">mergedLogs</span> <span class="o">=</span> <span class="n">log1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">log2</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">LogEvent</span><span class="o">&gt;</span> <span class="n">mergedLogs</span> <span class="o">=</span> <span class="n">log1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">log2</span><span class="o">);</span>
     
     <span class="c1">// Alternatively, use mergeAll to merge multiple streams</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">LogEvent</span><span class="o">&gt;</span> <span class="n">mergedLogs</span> <span class="o">=</span> <span class="n">MessageStream</span><span class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">log1</span><span class="o">,</span> <span class="n">log2</span><span class="o">,</span> <span class="o">...));</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">LogEvent</span><span class="o">&gt;</span> <span class="n">mergedLogs</span> <span class="o">=</span> <span class="nc">MessageStream</span><span class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">log1</span><span class="o">,</span> <span class="n">log2</span><span class="o">,</span> <span class="o">...));</span>
     </code></pre></figure>
 
 <p>The merge transform preserves the order of messages within each MessageStream. If message <code>m1</code> appears before <code>m2</code> in any provided stream, then <code>m1</code> will also appear before <code>m2</code> in the merged stream.</p>
 
 <h4 id="broadcast">Broadcast</h4>
-
 <p>Broadcasts the contents of the MessageStream to every <em>instance</em> of downstream operators via an intermediate stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">VersionChange</span><span class="o">&gt;</span> <span class="n">versionChanges</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">VersionChange</span><span class="o">&gt;</span> <span class="n">versionChanges</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="c1">// Broadcast version change event to all downstream operator instances.</span>
     <span class="n">versionChanges</span>
         <span class="o">.</span><span class="na">broadcast</span><span class="o">(</span>
-            <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">VersionChange</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde</span>
-            <span class="s">&quot;version-change-broadcast&quot;</span><span class="o">);</span> <span class="c1">// operator ID</span>
+            <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">VersionChange</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde</span>
+            <span class="s">"version-change-broadcast"</span><span class="o">);</span> <span class="c1">// operator ID</span>
         <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">vce</span> <span class="o">-&gt;</span> <span class="cm">/* act on version change event in each instance */</span> <span class="o">);</span>
          </code></pre></figure>
 
 <h4 id="sendto-stream">SendTo (Stream)</h4>
-
 <p>Sends all messages in this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="c1">// Obtain the OutputStream using an OutputDescriptor</span>
-    <span class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
-        <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">user</span><span class="o">-</span><span class="n">country</span><span class="err">”</span><span class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">());</span>
-    <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">userCountries</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">od</span><span class="o">)</span>
+    <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
+        <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">user</span><span class="o">-</span><span class="n">country</span><span class="err">”</span><span class="o">,</span> <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">());</span>
+    <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">userCountries</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">od</span><span class="o">)</span>
     
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Send a new message with userId as the key and their country as the value to the “user-country” stream.</span>
     <span class="n">pageViews</span>
-      <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getCountry</span><span class="o">()));</span>
+      <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> <span class="no">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getCountry</span><span class="o">()));</span>
       <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">userCountries</span><span class="o">);</span></code></pre></figure>
 
 <h4 id="sendto-table">SendTo (Table)</h4>
-
 <p>Sends all messages in this MessageStream to the provided Table. The expected message type is <a href="javadocs/org/apache/samza/operators/KV">KV</a>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">Profile</span><span class="o">&gt;</span> <span class="n">profilesStream</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">Table</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Profile</span><span class="o">&gt;&gt;</span> <span class="n">profilesTable</span> <span class="o">=</span> 
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">profilesStream</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">Table</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Long</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;&gt;</span> <span class="n">profilesTable</span> <span class="o">=</span> 
     
     <span class="n">profilesStream</span>
-        <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">profile</span> <span class="o">-&gt;</span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">profile</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> <span class="n">profile</span><span class="o">))</span>
+        <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">profile</span> <span class="o">-&gt;</span> <span class="no">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">profile</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> <span class="n">profile</span><span class="o">))</span>
         <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">profilesTable</span><span class="o">);</span>
         </code></pre></figure>
 
-<h4 id="sink">Sink</h4>
+<p>Sends all update messages in this MessageStream to the provided Table. The expected message type is <a href="javadocs/org/apache/samza/operators/KV">KV</a>.
+V should be of type UpdateMessage, which defines an update message and an optional default to be inserted in the absence of an existing record.
+The user also needs to pass an UpdateOptions parameter in the sendTo call of MessageStream. It defines the behavior of the sendTo-table operator 
+in terms of whether it should insert a default in the absence of an existing record or not.</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">UserInfoEvent</span><span class="o">&gt;</span> <span class="n">userInfoEventStream</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">Table</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Long</span><span class="o">,</span> <span class="nc">UserInfo</span><span class="o">&gt;&gt;</span> <span class="n">userInfoTable</span> <span class="o">=</span> <span class="o">...</span>
+
+    <span class="n">userInfoEventStream</span>
+        <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">event</span> <span class="o">-&gt;</span> <span class="o">{</span>
+          <span class="nc">UserInfo</span> <span class="n">userInfo</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="na">getUserInfo</span><span class="o">();</span>
+          <span class="nc">String</span> <span class="n">update</span> <span class="o">=</span> <span class="o">...;</span> 
+          <span class="k">return</span> <span class="no">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">userInfo</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="nc">UpdateMessage</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">update</span><span class="o">,</span> <span class="n">userInfo</span><span class="o">));</span>
+        <span class="o">})</span>
+        <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">userInfoTable</span><span class="o">,</span> <span class="nc">UpdateOptions</span><span class="o">.</span><span class="na">UPDATE_WITH_DEFAULTS</span><span class="o">);</span></code></pre></figure>
 
+<h4 id="sink">Sink</h4>
 <p>Allows sending messages from this MessageStream to an output system using the provided <a href="javadocs/org/apache/samza/operators/functions/SinkFunction.html">SinkFunction</a>.</p>
 
 <p>This offers more control than <a href="#sendto-stream">SendTo (Stream)</a> since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For example, you can choose to manually commit offsets, or shut-down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.).</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="n">pageViews</span><span class="o">.</span><span class="na">sink</span><span class="o">((</span><span class="n">msg</span><span class="o">,</span> <span class="n">collector</span><span class="o">,</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
         <span class="c1">// Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.</span>
-        <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="err">“</span><span class="n">kafka</span><span class="err">”</span><span class="o">,</span> <span class="err">“</span><span class="n">TransformedPageViewEvent</span><span class="err">”</span><span class="o">),</span> <span class="n">msg</span><span class="o">));</span>
+        <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span class="k">new</span> <span class="nc">SystemStream</span><span class="o">(</span><span class="err">“</span><span class="n">kafka</span><span class="err">”</span><span class="o">,</span> <span class="err">“</span><span class="nc">TransformedPageViewEvent</span><span class="err">”</span><span class="o">),</span> <span class="n">msg</span><span class="o">));</span>
     <span class="o">}</span> <span class="o">);</span>
         </code></pre></figure>
 
 <h4 id="join-stream-stream">Join (Stream-Stream)</h4>
-
 <p>The Stream-Stream Join operator joins messages from two MessageStreams using the provided pairwise <a href="javadocs/org/apache/samza/operators/functions/JoinFunction.html">JoinFunction</a>. Messages are joined when the key extracted from a message from the first stream matches the key extracted from a message in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found. Join only retains the latest message for each input stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="c1">// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.</span>
     <span class="c1">// Results are produced to a new stream of FulfilledOrderRecord.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">OrderRecord</span><span class="o">&gt;</span> <span class="n">orders</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">ShipmentRecord</span><span class="o">&gt;</span> <span class="n">shipments</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">OrderRecord</span><span class="o">&gt;</span> <span class="n">orders</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">ShipmentRecord</span><span class="o">&gt;</span> <span class="n">shipments</span> <span class="o">=</span> <span class="err">…</span>
 
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="n">shippedOrders</span> <span class="o">=</span> <span class="n">orders</span><span class="o">.</span><span class="na">join</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="n">shippedOrders</span> <span class="o">=</span> <span class="n">orders</span><span class="o">.</span><span class="na">join</span><span class="o">(</span>
         <span class="n">shipments</span><span class="o">,</span> <span class="c1">// other stream</span>
-        <span class="k">new</span> <span class="n">OrderShipmentJoiner</span><span class="o">(),</span> <span class="c1">// join function</span>
-        <span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="c1">// serde for the join key</span>
-        <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">OrderRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">ShipmentRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde for both streams</span>
-        <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">20</span><span class="o">),</span> <span class="c1">// join TTL</span>
-        <span class="s">&quot;shipped-order-stream&quot;</span><span class="o">)</span> <span class="c1">// operator ID</span>
+        <span class="k">new</span> <span class="nf">OrderShipmentJoiner</span><span class="o">(),</span> <span class="c1">// join function</span>
+        <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="c1">// serde for the join key</span>
+        <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">OrderRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">ShipmentRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde for both streams</span>
+        <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">20</span><span class="o">),</span> <span class="c1">// join TTL</span>
+        <span class="s">"shipped-order-stream"</span><span class="o">)</span> <span class="c1">// operator ID</span>
 
     <span class="c1">// Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.</span>
-    <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> <span class="kd">implements</span> <span class="n">JoinFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">OrderRecord</span><span class="o">,</span> <span class="n">ShipmentRecord</span><span class="o">,</span> <span class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> <span class="kd">implements</span> <span class="nc">JoinFunction</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">OrderRecord</span><span class="o">,</span> <span class="nc">ShipmentRecord</span><span class="o">,</span> <span class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">FulfilledOrderRecord</span> <span class="nf">apply</span><span class="o">(</span><span class="n">OrderRecord</span> <span class="n">message</span><span class="o">,</span> <span class="n">ShipmentRecord</span> <span class="n">otherMessage</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="k">new</span> <span class="n">FulfilledOrderRecord</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">orderTimestamp</span><span class="o">,</span> <span class="n">otherMessage</span><span class="o">.</span><span class="na">shipTimestamp</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="nc">FulfilledOrderRecord</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">OrderRecord</span> <span class="n">message</span><span class="o">,</span> <span class="nc">ShipmentRecord</span> <span class="n">otherMessage</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="nf">FulfilledOrderRecord</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">orderTimestamp</span><span class="o">,</span> <span class="n">otherMessage</span><span class="o">.</span><span class="na">shipTimestamp</span><span class="o">);</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">String</span> <span class="nf">getFirstKey</span><span class="o">(</span><span class="n">OrderRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getFirstKey</span><span class="o">(</span><span class="nc">OrderRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">;</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">String</span> <span class="nf">getSecondKey</span><span class="o">(</span><span class="n">ShipmentRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getSecondKey</span><span class="o">(</span><span class="nc">ShipmentRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">;</span>
       <span class="o">}</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h4 id="join-stream-table">Join (Stream-Table)</h4>
-
 <p>The Stream-Table Join operator joins messages from a MessageStream with messages in a Table using the provided <a href="javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html">StreamTableJoinFunction</a>. Messages are joined when the key extracted from a message in the stream matches the key for a record in the table. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided. The join function can choose to return null for an inner join, or an output message for a left outer join. For join correctness, it is important to ensure the input stream and table are partitioned using the same key (e.g., using the partitionBy operator) as this impacts the physical placement of data.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">pageViews</span>
-        <span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="n">pv</span> <span class="o">-&gt;</span> <span class="n">pv</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">,</span> <span class="n">pv</span> <span class="o">-&gt;</span> <span class="n">pv</span><span class="o">,</span> <span class="s">&quot;page-views-by-memberid&quot;</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">profiles</span><span class="o">,</span> <span class="k">new</span> <span class="n">PageViewToProfileTableJoiner</span><span class="o">())</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="n">pageViews</span>
+        <span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="n">pv</span> <span class="o">-&gt;</span> <span class="n">pv</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> <span class="n">pv</span> <span class="o">-&gt;</span> <span class="n">pv</span><span class="o">,</span> <span class="s">"page-views-by-memberid"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">profiles</span><span class="o">,</span> <span class="k">new</span> <span class="nc">PageViewToProfileTableJoiner</span><span class="o">())</span>
         <span class="o">...</span>
     
     <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewToProfileTableJoiner</span> <span class="kd">implements</span> 
-        <span class="n">StreamTableJoinFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">&gt;,</span> <span class="n">EnrichedPageView</span><span class="o">&gt;</span> <span class="o">{</span>
+        <span class="nc">StreamTableJoinFunction</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;,</span> <span class="nc">EnrichedPageView</span><span class="o">&gt;</span> <span class="o">{</span>
       
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">EnrichedPageView</span> <span class="nf">apply</span><span class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;</span> <span class="n">m</span><span class="o">,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">&gt;</span> <span class="n">r</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> <span class="k">new</span> <span class="n">EnrichedPageView</span><span class="o">(...)</span> <span class="o">:</span> <span class="kc">null</span><span class="o">;</span>
+      <span class="kd">public</span> <span class="nc">EnrichedPageView</span> <span class="nf">apply</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">m</span><span class="o">,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">r</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> <span class="k">new</span> <span class="nc">EnrichedPageView</span><span class="o">(...)</span> <span class="o">:</span> <span class="kc">null</span><span class="o">;</span>
       <span class="o">}</span>
        
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">getMessageKey</span><span class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">getMessageKey</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">getRecordKey</span><span class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">&gt;</span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">getRecordKey</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">record</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
       <span class="o">}</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h3 id="window">Window</h3>
-
 <h4 id="windowing-concepts">Windowing Concepts</h4>
-
 <p><strong>Windows, Triggers, and WindowPanes</strong>: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.</p>
 
 <p>A window can have one or more associated triggers which determine when results from the window are emitted. Triggers can be either <a href="javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-">early triggers</a> that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.</p>
@@ -981,102 +999,95 @@
 <p>An accumulating window retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.</p>
 
 <h4 id="window-types">Window Types</h4>
-
 <p>The Samza High Level Streams API currently supports tumbling and session windows.</p>
 
 <p><strong>Tumbling Window</strong>: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.</p>
 
 <p>Examples:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="c1">// Group the pageView stream into 30 second tumbling windows keyed by the userId.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WindowPane</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Collection</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;&gt;&gt;</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
-        <span class="n">Windows</span><span class="o">.</span><span class="na">keyedTumblingWindow</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
+        <span class="nc">Windows</span><span class="o">.</span><span class="na">keyedTumblingWindow</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span>
-            <span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="c1">// window duration</span>
-            <span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>
+            <span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="c1">// window duration</span>
+            <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>
 
     <span class="c1">// Compute the maximum value over tumbling windows of 30 seconds.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">Supplier</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="n">Integer</span><span class="o">.</span><span class="na">MIN_VALUE</span><span class="o">;</span>
-    <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">aggregateFunction</span> <span class="o">=</span> 
-        <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">);</span>
-    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WindowPane</span><span class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
-       <span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span>
-            <span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">integers</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">Supplier</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">MIN_VALUE</span><span class="o">;</span>
+    <span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">aggregateFunction</span> <span class="o">=</span> 
+        <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">);</span>
+    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
+       <span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span>
+            <span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> 
             <span class="n">initialValue</span><span class="o">,</span> 
             <span class="n">aggregateFunction</span><span class="o">,</span> 
-            <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span>
+            <span class="k">new</span> <span class="nf">IntegerSerde</span><span class="o">()));</span>
    </code></pre></figure>
 
 <p><strong>Session Window</strong>: A session window groups a MessageStream into sessions. A session captures a period of activity over a MessageStream and is defined by a gap. A session is closed and results are emitted if no new messages arrive for the window for the gap duration.</p>
 
 <p>Examples:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="c1">// Sessionize a stream of page views, and count the number of page-views in a session for every user.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">Supplier</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span>
-    <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">countAggregator</span> <span class="o">=</span> <span class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">oldCount</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
-    <span class="n">Duration</span> <span class="n">sessionGap</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="c1">// Sessionize a stream of page views with a session gap of 10 seconds</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
+      <span class="nc">Windows</span><span class="o">.</span><span class="na">keyedSessionWindow</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">)));</span>
+    
+    <span class="c1">// Sessionize a stream of page views, and count the number of page-views in a session for every user.</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">Supplier</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">countAggregator</span> <span class="o">=</span> <span class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">oldCount</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
+    <span class="nc">Duration</span> <span class="n">sessionGap</span> <span class="o">=</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
     
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WindowPane</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sessionCounts</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
-        <span class="n">Windows</span><span class="o">.</span><span class="na">keyedSessionWindow</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;&gt;</span> <span class="n">sessionCounts</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span>
+        <span class="nc">Windows</span><span class="o">.</span><span class="na">keyedSessionWindow</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> 
             <span class="n">sessionGap</span><span class="o">,</span> 
             <span class="n">initialValue</span><span class="o">,</span> 
             <span class="n">countAggregator</span><span class="o">,</span>

[... 72 lines stripped ...]