You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [24/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/documen...

Modified: samza/site/learn/documentation/latest/api/low-level-api.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/api/low-level-api.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/api/low-level-api.html (original)
+++ samza/site/learn/documentation/latest/api/low-level-api.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/api/low-level-api">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/api/low-level-api">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/api/low-level-api">1.6.0</a></li>
 
               
@@ -640,84 +654,88 @@
 -->
 
 <h3 id="table-of-contents">Table Of Contents</h3>
-
-<ul>
-<li><a href="#introduction">Introduction</a></li>
-<li><a href="#code-examples">Code Examples</a></li>
-<li><a href="#key-concepts">Key Concepts</a>
-
-<ul>
-<li><a href="#taskapplication">TaskApplication</a></li>
-<li><a href="#taskfactory">TaskFactory</a></li>
-<li><a href="#task-interfaces">Task Interfaces</a>
-
 <ul>
-<li><a href="#streamtask">StreamTask</a></li>
-<li><a href="#asyncstreamtask">AsyncStreamTask</a></li>
-<li><a href="#additional-task-interfaces">Additional Task Interfaces</a>
-
-<ul>
-<li><a href="#initabletask">InitableTask</a></li>
-<li><a href="#closabletask">ClosableTask</a></li>
-<li><a href="#windowabletask">WindowableTask</a></li>
-<li><a href="#endofstreamlistenertask">EndOfStreamListenerTask</a> </li>
-</ul></li>
-</ul></li>
-</ul></li>
-<li><a href="#common-operations">Common Operations</a>
-
-<ul>
-<li><a href="#receiving-messages-from-input-streams">Receiving Messages from Input Streams</a></li>
-<li><a href="#sending-messages-to-output-streams">Sending Messages to Output Streams</a></li>
-<li><a href="#accessing-tables">Accessing Tables</a></li>
-</ul></li>
-<li><a href="#legacy-applications">Legacy Applications</a></li>
+  <li><a href="#introduction">Introduction</a></li>
+  <li><a href="#code-examples">Code Examples</a></li>
+  <li><a href="#key-concepts">Key Concepts</a>
+    <ul>
+      <li><a href="#taskapplication">TaskApplication</a></li>
+      <li><a href="#taskfactory">TaskFactory</a></li>
+      <li><a href="#task-interfaces">Task Interfaces</a>
+        <ul>
+          <li><a href="#streamtask">StreamTask</a></li>
+          <li><a href="#asyncstreamtask">AsyncStreamTask</a></li>
+          <li><a href="#additional-task-interfaces">Additional Task Interfaces</a>
+            <ul>
+              <li><a href="#initabletask">InitableTask</a></li>
+              <li><a href="#closabletask">ClosableTask</a></li>
+              <li><a href="#windowabletask">WindowableTask</a></li>
+              <li><a href="#endofstreamlistenertask">EndOfStreamListenerTask</a></li>
+            </ul>
+          </li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#common-operations">Common Operations</a>
+    <ul>
+      <li><a href="#receiving-messages-from-input-streams">Receiving Messages from Input Streams</a></li>
+      <li><a href="#sending-messages-to-output-streams">Sending Messages to Output Streams</a></li>
+      <li><a href="#accessing-tables">Accessing Tables</a></li>
+    </ul>
+  </li>
+  <li><a href="#legacy-applications">Legacy Applications</a></li>
 </ul>
 
 <h3 id="introduction">Introduction</h3>
-
-<p>Samza&rsquo;s powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a <a href="javadocs/org/apache/samza/application/TaskApplication">TaskApplication</a>. The processing logic is defined as either a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> or an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a>.</p>
+<p>Samza’s powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a <a href="javadocs/org/apache/samza/application/TaskApplication">TaskApplication</a>. The processing logic is defined as either a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> or an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a>.</p>
 
 <h3 id="code-examples">Code Examples</h3>
 
-<p>The <a href="https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task/application">Hello Samza</a> Wikipedia applications demonstrate how to use Samza&rsquo;s Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculates several statistics about them.  </p>
+<p>The <a href="https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task/application">Hello Samza</a> Wikipedia applications demonstrate how to use Samza’s Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculate several statistics about them.</p>
 
 <ul>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java">WikipediaFeedTaskApplication</a> demonstrates how to consume multiple Wikipedia event streams and merge them to an Apache Kafka topic. </p></li>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java">WikipediaParserTaskApplication</a> demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.</p></li>
-<li><p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java">WikipediaStatsTaskApplication</a> demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.</p></li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java">WikipediaFeedTaskApplication</a> demonstrates how to consume multiple Wikipedia event streams and merge them to an Apache Kafka topic.</p>
+  </li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java">WikipediaParserTaskApplication</a> demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.</p>
+  </li>
+  <li>
+    <p>The <a href="https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java">WikipediaStatsTaskApplication</a> demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.</p>
+  </li>
 </ul>
 
 <h3 id="key-concepts">Key Concepts</h3>
 
 <h4 id="taskapplication">TaskApplication</h4>
 
-<p>A <a href="javadocs/org/apache/samza/application/TaskApplication">TaskApplication</a> describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza&rsquo;s Low Level Task API.</p>
+<p>A <a href="javadocs/org/apache/samza/application/TaskApplication">TaskApplication</a> describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza’s Low Level Task API.</p>
 
 <p>A typical TaskApplication implementation consists of the following stages:</p>
 
 <ol>
-<li>Configuring the inputs, outputs and state (tables) using the appropriate <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s, <a href="javadocs/org/apache/samza/descriptors/InputDescriptor">InputDescriptor</a>s, <a href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s and <a href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
-<li>Adding the descriptors above to the provided <a href="javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor">TaskApplicationDescriptor</a></li>
-<li>Defining the processing logic in a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> or an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a> implementation, and adding its corresponding <a href="javadocs/org/apache/samza/task/StreamTaskFactory">StreamTaskFactory</a> or <a href="javadocs/org/apache/samza/task/AsyncStreamTaskFactory">AsyncStreamTaskFactory</a> to the TaskApplicationDescriptor.</li>
+  <li>Configuring the inputs, outputs and state (tables) using the appropriate <a href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s, <a href="javadocs/org/apache/samza/system/descriptors/InputDescriptor">InputDescriptor</a>s, <a href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s and <a href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
+  <li>Adding the descriptors above to the provided <a href="javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor">TaskApplicationDescriptor</a>.</li>
+  <li>Defining the processing logic in a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> or an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a> implementation, and adding its corresponding <a href="javadocs/org/apache/samza/task/StreamTaskFactory">StreamTaskFactory</a> or <a href="javadocs/org/apache/samza/task/AsyncStreamTaskFactory">AsyncStreamTaskFactory</a> to the TaskApplicationDescriptor.</li>
 </ol>
 
-<p>The following example TaskApplication removes page views with &ldquo;bad URLs&rdquo; from the input stream:</p>
+<p>The following example TaskApplication removes page views with “bad URLs” from the input stream:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilter</span> <span class="kd">implements</span> <span class="n">TaskApplication</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilter</span> <span class="kd">implements</span> <span class="nc">TaskApplication</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">TaskApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">TaskApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
         <span class="c1">// Step 1: configure the inputs and outputs using descriptors</span>
-        <span class="n">KafkaSystemDescriptor</span> <span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">&quot;...&quot;</span><span class="o">))</span>
-            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">&quot;...&quot;</span><span class="o">,</span> <span class="s">&quot;...&quot;</span><span class="o">));</span>
-        <span class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
-        <span class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="n">PageViewEvent</span><span class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;goodPageViewEvent&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>
-        <span class="n">RocksDbTableDescriptor</span> <span class="n">badUrls</span> <span class="o">=</span> 
-            <span class="k">new</span> <span class="n">RocksDbTableDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">badUrls</span><span class="err">”</span><span class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">());</span>
+        <span class="nc">KafkaSystemDescriptor</span> <span class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"..."</span><span class="o">))</span>
+            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"..."</span><span class="o">,</span> <span class="s">"..."</span><span class="o">));</span>
+        <span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"pageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="nc">PageViewEvent</span><span class="o">&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"goodPageViewEvent"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">RocksDbTableDescriptor</span> <span class="n">badUrls</span> <span class="o">=</span> 
+            <span class="k">new</span> <span class="nf">RocksDbTableDescriptor</span><span class="o">(</span><span class="s">"badUrls"</span><span class="o">,</span> <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">IntegerSerde</span><span class="o">()));</span>
             
         <span class="c1">// Step 2: Add input, output streams and tables</span>
         <span class="n">appDescriptor</span>
@@ -726,18 +744,17 @@
             <span class="o">.</span><span class="na">withTable</span><span class="o">(</span><span class="n">badUrls</span><span class="o">);</span>
         
         <span class="c1">// Step 3: define the processing logic</span>
-        <span class="n">appDescriptor</span><span class="o">.</span><span class="na">withTaskFactory</span><span class="o">(</span><span class="k">new</span> <span class="n">PageViewFilterTaskFactory</span><span class="o">());</span>
+        <span class="n">appDescriptor</span><span class="o">.</span><span class="na">withTaskFactory</span><span class="o">(</span><span class="k">new</span> <span class="nc">PageViewFilterTaskFactory</span><span class="o">());</span>
       <span class="o">}</span>
     <span class="o">}</span></code></pre></figure>
 
 <h4 id="taskfactory">TaskFactory</h4>
+<p>Your <a href="javadocs/org/apache/samza/task/TaskFactory">TaskFactory</a> will be used to create instances of your Task in each of Samza’s processors. If you’re implementing a StreamTask, you can provide a <a href="javadocs/org/apache/samza/task/StreamTaskFactory">StreamTaskFactory</a>. Similarly, if you’re implementing an AsyncStreamTask, you can provide an <a href="javadocs/org/apache/samza/task/AsyncStreamTaskFactory">AsyncStreamTaskFactory</a>. For example:</p>
 
-<p>Your <a href="javadocs/org/apache/samza/task/TaskFactory">TaskFactory</a> will be  used to create instances of your Task in each of Samza&rsquo;s processors. If you&rsquo;re implementing a StreamTask, you can provide a <a href="javadocs/org/apache/samza/task/StreamTaskFactory">StreamTaskFactory</a>. Similarly, if you&rsquo;re implementing an AsyncStreamTask, you can provide an <a href="javadocs/org/apache/samza/task/AsyncStreamTaskFactory">AsyncStreamTaskFactory</a>. For example:</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTaskFactory</span> <span class="kd">implements</span> <span class="n">StreamTaskFactory</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTaskFactory</span> <span class="kd">implements</span> <span class="nc">StreamTaskFactory</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">StreamTask</span> <span class="nf">createInstance</span><span class="o">()</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="k">new</span> <span class="n">PageViewFilterTask</span><span class="o">();</span>
+      <span class="kd">public</span> <span class="nc">StreamTask</span> <span class="nf">createInstance</span><span class="o">()</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="nf">PageViewFilterTask</span><span class="o">();</span>
       <span class="o">}</span>
     <span class="o">}</span>
     </code></pre></figure>
@@ -747,41 +764,39 @@
 <p>Your processing logic can be implemented in a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> or an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a>.</p>
 
 <h5 id="streamtask">StreamTask</h5>
-
 <p>You can implement a <a href="javadocs/org/apache/samza/task/StreamTask">StreamTask</a> for synchronous message processing. Samza delivers messages to the task one at a time, and considers each message to be processed when the process method call returns. For example:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTask</span> <span class="kd">implements</span> <span class="nc">StreamTask</span> <span class="o">{</span>
       <span class="nd">@Override</span>
       <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span>
-          <span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> 
-          <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> 
-          <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+          <span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> 
+          <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> 
+          <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
           
           <span class="c1">// process the message in the envelope synchronously</span>
       <span class="o">}</span>
     <span class="o">}</span></code></pre></figure>
 
-<p>Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container. </p>
+<p>Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container.</p>
 
 <h5 id="asyncstreamtask">AsyncStreamTask</h5>
-
 <p>You can implement an <a href="javadocs/org/apache/samza/task/AsyncStreamTask">AsyncStreamTask</a> for asynchronous message processing. This can be useful when you need to perform long running I/O operations to process a message, e.g., making an http request. For example:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">AsyncPageViewFilterTask</span> <span class="kd">implements</span> <span class="n">AsyncStreamTask</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">AsyncPageViewFilterTask</span> <span class="kd">implements</span> <span class="nc">AsyncStreamTask</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processAsync</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
-          <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
-          <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">,</span>
-          <span class="n">TaskCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processAsync</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
+          <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
+          <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">,</span>
+          <span class="nc">TaskCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
           
           <span class="c1">// process message asynchronously</span>
           <span class="c1">// invoke callback.complete or callback.failure upon completion</span>
       <span class="o">}</span>
     <span class="o">}</span></code></pre></figure>
 
-<p>Samza delivers the incoming message and a <a href="javadocs/org/apache/samza/task/TaskCallback">TaskCallback</a> with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or neither callback.complete() or callback.failure() is invoked within <code>task.callback.ms</code> milliseconds, Samza will shut down the running Container. </p>
+<p>Samza delivers the incoming message and a <a href="javadocs/org/apache/samza/task/TaskCallback">TaskCallback</a> with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or neither callback.complete() nor callback.failure() is invoked within <code>task.callback.ms</code> milliseconds, Samza will shut down the running Container.</p>
 
-<p>If configured, Samza will keep up to <code>task.max.concurrency</code> number of messages processing asynchronously at a time within each Task Instance. Note that while message delivery (i.e., processAsync invocation) is guaranteed to be in-order within a stream partition, message processing may complete out of order when setting <code>task.max.concurrency</code> &gt; 1. </p>
+<p>If configured, Samza will keep up to <code>task.max.concurrency</code> number of messages processing asynchronously at a time within each Task Instance. Note that while message delivery (i.e., processAsync invocation) is guaranteed to be in-order within a stream partition, message processing may complete out of order when setting <code>task.max.concurrency</code> &gt; 1.</p>
 
 <p>For more details on asynchronous and concurrent processing, see the <a href="../../../tutorials/latest/samza-async-user-guide">Samza Async API and Multithreading User Guide</a>.</p>
 
@@ -790,40 +805,36 @@
 <p>There are a few other interfaces you can implement in your StreamTask or AsyncStreamTask that provide additional functionality.</p>
 
 <h6 id="initabletask">InitableTask</h6>
-
 <p>You can implement the <a href="javadocs/org/apache/samza/task/InitableTask">InitableTask</a> interface to access the <a href="javadocs/org/apache/samza/context/Context">Context</a>. Context provides access to any runtime objects you need in your task,
-whether they&rsquo;re provided by the framework, or your own.</p>
+whether they’re provided by the framework, or your own.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">InitableTask</span> <span class="o">{</span>
-      <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">;</span>
+      <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Context</span> <span class="n">context</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span><span class="o">;</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h6 id="closabletask">ClosableTask</h6>
+<p>You can implement the <a href="javadocs/org/apache/samza/task/ClosableTask">ClosableTask</a> to clean up any runtime state during shutdown. This interface is deprecated. It’s recommended to use the <a href="javadocs/org/apache/samza/context/ApplicationContainerContext">ApplicationContainerContext</a> and <a href="javadocs/org/apache/samza/context/ApplicationTaskContext">ApplicationTaskContext</a> APIs to manage the lifecycle of any runtime objects.</p>
 
-<p>You can implement the <a href="javadocs/org/apache/samza/task/ClosableTask">ClosableTask</a> to clean up any runtime state during shutdown. This interface is deprecated. It&rsquo;s recommended to use the <a href="javadocs/org/apache/samza/context/ApplicationContainerContext">ApplicationContainerContext</a> and <a href="javadocs/org/apache/samza/context/ApplicationContainerContext">ApplicationTaskContext</a> APIs to manage the lifecycle of any runtime objects.</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">ClosableTask</span> <span class="o">{</span>
-      <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">;</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">ClosableTask</span> <span class="o">{</span>
+      <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="nc">Exception</span><span class="o">;</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h6 id="windowabletask">WindowableTask</h6>
-
 <p>You can implement the <a href="javadocs/org/apache/samza/task/WindowableTask">WindowableTask</a> interface to implement processing logic that is invoked periodically by the framework.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">WindowableTask</span> <span class="o">{</span>
-      <span class="kt">void</span> <span class="nf">window</span><span class="o">(</span><span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">;</span>
+      <span class="kt">void</span> <span class="nf">window</span><span class="o">(</span><span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span><span class="o">;</span>
     <span class="o">}</span></code></pre></figure>
 
 <h6 id="endofstreamlistenertask">EndOfStreamListenerTask</h6>
+<p>You can implement the <a href="javadocs/org/apache/samza/task/EndOfStreamListenerTask">EndOfStreamListenerTask</a> interface to implement processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it’s consuming. This is typically relevant when running Samza as a batch job.</p>
 
-<p>You can implement the <a href="javadocs/org/apache/samza/task/EndOfStreamListenerTask">EndOfStreamListenerTask</a> interface to implement processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it&rsquo;s consuming. This is typically relevant when running Samza as a batch job.</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">EndOfStreamListenerTask</span> <span class="o">{</span>
-      <span class="kt">void</span> <span class="nf">onEndOfStream</span><span class="o">(</span><span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">;</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">EndOfStreamListenerTask</span> <span class="o">{</span>
+      <span class="kt">void</span> <span class="nf">onEndOfStream</span><span class="o">(</span><span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span><span class="o">;</span>
     <span class="o">}</span>
     </code></pre></figure>
 
@@ -831,27 +842,28 @@ whether they&rsquo;re provided by the fr
 
 <h4 id="receiving-messages-from-input-streams">Receiving Messages from Input Streams</h4>
 
-<p>Samza calls your Task instance&rsquo;s <a href="javadocs/org/apache/samza/task/StreamTask#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-">process</a> or <a href="javadocs/org/apache/samza/task/AsyncStreamTask#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-">processAsync</a> method with each incoming message on your input streams. The <a href="javadocs/org/apache/samza/system/IncomingMessageEnvelope">IncomingMessageEnvelope</a> can be used to obtain the following information: the de-serialized key, the de-serialized message, and the <a href="javadocs/org/apache/samza/system/SystemStreamPartition">SystemStreamPartition</a> that the message came from.</p>
+<p>Samza calls your Task instance’s <a href="javadocs/org/apache/samza/task/StreamTask#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-">process</a> or <a href="javadocs/org/apache/samza/task/AsyncStreamTask#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-">processAsync</a> method with each incoming message on your input streams. The <a href="javadocs/org/apache/samza/system/IncomingMessageEnvelope">IncomingMessageEnvelope</a> can be used to obtain the following information: the de-serialized key, the de-serialized message, and the <a href="javadocs/org/apache/samza/system/SystemStreamPartition">SystemStreamPartition</a> that the message came from.</p>
 
 <p>The key and message objects need to be cast to the correct type in your Task implementation based on the <a href="javadocs/org/apache/samza/serializers/Serde.html">Serde</a> provided for the InputDescriptor for the input stream.</p>
 
-<p>The <a href="javadocs/org/apache/samza/system/SystemStreamPartition">SystemStreamPartition</a> object tells you where the message came from. It consists of three parts:
-1. The <em>system</em>: the name of the system the message came from, as specified for the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
-2. The <em>stream name</em>: the name of the stream (e.g., topic, queue) within the input system. This is the physical name of the stream, as specified for the InputDescriptor in your TaskApplication.
-3. The <a href="javadocs/org/apache/samza/Partition"><em>partition</em></a>: A stream is normally split into several partitions, and each partition is assigned to one task instance by Samza. </p>
+<p>The <a href="javadocs/org/apache/samza/system/SystemStreamPartition">SystemStreamPartition</a> object tells you where the message came from. It consists of three parts:</p>
+<ol>
+  <li>The <em>system</em>: the name of the system the message came from, as specified for the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.</li>
+  <li>The <em>stream name</em>: the name of the stream (e.g., topic, queue) within the input system. This is the physical name of the stream, as specified for the InputDescriptor in your TaskApplication.</li>
+  <li>The <a href="javadocs/org/apache/samza/Partition"><em>partition</em></a>: A stream is normally split into several partitions, and each partition is assigned to one task instance by Samza.</li>
+</ol>
 
 <p>If you have several input streams for your TaskApplication, you can use the SystemStreamPartition to determine what kind of message you’ve received.</p>
 
 <h4 id="sending-messages-to-output-streams">Sending Messages to Output Streams</h4>
-
 <p>To send a message to a stream, you first create an <a href="javadocs/org/apache/samza/system/OutgoingMessageEnvelope">OutgoingMessageEnvelope</a>. At a minimum, you need to provide the message you want to send, and the system and stream to send it to. Optionally you can specify the partitioning key and other parameters. See the <a href="javadocs/org/apache/samza/system/OutgoingMessageEnvelope">javadoc</a> for details.</p>
 
-<p>You can then send the OutgoingMessageEnvelope using the <a href="javadocs/org/apache/samza/task/MessageCollector">MessageCollector</a> provided with the process() or processAsync() call. You <strong>must</strong> use the MessageCollector delivered for the message you&rsquo;re currently processing. Holding on to a MessageCollector and reusing it later will cause your messages to not be sent correctly.  </p>
+<p>You can then send the OutgoingMessageEnvelope using the <a href="javadocs/org/apache/samza/task/MessageCollector">MessageCollector</a> provided with the process() or processAsync() call. You <strong>must</strong> use the MessageCollector delivered for the message you’re currently processing. Holding on to a MessageCollector and reusing it later will cause your messages to not be sent correctly.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="cm">/** When a task wishes to send a message, it uses this interface. */</span>
     <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MessageCollector</span> <span class="o">{</span>
-      <span class="kt">void</span> <span class="nf">send</span><span class="o">(</span><span class="n">OutgoingMessageEnvelope</span> <span class="n">envelope</span><span class="o">);</span>
+      <span class="kt">void</span> <span class="nf">send</span><span class="o">(</span><span class="nc">OutgoingMessageEnvelope</span> <span class="n">envelope</span><span class="o">);</span>
     <span class="o">}</span>
     </code></pre></figure>
 
@@ -862,75 +874,74 @@ whether they&rsquo;re provided by the fr
 <p>In the Low Level API, you can obtain and use a Table as follows:</p>
 
 <ol>
-<li>Use the appropriate TableDescriptor to specify the table properties.</li>
-<li>Register the TableDescriptor with the TaskApplicationDescriptor.</li>
-<li>Obtain a Table reference within the task implementation using <a href="javadocs/org/apache/samza/task/TaskContext#getTable-java.lang.String-">TaskContext.getTable()</a>. <a href="javadocs/org/apache/samza/task/TaskContext">TaskContext</a> is available via <a href="javadocs/org/apache/samza/context/Context#getTaskContext--">Context.getTaskContext()</a>, which in turn is available by implementing <a href="(javadocs/org/apache/samza/task/InitableTask#init-org.apache.samza.context.Context-)">InitiableTask. init()</a>.</li>
+  <li>Use the appropriate TableDescriptor to specify the table properties.</li>
+  <li>Register the TableDescriptor with the TaskApplicationDescriptor.</li>
+  <li>Obtain a Table reference within the task implementation using <a href="javadocs/org/apache/samza/task/TaskContext#getTable-java.lang.String-">TaskContext.getTable()</a>. <a href="javadocs/org/apache/samza/task/TaskContext">TaskContext</a> is available via <a href="javadocs/org/apache/samza/context/Context#getTaskContext--">Context.getTaskContext()</a>, which in turn is available by implementing <a href="javadocs/org/apache/samza/task/InitableTask#init-org.apache.samza.context.Context-">InitableTask.init()</a>.</li>
 </ol>
 
 <p>For example:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span><span class="o">,</span> <span class="n">InitableTask</span> <span class="o">{</span>
-      <span class="kd">private</span> <span class="kd">final</span> <span class="n">SystemStream</span> <span class="n">outputStream</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="err">“</span><span class="n">kafka</span><span class="err">”</span><span class="o">,</span> <span class="err">“</span><span class="n">goodPageViewEvent</span><span class="err">”</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewFilterTask</span> <span class="kd">implements</span> <span class="nc">StreamTask</span><span class="o">,</span> <span class="nc">InitableTask</span> <span class="o">{</span>
+      <span class="kd">private</span> <span class="kd">final</span> <span class="nc">SystemStream</span> <span class="n">outputStream</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SystemStream</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">,</span> <span class="s">"goodPageViewEvent"</span><span class="o">);</span>
       
-      <span class="kd">private</span> <span class="n">ReadWriteTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">badUrlsTable</span><span class="o">;</span>
+      <span class="kd">private</span> <span class="nc">ReadWriteTable</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">badUrlsTable</span><span class="o">;</span>
       
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
-        <span class="n">badUrlsTable</span> <span class="o">=</span> <span class="o">(</span><span class="n">ReadWriteTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getTaskContext</span><span class="o">().</span><span class="na">getTable</span><span class="o">(</span><span class="s">&quot;badUrls&quot;</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">badUrlsTable</span> <span class="o">=</span> <span class="o">(</span><span class="nc">ReadWriteTable</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getTaskContext</span><span class="o">().</span><span class="na">getTable</span><span class="o">(</span><span class="s">"badUrls"</span><span class="o">);</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
-        <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">String</span> <span class="n">key</span> <span class="o">=</span> <span class="o">(</span><span class="nc">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
         <span class="k">if</span> <span class="o">(</span><span class="n">badUrlsTable</span><span class="o">.</span><span class="na">containsKey</span><span class="o">(</span><span class="n">key</span><span class="o">))</span> <span class="o">{</span>
           <span class="c1">// skip the message, increment the counter, do not send it</span>
           <span class="n">badUrlsTable</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">badUrlsTable</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">key</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span><span class="o">);</span>
         <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
-          <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="n">outputStream</span><span class="o">,</span> <span class="n">key</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">getValue</span><span class="o">()));</span>   <span class="o">}</span>
+          <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span class="n">outputStream</span><span class="o">,</span> <span class="n">key</span><span class="o">,</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getMessage</span><span class="o">()));</span>   <span class="o">}</span>
       <span class="o">}</span>
     <span class="o">}</span></code></pre></figure>
 
 <h3 id="side-inputs-for-local-tables">Side Inputs for Local Tables</h3>
 
 <p>For populating a local <a href="javadocs/org/apache/samza/table/Table">Table</a> with secondary data sources, we can use side inputs to specify the source stream. Additionally, the table descriptor also takes
-in a <code>SideInputsProcessor</code> that will be applied before writing the entries to the table. The <code>TableDescriptor</code> that is registered with the <code>TaskApplicationDescriptor</code> can be used to specify side input properties.</p>
+in a <code class="language-plaintext highlighter-rouge">SideInputsProcessor</code> that will be applied before writing the entries to the table. The <code class="language-plaintext highlighter-rouge">TableDescriptor</code> that is registered with the <code class="language-plaintext highlighter-rouge">TaskApplicationDescriptor</code> can be used to specify side input properties.</p>
 
-<p>The following code snippet shows a sample <code>TableDescriptor</code> for a local table that is backed by side inputs.</p>
+<p>The following code snippet shows a sample <code class="language-plaintext highlighter-rouge">TableDescriptor</code> for a local table that is backed by side inputs.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">RocksDbTableDescriptor</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Profile</span><span class="o">&gt;</span> <span class="n">tableDesc</span> <span class="o">=</span> 
-      <span class="k">new</span> <span class="n">RocksDbTableDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">viewCounts</span><span class="err">”</span><span class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">Profile</span><span class="o">.</span><span class="na">class</span><span class="o">)))</span>
-        <span class="o">.</span><span class="na">withSideInput</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">&quot;profile&quot;</span><span class="o">))</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">RocksDbTableDescriptor</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">tableDesc</span> <span class="o">=</span> 
+      <span class="k">new</span> <span class="nf">RocksDbTableDescriptor</span><span class="o">(</span><span class="s">"viewCounts"</span><span class="o">,</span> <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">Profile</span><span class="o">.</span><span class="na">class</span><span class="o">)))</span>
+        <span class="o">.</span><span class="na">withSideInput</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"profile"</span><span class="o">))</span>
         <span class="o">.</span><span class="na">withSideInputsProcessor</span><span class="o">((</span><span class="n">message</span><span class="o">,</span> <span class="n">store</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
           <span class="o">...</span>
         <span class="o">});</span></code></pre></figure>
- 
 
 <h3 id="legacy-applications">Legacy Applications</h3>
 
 <p>For legacy Low Level API applications, you can continue specifying your system, stream and store properties along with your task.class in configuration. An incomplete example of the configuration for a legacy task application looks like this (see the <a href="../jobs/configuration.html">configuration</a> documentation for more detail):</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span>    <span class="c"># This is the Task class that Samza will instantiate when the job is run</span>
-<span class="na">    task.class</span><span class="o">=</span><span class="s">com.example.samza.PageViewFilterTask</span>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">    # This is the Task class that Samza will instantiate when the job is run
+    task.class=com.example.samza.PageViewFilterTask
 
-    <span class="c"># Define a system called &quot;kafka&quot; (you can give it any name, and you can define</span>
-    <span class="c"># multiple systems if you want to process messages from different sources)</span>
-<span class="na">    systems.kafka.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.kafka.KafkaSystemFactory</span>
+    # Define a system called "kafka" (you can give it any name, and you can define
+    # multiple systems if you want to process messages from different sources)
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 
-    <span class="c"># The job consumes a topic called &quot;PageViewEvent&quot; from the &quot;kafka&quot; system</span>
-<span class="na">    task.inputs</span><span class="o">=</span><span class="s">kafka.PageViewEvent</span>
+    # The job consumes a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
 
-    <span class="c"># Define a serializer/deserializer called &quot;json&quot; which parses JSON messages</span>
-<span class="na">    serializers.registry.json.class</span><span class="o">=</span><span class="s">org.apache.samza.serializers.JsonSerdeFactory</span>
+    # Define a serializer/deserializer called "json" which parses JSON messages
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 
-    <span class="c"># Use the &quot;json&quot; serializer for messages in the &quot;PageViewEvent&quot; topic</span>
-<span class="na">    systems.kafka.streams.PageViewEvent.samza.msg.serde</span><span class="o">=</span><span class="s">json</span>
+    # Use the "json" serializer for messages in the "PageViewEvent" topic
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
     
-    <span class="c"># Use &quot;ProfileEvent&quot; from the &quot;kafka&quot; system for side inputs for &quot;profile-store&quot;</span>
-<span class="na">    stores.profile-store.side.inputs</span><span class="o">=</span><span class="s">kafka.ProfileEvent</span>
+    # Use "ProfileEvent" from the "kafka" system for side inputs for "profile-store"
+    stores.profile-store.side.inputs=kafka.ProfileEvent
     
-    <span class="c"># Use &quot;MySideInputsProcessorFactory&quot; to instantiate the &quot;SideInputsProcessor&quot; </span>
-    <span class="c"># that will applied on the &quot;ProfileEvent&quot; before writing to &quot;profile-store&quot;</span>
-<span class="na">    stores.profile-store.side.inputs.processor.factory</span><span class="o">=</span><span class="s">org.apache.samza.MySideInputsProcessorFactory</span>
+    # Use "MySideInputsProcessorFactory" to instantiate the "SideInputsProcessor" 
+    # that will be applied to the "ProfileEvent" before writing to "profile-store"
+    stores.profile-store.side.inputs.processor.factory=org.apache.samza.MySideInputsProcessorFactory
     </code></pre></figure>