Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [45/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/documen...

Modified: samza/site/learn/tutorials/latest/samza-async-user-guide.html
URL: http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/samza-async-user-guide.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/samza-async-user-guide.html (original)
+++ samza/site/learn/tutorials/latest/samza-async-user-guide.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -551,117 +557,117 @@
 
 <p>If your job's processing involves synchronous (blocking) IO, you can simply configure Samza's built-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses the Jersey client to make REST calls in each process() call.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">SyncRestTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span><span class="o">,</span> <span class="n">InitableTask</span><span class="o">,</span> <span class="n">ClosableTask</span> <span class="o">{</span>
-  <span class="kd">private</span> <span class="n">Client</span> <span class="n">client</span><span class="o">;</span>
-  <span class="kd">private</span> <span class="n">WebTarget</span> <span class="n">target</span><span class="o">;</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">SyncRestTask</span> <span class="kd">implements</span> <span class="nc">StreamTask</span><span class="o">,</span> <span class="nc">InitableTask</span><span class="o">,</span> <span class="nc">ClosableTask</span> <span class="o">{</span>
+  <span class="kd">private</span> <span class="nc">Client</span> <span class="n">client</span><span class="o">;</span>
+  <span class="kd">private</span> <span class="nc">WebTarget</span> <span class="n">target</span><span class="o">;</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Config</span> <span class="n">config</span><span class="o">,</span> <span class="n">TaskContext</span> <span class="n">taskContext</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-    <span class="n">client</span> <span class="o">=</span> <span class="n">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">();</span>
-    <span class="n">target</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">target</span><span class="o">(</span><span class="s">&quot;http://example.com/resource/&quot;</span><span class="o">).</span><span class="na">path</span><span class="o">(</span><span class="s">&quot;hello&quot;</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Config</span> <span class="n">config</span><span class="o">,</span> <span class="nc">TaskContext</span> <span class="n">taskContext</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
+    <span class="n">client</span> <span class="o">=</span> <span class="nc">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">();</span>
+    <span class="n">target</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">target</span><span class="o">(</span><span class="s">"http://example.com/resource/"</span><span class="o">).</span><span class="na">path</span><span class="o">(</span><span class="s">"hello"</span><span class="o">);</span>
   <span class="o">}</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">Response</span> <span class="n">response</span> <span class="o">=</span> <span class="n">target</span><span class="o">.</span><span class="na">request</span><span class="o">().</span><span class="na">get</span><span class="o">();</span>
-    <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Response status code &quot;</span> <span class="o">+</span> <span class="n">response</span><span class="o">.</span><span class="na">getStatus</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot; received.&quot;</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+    <span class="nc">Response</span> <span class="n">response</span> <span class="o">=</span> <span class="n">target</span><span class="o">.</span><span class="na">request</span><span class="o">().</span><span class="na">get</span><span class="o">();</span>
+    <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Response status code "</span> <span class="o">+</span> <span class="n">response</span><span class="o">.</span><span class="na">getStatus</span><span class="o">()</span> <span class="o">+</span> <span class="s">" received."</span><span class="o">);</span>
   <span class="o">}</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
     <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
 <p>By default, Samza runs this task sequentially in a single thread. Below, we configure a thread pool of size 16 to run the tasks in parallel:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Thread pool to run synchronous tasks in parallel.</span>
-<span class="na">job.container.thread.pool.size</span><span class="o">=</span><span class="s">16</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Thread pool to run synchronous tasks in parallel.
+job.container.thread.pool.size=16</code></pre></figure>
 
 <p><strong>NOTE:</strong> The thread pool is used to run all the synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and, internally, Task.commit(). This maximizes the parallelism between tasks and reduces blocking time. Even when tasks run on multiple threads, Samza still guarantees in-order processing of the messages within a task by default.</p>
 
 <h3 id="asynchronous-process-with-asyncstreamtask-api">Asynchronous Process with AsyncStreamTask API</h3>
 
-<p>If your job process is asynchronous, e.g. making non-blocking remote IO calls, <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a> interface provides the support for it. In the following example AsyncRestTask makes asynchronous rest call and triggers callback once it&rsquo;s complete.</p>
+<p>If your job's processing is asynchronous, e.g. it makes non-blocking remote IO calls, the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a> interface supports it. In the following example, AsyncRestTask makes an asynchronous REST call and triggers the callback once it’s complete.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">AsyncRestTask</span> <span class="kd">implements</span> <span class="n">AsyncStreamTask</span><span class="o">,</span> <span class="n">InitableTask</span><span class="o">,</span> <span class="n">ClosableTask</span> <span class="o">{</span>
-  <span class="kd">private</span> <span class="n">Client</span> <span class="n">client</span><span class="o">;</span>
-  <span class="kd">private</span> <span class="n">WebTarget</span> <span class="n">target</span><span class="o">;</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">AsyncRestTask</span> <span class="kd">implements</span> <span class="nc">AsyncStreamTask</span><span class="o">,</span> <span class="nc">InitableTask</span><span class="o">,</span> <span class="nc">ClosableTask</span> <span class="o">{</span>
+  <span class="kd">private</span> <span class="nc">Client</span> <span class="n">client</span><span class="o">;</span>
+  <span class="kd">private</span> <span class="nc">WebTarget</span> <span class="n">target</span><span class="o">;</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Config</span> <span class="n">config</span><span class="o">,</span> <span class="n">TaskContext</span> <span class="n">taskContext</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-    <span class="n">client</span> <span class="o">=</span> <span class="n">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">();</span>
-    <span class="n">target</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">target</span><span class="o">(</span><span class="s">&quot;http://example.com/resource/&quot;</span><span class="o">).</span><span class="na">path</span><span class="o">(</span><span class="s">&quot;hello&quot;</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Config</span> <span class="n">config</span><span class="o">,</span> <span class="nc">TaskContext</span> <span class="n">taskContext</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
+    <span class="n">client</span> <span class="o">=</span> <span class="nc">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">();</span>
+    <span class="n">target</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">target</span><span class="o">(</span><span class="s">"http://example.com/resource/"</span><span class="o">).</span><span class="na">path</span><span class="o">(</span><span class="s">"hello"</span><span class="o">);</span>
   <span class="o">}</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processAsync</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
-      <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">,</span> <span class="kd">final</span> <span class="n">TaskCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">target</span><span class="o">.</span><span class="na">request</span><span class="o">().</span><span class="na">async</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="k">new</span> <span class="n">InvocationCallback</span><span class="o">&lt;</span><span class="n">Response</span><span class="o">&gt;()</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processAsync</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span> <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
+      <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">,</span> <span class="kd">final</span> <span class="nc">TaskCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">target</span><span class="o">.</span><span class="na">request</span><span class="o">().</span><span class="na">async</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="k">new</span> <span class="nc">InvocationCallback</span><span class="o">&lt;</span><span class="nc">Response</span><span class="o">&gt;()</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">completed</span><span class="o">(</span><span class="n">Response</span> <span class="n">response</span><span class="o">)</span> <span class="o">{</span>
-        <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Response status code &quot;</span> <span class="o">+</span> <span class="n">response</span><span class="o">.</span><span class="na">getStatus</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot; received.&quot;</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">completed</span><span class="o">(</span><span class="nc">Response</span> <span class="n">response</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Response status code "</span> <span class="o">+</span> <span class="n">response</span><span class="o">.</span><span class="na">getStatus</span><span class="o">()</span> <span class="o">+</span> <span class="s">" received."</span><span class="o">);</span>
         <span class="n">callback</span><span class="o">.</span><span class="na">complete</span><span class="o">();</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="n">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
-        <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Invocation failed.&quot;</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="nc">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Invocation failed."</span><span class="o">);</span>
         <span class="n">callback</span><span class="o">.</span><span class="na">failure</span><span class="o">(</span><span class="n">throwable</span><span class="o">);</span>
       <span class="o">}</span>
     <span class="o">});</span>
   <span class="o">}</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
     <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
 <p>In the above example, processing is not complete when processAsync() returns. In the callback thread from the Jersey client, we trigger <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/TaskCallback.html">TaskCallback</a> to indicate that processing is done. To make sure the callback is triggered within a certain time interval, e.g. 5 seconds, you can configure the following property:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.</span>
-<span class="na">task.callback.timeout.ms</span><span class="o">=</span><span class="s">5000</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
+task.callback.timeout.ms=5000</code></pre></figure>
 
-<p><strong>NOTE:</strong> Samza also guarantees the in-order process of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won&rsquo;t be called until the previous processAsync() callback has been triggered.</p>
+<p><strong>NOTE:</strong> Samza also guarantees in-order processing of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won’t be called until the previous processAsync() callback has been triggered.</p>
 
 <h3 id="asynchronous-process-in-high-level-api">Asynchronous Process in High Level API</h3>
 
 <p>If your processing logic is asynchronous, e.g. it makes non-blocking remote calls, you can implement it using the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction.html">AsyncFlatMapFunction</a>. The following example illustrates an application that processes Wikipedia feed updates, invokes a remote service to standardize them, and sends the standardized events to Wikipedia.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WikipediaAsyncStandardizer</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WikipediaAsyncStandardizer</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span> <span class="o">{</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
     <span class="c1">// Define a SystemDescriptor for Wikipedia data</span>
-    <span class="n">WikipediaSystemDescriptor</span> <span class="n">wikipediaSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">WikipediaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;irc.wikimedia.org&quot;</span><span class="o">,</span> <span class="mi">6667</span><span class="o">);</span>
+    <span class="nc">WikipediaSystemDescriptor</span> <span class="n">wikipediaSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WikipediaSystemDescriptor</span><span class="o">(</span><span class="s">"irc.wikimedia.org"</span><span class="o">,</span> <span class="mi">6667</span><span class="o">);</span>
     <span class="c1">// Define InputDescriptors for consuming wikipedia data</span>
-    <span class="n">WikipediaInputDescriptor</span> <span class="n">wikipediaInputDescriptor</span> <span class="o">=</span> <span class="n">wikipediaSystemDescriptor</span>
-        <span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;en-wikipedia&quot;</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">withChannel</span><span class="o">(</span><span class="s">&quot;#en.wikipedia&quot;</span><span class="o">);</span>
+    <span class="nc">WikipediaInputDescriptor</span> <span class="n">wikipediaInputDescriptor</span> <span class="o">=</span> <span class="n">wikipediaSystemDescriptor</span>
+        <span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"en-wikipedia"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">withChannel</span><span class="o">(</span><span class="s">"#en.wikipedia"</span><span class="o">);</span>
     <span class="c1">// Define OutputDescriptor for producing wikipedia data</span>
-    <span class="n">WikipediaOutputDescriptor</span> <span class="n">wikipediaOutputDescriptor</span> <span class="o">=</span> <span class="n">wikipediaSystemDescriptor</span>
-        <span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;en-wikipedia-standardized&quot;</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">withChannel</span><span class="o">(</span><span class="s">&quot;#en.wikipedia.standardized&quot;</span><span class="o">);</span>
+    <span class="nc">WikipediaOutputDescriptor</span> <span class="n">wikipediaOutputDescriptor</span> <span class="o">=</span> <span class="n">wikipediaSystemDescriptor</span>
+        <span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"en-wikipedia-standardized"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">withChannel</span><span class="o">(</span><span class="s">"#en.wikipedia.standardized"</span><span class="o">);</span>
 
     <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">wikipediaInputDescriptor</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">WikipediaFeedEvent</span><span class="o">::</span><span class="n">isUpdate</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">flatMapAsync</span><span class="o">(</span><span class="k">new</span> <span class="n">AsyncStandardizerFunction</span><span class="o">())</span>
+        <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="nl">WikipediaFeedEvent:</span><span class="o">:</span><span class="n">isUpdate</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">flatMapAsync</span><span class="o">(</span><span class="k">new</span> <span class="nc">AsyncStandardizerFunction</span><span class="o">())</span>
         <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">wikipediaOutputDescriptor</span><span class="o">);</span>
   <span class="o">}</span>
 
-  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">AsyncStandardizerFunction</span> <span class="kd">implements</span> <span class="n">AsyncFlatMapFunction</span><span class="o">&lt;</span><span class="n">WikipediaFeedEvent</span><span class="o">,</span> <span class="n">StandardizedWikipediaFeedEvent</span><span class="o">&gt;</span> <span class="o">{</span>
-    <span class="kd">private</span> <span class="kd">transient</span> <span class="n">Client</span> <span class="n">client</span><span class="o">;</span>
+  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">AsyncStandardizerFunction</span> <span class="kd">implements</span> <span class="nc">AsyncFlatMapFunction</span><span class="o">&lt;</span><span class="nc">WikipediaFeedEvent</span><span class="o">,</span> <span class="nc">StandardizedWikipediaFeedEvent</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="kd">private</span> <span class="kd">transient</span> <span class="nc">Client</span> <span class="n">client</span><span class="o">;</span>
 
     <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
-      <span class="n">client</span> <span class="o">=</span> <span class="n">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="na">getJobContext</span><span class="o">().</span><span class="na">getConfig</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;standardizer.uri&quot;</span><span class="o">));</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">client</span> <span class="o">=</span> <span class="nc">ClientBuilder</span><span class="o">.</span><span class="na">newClient</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="na">getJobContext</span><span class="o">().</span><span class="na">getConfig</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="s">"standardizer.uri"</span><span class="o">));</span>
     <span class="o">}</span>
 
     <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="n">CompletionStage</span><span class="o">&lt;</span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="n">WikipediaFeedEvent</span> <span class="n">wikipediaFeedEvent</span><span class="o">)</span> <span class="o">{</span>
-      <span class="n">Request</span><span class="o">&lt;</span><span class="n">StandardizerRequest</span><span class="o">&gt;</span> <span class="n">standardizerRequest</span> <span class="o">=</span> <span class="n">buildStandardizedRequest</span><span class="o">(</span><span class="n">wikipediaFeedEvent</span><span class="o">);</span>
-      <span class="n">CompletableFuture</span><span class="o">&lt;</span><span class="n">StandardizerResponse</span><span class="o">&gt;</span> <span class="n">standardizerResponse</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">sendRequest</span><span class="o">(</span><span class="n">standardizerRequest</span><span class="o">);</span>
+    <span class="kd">public</span> <span class="nc">CompletionStage</span><span class="o">&lt;</span><span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">WikipediaFeedEvent</span> <span class="n">wikipediaFeedEvent</span><span class="o">)</span> <span class="o">{</span>
+      <span class="nc">Request</span><span class="o">&lt;</span><span class="nc">StandardizerRequest</span><span class="o">&gt;</span> <span class="n">standardizerRequest</span> <span class="o">=</span> <span class="n">buildStandardizedRequest</span><span class="o">(</span><span class="n">wikipediaFeedEvent</span><span class="o">);</span>
+      <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">StandardizerResponse</span><span class="o">&gt;</span> <span class="n">standardizerResponse</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">sendRequest</span><span class="o">(</span><span class="n">standardizerRequest</span><span class="o">);</span>
 
       <span class="k">return</span> <span class="n">standardizerResponse</span>
           <span class="o">.</span><span class="na">thenApply</span><span class="o">(</span><span class="n">response</span> <span class="o">-&gt;</span> <span class="n">extractStandardizedWikipediaFeedEvent</span><span class="o">(</span><span class="n">response</span><span class="o">));</span>
@@ -674,24 +680,24 @@
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>In the above example, the results from the <code>AsyncStandardizerFunction</code> are propagated to downstream operator once the future is complete. There is an overall timeout for each to message to be processed and you can tune it using:</p>
+<p>In the above example, the results from the <code class="language-plaintext highlighter-rouge">AsyncStandardizerFunction</code> are propagated to the downstream operator once the future is complete. There is an overall timeout for each message to be processed, and you can tune it using:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Timeout for the message to processed. When the timeout elapses, the container shuts down.</span>
-<span class="err">task.callback.timeout.ms</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Timeout for the message to be processed. When the timeout elapses, the container shuts down.
+task.callback.timeout.ms</code></pre></figure>
 
 <p>If the IO library accepts callbacks instead of returning a Future, the callback can be adapted to a Future in the following way:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>  <span class="kd">public</span> <span class="n">CompletionStage</span><span class="o">&lt;</span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="n">WikipediaFeedEvent</span> <span class="n">wikipediaFeedEvent</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">Request</span><span class="o">&lt;</span><span class="n">StandardizerRequest</span><span class="o">&gt;</span> <span class="n">standardizationRequest</span> <span class="o">=</span> <span class="n">buildStandardizedRequest</span><span class="o">(</span><span class="n">wikipediaFeedEvent</span><span class="o">);</span>
-    <span class="n">CompletableFuture</span><span class="o">&lt;</span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="n">standardizedFuture</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CompletableFuture</span><span class="o">&lt;&gt;();</span>
-    <span class="n">client</span><span class="o">.</span><span class="na">async</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">standardizationRequest</span><span class="o">,</span> <span class="k">new</span> <span class="n">InvocationCallback</span><span class="o">&lt;</span><span class="n">Response</span><span class="o">&gt;()</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  <span class="kd">public</span> <span class="nc">CompletionStage</span><span class="o">&lt;</span><span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">WikipediaFeedEvent</span> <span class="n">wikipediaFeedEvent</span><span class="o">)</span> <span class="o">{</span>
+    <span class="nc">Request</span><span class="o">&lt;</span><span class="nc">StandardizerRequest</span><span class="o">&gt;</span> <span class="n">standardizationRequest</span> <span class="o">=</span> <span class="n">buildStandardizedRequest</span><span class="o">(</span><span class="n">wikipediaFeedEvent</span><span class="o">);</span>
+    <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">StandardizedWikipediaFeedEvent</span><span class="o">&gt;&gt;</span> <span class="n">standardizedFuture</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CompletableFuture</span><span class="o">&lt;&gt;();</span>
+    <span class="n">client</span><span class="o">.</span><span class="na">async</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">standardizationRequest</span><span class="o">,</span> <span class="k">new</span> <span class="nc">InvocationCallback</span><span class="o">&lt;</span><span class="nc">StandardizerResponse</span><span class="o">&gt;()</span> <span class="o">{</span>
           <span class="nd">@Override</span>
-          <span class="kd">public</span> <span class="kt">void</span> <span class="nf">completed</span><span class="o">(</span><span class="n">StandardizerResponse</span> <span class="n">response</span><span class="o">)</span> <span class="o">{</span>
+          <span class="kd">public</span> <span class="kt">void</span> <span class="nf">completed</span><span class="o">(</span><span class="nc">StandardizerResponse</span> <span class="n">response</span><span class="o">)</span> <span class="o">{</span>
             <span class="n">standardizedFuture</span><span class="o">.</span><span class="na">complete</span><span class="o">(</span><span class="n">extractStandardizedWikipediaFeedEvent</span><span class="o">(</span><span class="n">response</span><span class="o">));</span>
           <span class="o">}</span>
 
           <span class="nd">@Override</span>
-          <span class="kd">public</span> <span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="n">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
+          <span class="kd">public</span> <span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="nc">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
             <span class="n">standardizedFuture</span><span class="o">.</span><span class="na">completeExceptionally</span><span class="o">(</span><span class="n">throwable</span><span class="o">);</span>
           <span class="o">}</span>
         <span class="o">});</span>
@@ -701,8 +707,8 @@
 
 <p>In all cases above, Samza supports in-order processing by default. Further parallelism is also supported by allowing a task to process multiple outstanding messages in parallel. The following config allows one task to process at most 4 outstanding messages in parallel at a time:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.</span>
-<span class="na">task.max.concurrency</span><span class="o">=</span><span class="s">4</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
+task.max.concurrency=4</code></pre></figure>
 
 <p><strong>NOTE:</strong> In case of AsyncStreamTask, processAsync() is still invoked in the order of the message arrivals, but the completion can happen out of order. In case of StreamTask and High level API applications with task.max.concurrency &gt; 1, delivery can be out-of-order. This option should <strong>NOT</strong> be used when strict ordering of the output is required.</p>
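 
 <p>The ordering caveat in the note above can be illustrated with a minimal sketch that uses only <code class="language-plaintext highlighter-rouge">java.util.concurrent</code>, not the Samza API. The class name <code class="language-plaintext highlighter-rouge">OutOfOrderCompletionSketch</code> and the message labels are hypothetical; each future stands in for a pending remote call that was dispatched in arrival order.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OutOfOrderCompletionSketch {

  /** Returns the order in which two in-flight messages finished. */
  public static List<String> completionOrder() {
    List<String> completed = new ArrayList<>();

    // Two messages are dispatched in arrival order; each future models a pending remote call.
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<String> second = new CompletableFuture<>();
    first.thenAccept(completed::add);
    second.thenAccept(completed::add);

    // The remote side happens to answer the second request before the first.
    second.complete("message-2");
    first.complete("message-1");

    return completed;
  }

  public static void main(String[] args) {
    System.out.println(completionOrder());
  }
}
```

 <p>Although the first message was dispatched first, its result is recorded last. Downstream output that depends on arrival order would be affected in the same way.</p>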
 
@@ -711,9 +717,9 @@
 <p>In any of the scenarios, Samza guarantees the following semantics:</p>
 
 <ul>
-<li>Samza is thead-safe. You can safely access your job’s state in key-value store, write messages and checkpoint offset in the task threads. If you have other data shared among tasks, such as global variables or static data, it is not thread safe if the data can be accessed concurrently by multiple threads, e.g. StreamTask running in the configured thread pool with more than one threads. For states within a task, such as member variables, Samza guarantees the mutual exclusiveness of process, window and commit so there will be no concurrent modifications among these operations and any state change from one operation will be fully visible to the others.</li>
-<li>WindowableTask.window is called when no outstanding process/processAsync and no new process/processAsync invocations can be scheduled until it completes. The Samza engine is responsible for ensuring that window is invoked in a timely manner.</li>
-<li>Checkpointing is guaranteed to only cover events that are fully processed. It is persisted in commit() method.</li>
+  <li>Samza is thread-safe. You can safely access your job’s state in the key-value store, write messages, and checkpoint offsets in the task threads. Other data shared among tasks, such as global variables or static data, is not thread-safe if it can be accessed concurrently by multiple threads, e.g. a StreamTask running in the configured thread pool with more than one thread. For state within a task, such as member variables, Samza guarantees the mutual exclusion of process, window and commit, so there are no concurrent modifications among these operations and any state change from one operation is fully visible to the others.</li>
+  <li>WindowableTask.window is called only when there are no outstanding process/processAsync invocations, and no new process/processAsync invocations can be scheduled until it completes. The Samza engine is responsible for ensuring that window is invoked in a timely manner.</li>
+  <li>Checkpointing is guaranteed to only cover events that are fully processed. It is persisted in the commit() method.</li>
 </ul>
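 
 <p>As a rough illustration of the first guarantee, static data shared across tasks needs its own synchronization. The sketch below uses only the JDK, not Samza classes. <code class="language-plaintext highlighter-rouge">SharedCounterSketch</code> and its method are hypothetical names, and the fixed thread pool merely stands in for a task thread pool with more than one thread.</p>

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterSketch {
  // Static state is visible to every task in the JVM, so it must be made thread-safe explicitly.
  private static final AtomicLong SHARED_COUNT = new AtomicLong();

  /** Simulates `threads` pool threads each handling `perThread` messages; returns the final count. */
  public static long countConcurrently(int threads, int perThread) {
    SHARED_COUNT.set(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < perThread; i++) {
          SHARED_COUNT.incrementAndGet(); // atomic update, so no increments are lost
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return SHARED_COUNT.get();
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently(4, 1000));
  }
}
```

 <p>Replacing the <code class="language-plaintext highlighter-rouge">AtomicLong</code> with a plain <code class="language-plaintext highlighter-rouge">long</code> field could lose updates under the same load. Member variables of a single task do not need this treatment, because process, window and commit never run concurrently for that task.</p>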
 
            

Modified: samza/site/learn/tutorials/latest/samza-event-hubs-standalone.html
URL: http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/samza-event-hubs-standalone.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/samza-event-hubs-standalone.html (original)
+++ samza/site/learn/tutorials/latest/samza-event-hubs-standalone.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -544,55 +550,54 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-
 <p>The <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project has an example that uses the Samza High Level Streams API to consume and produce from <a href="../../documentation/versioned/connectors/eventhubs.html">Event Hubs</a> using the Zookeeper deployment model.</p>
 
 <h4 id="get-the-code">Get the Code</h4>
 
-<p>Let&rsquo;s get started by cloning the hello-samza project</p>
+<p>Let’s get started by cloning the hello-samza project</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>git clone https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
-<span class="nb">cd</span> hello-samza
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">git clone https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
+<span class="nb">cd </span>hello-samza
 git checkout latest</code></pre></figure>
 
 <p>The project comes with numerous examples, and for this tutorial we will pick the Azure Event Hubs demo application.</p>
 
 <h4 id="setting-up-the-deployment-environment">Setting up the Deployment Environment</h4>
 
-<p>For our Azure application, we require <a href="http://zookeeper.apache.org/">ZooKeeper</a>. The hello-samza project comes with a script called &ldquo;grid&rdquo; to help with the environment setup</p>
+<p>For our Azure application, we require <a href="http://zookeeper.apache.org/">ZooKeeper</a>. The hello-samza project comes with a script called “grid” to help with the environment setup</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./bin/grid standalone</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">./bin/grid standalone</code></pre></figure>
 
-<p>This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called &ldquo;deploy&rdquo; inside hello-samza&rsquo;s root folder.</p>
+<p>This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called “deploy” inside hello-samza’s root folder.</p>
 
-<p>If you get a complaint that JAVA_HOME is not set, then you&rsquo;ll need to set it to the path where Java is installed on your system.</p>
+<p>If you get a complaint that JAVA_HOME is not set, then you’ll need to set it to the path where Java is installed on your system.</p>
 
 <h4 id="configuring-the-samza-application">Configuring the Samza Application</h4>
 
 <p>Here are the <a href="../../documentation/versioned/connectors/eventhubs.html">Event Hubs descriptors</a> you must set before building the project.
-Configure these in the <code>src/main/java/samza/examples/AzureApplication.java</code> file.</p>
+Configure these in the <code class="language-plaintext highlighter-rouge">src/main/java/samza/examples/AzureApplication.java</code> file.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="mi">1</span>  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"> <span class="mi">1</span>  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">StreamApplicationDescriptor</span> <span class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
  <span class="mi">2</span>  <span class="c1">// Define your system here</span>
- <span class="mi">3</span>  <span class="n">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">);</span>
+ <span class="mi">3</span>  <span class="nc">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">);</span>
  <span class="mi">4</span>  
  <span class="mi">5</span>  <span class="c1">// Choose your serializer/deserializer for the EventData payload</span>
- <span class="mi">6</span>  <span class="n">StringSerde</span> <span class="n">serde</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">();</span>
+ <span class="mi">6</span>  <span class="nc">StringSerde</span> <span class="n">serde</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">();</span>
  <span class="mi">7</span>  
  <span class="mi">8</span>  <span class="c1">// Define the input and output descriptors with respective descriptors</span>
- <span class="mi">9</span>  <span class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
-<span class="mi">10</span>    <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="n">INPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
-<span class="mi">11</span>        <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="n">EVENTHUBS_SAS_KEY_NAME</span><span class="o">)</span>
-<span class="mi">12</span>        <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="n">EVENTHUBS_SAS_KEY_TOKEN</span><span class="o">);</span>
+ <span class="mi">9</span>  <span class="nc">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
+<span class="mi">10</span>    <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="no">INPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
+<span class="mi">11</span>        <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="no">EVENTHUBS_SAS_KEY_NAME</span><span class="o">)</span>
+<span class="mi">12</span>        <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="no">EVENTHUBS_SAS_KEY_TOKEN</span><span class="o">);</span>
 <span class="mi">13</span>  
-<span class="mi">14</span>  <span class="n">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
-<span class="mi">15</span>    <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="n">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
-<span class="mi">16</span>        <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="n">EVENTHUBS_SAS_KEY_NAME</span><span class="o">)</span>
-<span class="mi">17</span>        <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="n">EVENTHUBS_SAS_KEY_TOKEN</span><span class="o">);</span>
+<span class="mi">14</span>  <span class="nc">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
+<span class="mi">15</span>    <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="no">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
+<span class="mi">16</span>        <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="no">EVENTHUBS_SAS_KEY_NAME</span><span class="o">)</span>
+<span class="mi">17</span>        <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="no">EVENTHUBS_SAS_KEY_TOKEN</span><span class="o">);</span>
 <span class="mi">18</span>  
 <span class="mi">19</span>  <span class="c1">// Define the input and output streams with descriptors</span>
-<span class="mi">20</span>  <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubInput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
-<span class="mi">21</span>  <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubOutput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
+<span class="mi">20</span>  <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubInput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
+<span class="mi">21</span>  <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubOutput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
 <span class="mi">22</span>  
 <span class="mi">23</span>  <span class="c1">//...</span>
 <span class="mi">24</span>  <span class="o">}</span></code></pre></figure>
@@ -600,23 +605,23 @@ Configure these in the <code>src/main/ja
 <p>In the code snippet above, we create the input and output streams that can consume and produce from the configured Event Hubs entities.</p>
 
 <ol>
-<li>Line 3: A <code>EventHubsSystemDescriptor</code> is created with the name &ldquo;eventhubs&rdquo;. You may set different system descriptors here. </li>
-<li>Line 6: Event Hubs messages are consumed as key value pairs. The <a href="../../documentation/versioned/container/serialization.html">serde</a> is defined for the value of the payload of the Event Hubs&rsquo; EventData. You may use any of the serdes that samza ships with out of the box or define your own.
+  <li>Line 3: A <code class="language-plaintext highlighter-rouge">EventHubsSystemDescriptor</code> is created with the name “eventhubs”. You may set different system descriptors here.</li>
+  <li>Line 6: Event Hubs messages are consumed as key-value pairs. The <a href="../../documentation/versioned/container/serialization.html">serde</a> is defined for the value of the payload of the Event Hubs’ EventData. You may use any of the serdes that Samza ships with out of the box or define your own.
 The serde for the key is not set since it will always be the String from the EventData <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__">partitionKey</a>.</li>
-<li>Line 9-17: An <code>EventHubsInputDescriptor</code> and an <code>EventHubsOutputDescriptor</code> are created with the required descriptors to gain access of the Event Hubs entity (<code>STREAM_ID</code>, <code>EVENTHUBS_NAMESPACE</code>, <code>EVENTHUBS_ENTITY</code>, <code>EVENTHUBS_SAS_KEY_NAME</code>, <code>EVENTHUBS_SAS_KEY_TOKEN</code>).
+  <li>Line 9-17: An <code class="language-plaintext highlighter-rouge">EventHubsInputDescriptor</code> and an <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> are created with the required descriptors to gain access to the Event Hubs entity (<code class="language-plaintext highlighter-rouge">STREAM_ID</code>, <code class="language-plaintext highlighter-rouge">EVENTHUBS_NAMESPACE</code>, <code class="language-plaintext highlighter-rouge">EVENTHUBS_ENTITY</code>, <code class="language-plaintext highlighter-rouge">EVENTHUBS_SAS_KEY_NAME</code>, <code class="language-plaintext highlighter-rouge">EVENTHUBS_SAS_KEY_TOKEN</code>).
 These must be set to the credentials of the entities you wish to connect to.</li>
-<li>Line 10-21: creates an <code>InputStream</code> and <code>OutputStream</code> with the previously defined <code>EventHubsInputDescriptor</code> and <code>EventHubsOutputDescriptor</code>, respectively.</li>
+  <li>Line 20-21: creates a <code class="language-plaintext highlighter-rouge">MessageStream</code> and an <code class="language-plaintext highlighter-rouge">OutputStream</code> with the previously defined <code class="language-plaintext highlighter-rouge">EventHubsInputDescriptor</code> and <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code>, respectively.</li>
 </ol>
 
-<p>Alternatively, you can set these properties in the <code>src/main/config/azure-application-local-runner.properties</code> file.
-Note: the keys set in the <code>.properties</code> file will override the ones set in code with descriptors.
+<p>Alternatively, you can set these properties in the <code class="language-plaintext highlighter-rouge">src/main/config/azure-application-local-runner.properties</code> file.
+Note: the keys set in the <code class="language-plaintext highlighter-rouge">.properties</code> file will override the ones set in code with descriptors.
 Refer to the <a href="../../documentation/versioned/jobs/samza-configurations.html#eventhubs">Event Hubs configuration reference</a> for the complete list of configurations.</p>
 
 <h4 id="building-the-hello-samza-project">Building the Hello Samza Project</h4>
 
-<p>With the environment setup complete, let&rsquo;s move on to building the hello-samza project. Execute the following command:</p>
+<p>With the environment setup complete, let’s move on to building the hello-samza project. Execute the following command:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./bin/deploy.sh</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">./bin/deploy.sh</code></pre></figure>
 
 <p>We are now all set to run the application locally.</p>
 
@@ -624,13 +629,13 @@ Refer to the <a href="../../documentatio
 
 <p>In order to run the application, we will use the <em>run-azure-application</em> script.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./deploy/samza/bin/run-event-hubs-zk-application.sh</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">./deploy/samza/bin/run-event-hubs-zk-application.sh</code></pre></figure>
 
-<p>The above command executes the helper script which invokes the <em>AzureZKLocalApplication</em> main class, which starts the <em>AzureApplication</em>. This application prints out the messages from the input stream to <code>stdout</code> and send them the output stream.</p>
+<p>The above command executes the helper script which invokes the <em>AzureZKLocalApplication</em> main class, which starts the <em>AzureApplication</em>. This application prints out the messages from the input stream to <code class="language-plaintext highlighter-rouge">stdout</code> and sends them to the output stream.</p>
 
 <p>The messages consumed should be printed in the following format:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>Sending: 
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">Sending: 
 Received Key: &lt;KEY&gt;
 Received Message: &lt;VALUE&gt;</code></pre></figure>
 
@@ -639,7 +644,7 @@ Received Message: &lt;VALUE&gt;</code></
 <p>This application can be shut down by terminating the <em>run-azure-application</em> script.
 We can use the <em>grid</em> script to tear down the local environment (<a href="http://kafka.apache.org/">Kafka</a> and <a href="http://zookeeper.apache.org/">Zookeeper</a>).</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./bin/grid stop all</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">./bin/grid stop all</code></pre></figure>
 
            
         </div>

Modified: samza/site/learn/tutorials/latest/samza-rest-getting-started.html
URL: http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/samza-rest-getting-started.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/samza-rest-getting-started.html (original)
+++ samza/site/learn/tutorials/latest/samza-rest-getting-started.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -551,63 +557,67 @@
 
 <h3 id="run-hello-samza-jobs-locally">Run Hello Samza Jobs Locally</h3>
 
-<p>Follow the <a href="../../../startup/hello-samza/latest/">hello-samza</a> tutorial to setup a local grid and run the wikipedia jobs. Skip the <a href="../../../startup/hello-samza/latest/#shutdown">shutdown step</a> because you need the grid to still be running to query the REST service for jobs. You can optionally skip all the <code>kafka-console-consumer.sh</code> commands if you don&rsquo;t want to verify the output of the jobs.</p>
+<p>Follow the <a href="../../../startup/hello-samza/latest/">hello-samza</a> tutorial to set up a local grid and run the wikipedia jobs. Skip the <a href="../../../startup/hello-samza/latest/#shutdown">shutdown step</a> because you need the grid to still be running to query the REST service for jobs. You can optionally skip all the <code class="language-plaintext highlighter-rouge">kafka-console-consumer.sh</code> commands if you don’t want to verify the output of the jobs.</p>
 
 <p>Take note of the path where you cloned hello-samza. You will need this to configure the installations path for the JobsResource.</p>
 
 <h4 id="build-the-samza-rest-service-package">Build the Samza REST Service package</h4>
-
 <p>The source code for Samza REST is in the samza-rest module of the Samza repository. To build it, execute the following gradle task from the root of the project.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./gradlew samza-rest:clean releaseRestServiceTar</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">./gradlew samza-rest:clean releaseRestServiceTar</code></pre></figure>
 
 <h4 id="deploy-the-samza-rest-service-locally">Deploy the Samza REST Service Locally</h4>
-
 <p>To deploy the service, you simply extract the tarball to the desired location. Here, we will deploy the tarball on the local host in</p>
-<div class="highlight"><pre><code class="language-text" data-lang="text"><span></span>SAMZA_ROOT/samza-rest/build/distributions/deploy/samza-rest
-</code></pre></div>
-<p>where <code>SAMZA_ROOT</code> is the path to the root of your Samza project.</p>
+
+<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>SAMZA_ROOT/samza-rest/build/distributions/deploy/samza-rest
+</code></pre></div></div>
+<p>where <code class="language-plaintext highlighter-rouge">SAMZA_ROOT</code> is the path to the root of your Samza project.</p>
 
 <p>Run the following commands:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="nb">cd</span> samza-rest/build/distributions/
-mkdir -p deploy/samza-rest
-tar -xvf ./samza-rest_2.11-1.7.0-SNAPSHOT.tgz -C deploy/samza-rest</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nb">cd </span>samza-rest/build/distributions/
+<span class="nb">mkdir</span> <span class="nt">-p</span> deploy/samza-rest
+<span class="nb">tar</span> <span class="nt">-xvf</span> ./samza-rest_2.11-1.9.0-SNAPSHOT.tgz <span class="nt">-C</span> deploy/samza-rest</code></pre></figure>
 
 <h4 id="configure-the-installations-path">Configure the Installations Path</h4>
-
 <p>The JobsResource has a required config <a href="../../documentation/latest/rest/resources/jobs.html#configuration">job.installations.path</a> which specifies the path where the jobs are installed. Edit the configuration file:</p>
-<div class="highlight"><pre><code class="language-text" data-lang="text"><span></span>deploy/samza-rest/config/samza-rest.properties
-</code></pre></div>
+
+<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>deploy/samza-rest/config/samza-rest.properties
+</code></pre></div></div>
+
 <p>Set the job.installations.path to:</p>
-<div class="highlight"><pre><code class="language-text" data-lang="text"><span></span>job.installations.path=/hello-samza-ROOT/deploy
-</code></pre></div>
-<p>where <code>hello-samza-ROOT</code> is the path to your hello-samza clone, noted above. This tells the JobsResource to crawl this location to find all the installed jobs.</p>
 
-<h4 id="start-the-samza-rest-service">Start the Samza REST Service</h4>
+<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>job.installations.path=/hello-samza-ROOT/deploy
+</code></pre></div></div>
+
+<p>where <code class="language-plaintext highlighter-rouge">hello-samza-ROOT</code> is the path to your hello-samza clone, noted above. This tells the JobsResource to crawl this location to find all the installed jobs.</p>
 
+<h4 id="start-the-samza-rest-service">Start the Samza REST Service</h4>
 <p>To start the service, run the run-samza-rest-service.sh script from the extracted directory.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="nb">cd</span> deploy/samza-rest
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nb">cd </span>deploy/samza-rest
 ./bin/run-samza-rest-service.sh  <span class="se">\</span>
-  --config job.config.loader.factory<span class="o">=</span>org.apache.samza.config.loaders.PropertiesConfigLoaderFactory <span class="se">\</span>
-  --config job.config.loader.properties.path<span class="o">=</span><span class="nv">$PWD</span>/config/samza-rest.properties</code></pre></figure>
+  <span class="nt">--config</span> job.config.loader.factory<span class="o">=</span>org.apache.samza.config.loaders.PropertiesConfigLoaderFactory <span class="se">\</span>
+  <span class="nt">--config</span> job.config.loader.properties.path<span class="o">=</span><span class="nv">$PWD</span>/config/samza-rest.properties</code></pre></figure>
 
 <p>You provide two parameters to the run-samza-rest-service.sh script. One is the config location, and the other, optional, parameter is a factory class that is used to read your configuration file. The SamzaRestService uses your ConfigFactory to get a Config object from the config path. The ConfigFactory is covered in more detail on the <a href="../../documentation/latest/jobs/job-runner.html">Job Runner page</a>. The run-samza-rest-service.sh script will block until the SamzaRestService terminates.</p>
 
-<p>Note: With the default settings, the JobsResource will expect a YARN cluster with a local Resource Manager accessible via the ApplicationCLI. Without YARN, the JobsResource will not respond to any requests. So it&rsquo;s important to walk through the hello-samza demo before the next step.</p>
+<p>Note: With the default settings, the JobsResource will expect a YARN cluster with a local Resource Manager accessible via the ApplicationCLI. Without YARN, the JobsResource will not respond to any requests. So it’s important to walk through the hello-samza demo before the next step.</p>
 
 <h3 id="curl-the-default-rest-service">Curl the Default REST Service</h3>
-
 <p>Curl the JobsResource to get all installed jobs:</p>
-<div class="highlight"><pre><code class="language-text" data-lang="text"><span></span>curl localhost:9139/v1/jobs
-[{&quot;jobName&quot;:&quot;wikipedia-stats&quot;,&quot;jobId&quot;:&quot;1&quot;,&quot;status&quot;:&quot;STARTED&quot;,&quot;statusDetail&quot;:&quot;RUNNING&quot;},{&quot;jobName&quot;:&quot;wikipedia-parser&quot;,&quot;jobId&quot;:&quot;1&quot;,&quot;status&quot;:&quot;STARTED&quot;,&quot;statusDetail&quot;:&quot;RUNNING&quot;},{&quot;jobName&quot;:&quot;wikipedia-feed&quot;,&quot;jobId&quot;:&quot;1&quot;,&quot;status&quot;:&quot;STARTED&quot;,&quot;statusDetail&quot;:&quot;RUNNING&quot;}]
-</code></pre></div>
+
+<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>curl localhost:9139/v1/jobs
+[{"jobName":"wikipedia-stats","jobId":"1","status":"STARTED","statusDetail":"RUNNING"},{"jobName":"wikipedia-parser","jobId":"1","status":"STARTED","statusDetail":"RUNNING"},{"jobName":"wikipedia-feed","jobId":"1","status":"STARTED","statusDetail":"RUNNING"}]
+</code></pre></div></div>
+
 <p>Now curl the JobsResource to stop one of the jobs:</p>
-<div class="highlight"><pre><code class="language-text" data-lang="text"><span></span>curl -X PUT localhost:9139/v1/jobs/wikipedia-feed/1?status=stopped
-{&quot;jobId&quot;:&quot;1&quot;,&quot;jobName&quot;:&quot;wikipedia-feed&quot;,&quot;status&quot;:&quot;STOPPED&quot;,&quot;statusDetail&quot;:&quot;FINISHED&quot;}
-</code></pre></div>
-<p>Congratulations, you&rsquo;ve successfully deployed the Samza REST Service and used the JobsResource to list jobs and stop a job!</p>
+
+<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>curl -X PUT localhost:9139/v1/jobs/wikipedia-feed/1?status=stopped
+{"jobId":"1","jobName":"wikipedia-feed","status":"STOPPED","statusDetail":"FINISHED"}
+</code></pre></div></div>
+
+<p>Congratulations, you’ve successfully deployed the Samza REST Service and used the JobsResource to list jobs and stop a job!</p>
 
 <p>See the <a href="../../documentation/latest/rest/resources/jobs.html">JobsResource documentation</a> for the rest of its API.</p>
 

Modified: samza/site/learn/tutorials/latest/samza-sql.html
URL: http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/samza-sql.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/samza-sql.html (original)
+++ samza/site/learn/tutorials/latest/samza-sql.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -548,8 +554,8 @@
 <p>There are a couple of ways to use Samza SQL:</p>
 
 <ol>
-<li>Run Samza SQL on your local machine.</li>
-<li>Run Samza SQL on YARN.</li>
+  <li>Run Samza SQL on your local machine.</li>
+  <li>Run Samza SQL on YARN.</li>
 </ol>
 
 <h1 id="running-samza-sql-on-your-local-machine">Running Samza SQL on your local machine</h1>
@@ -562,27 +568,33 @@
 
 <h2 id="create-profilechangestream-kafka-topic">Create ProfileChangeStream Kafka topic</h2>
 
-<p>The SQL statements below require a topic named ProfileChangeStream to be created on the Kafka broker. You can follow the instructions in the <a href="http://kafka.apache.org/quickstart">Kafka quick start guide</a> to create a topic named &ldquo;ProfileChangeStream&rdquo;.</p>
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <span class="m">1</span> --partitions <span class="m">1</span> --topic ProfileChangeStream
-</code></pre></div>
+<p>The SQL statements below require a topic named ProfileChangeStream to be created on the Kafka broker. You can follow the instructions in the <a href="http://kafka.apache.org/quickstart">Kafka quick start guide</a> to create a topic named “ProfileChangeStream”.</p>
+
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./deploy/kafka/bin/kafka-topics.sh <span class="nt">--create</span> <span class="nt">--zookeeper</span> localhost:2181 <span class="nt">--replication-factor</span> 1 <span class="nt">--partitions</span> 1 <span class="nt">--topic</span> ProfileChangeStream
+</code></pre></div></div>
+
 <h2 id="generate-events-into-profilechangestream-topic">Generate events into ProfileChangeStream topic</h2>
 
 <p>Use generate-kafka-events from <a href="samza-tools.html">Samza tools</a> to generate events into the ProfileChangeStream topic:</p>
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="nb">cd</span> samza-tools-&lt;version&gt;
-./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange
-</code></pre></div>
+
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nb">cd </span>samza-tools-&lt;version&gt;
+./scripts/generate-kafka-events.sh <span class="nt">-t</span> ProfileChangeStream <span class="nt">-e</span> ProfileChange
+</code></pre></div></div>
+
+<h2 id="using-samza-sql-console-to-run-samza-sql-on-your-local-machine">Using Samza SQL Console to run Samza SQL on your local machine</h2>
 
 <p>Below are some of the SQL queries that you can execute using the samza-sql-console tool from the <a href="samza-tools.html">Samza tools</a> package.</p>
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># This command just prints out all the events in the Kafka topic ProfileChangeStream into console output as a json serialized payload.</span>
-./scripts/samza-sql-console.sh --sql <span class="s2">&quot;insert into log.consoleoutput select * from kafka.ProfileChangeStream&quot;</span>
 
-<span class="c1"># This command prints out the fields that are selected into the console output as a json serialized payload.</span>
-./scripts/samza-sql-console.sh --sql <span class="s2">&quot;insert into log.consoleoutput select Name, OldCompany, NewCompany from kafka.ProfileChangeStream&quot;</span>
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="c"># This command just prints out all the events in the Kafka topic ProfileChangeStream into console output as a json serialized payload.</span>
+./scripts/samza-sql-console.sh <span class="nt">--sql</span> <span class="s2">"insert into log.consoleoutput select * from kafka.ProfileChangeStream"</span>
+
+<span class="c"># This command prints out the fields that are selected into the console output as a json serialized payload.</span>
+./scripts/samza-sql-console.sh <span class="nt">--sql</span> <span class="s2">"insert into log.consoleoutput select Name, OldCompany, NewCompany from kafka.ProfileChangeStream"</span>
+
+<span class="c"># This command showcases the RegexMatch udf and filtering capabilities.</span>
+./scripts/samza-sql-console.sh <span class="nt">--sql</span> <span class="s2">"insert into log.consoleoutput select Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany) from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"</span>
+</code></pre></div></div>
 
-<span class="c1"># This command showcases the RegexMatch udf and filtering capabilities.</span>
-./scripts/samza-sql-console.sh --sql <span class="s2">&quot;insert into log.consoleoutput select Name as __key__, Name, NewCompany, RegexMatch(&#39;.*soft&#39;, OldCompany) from kafka.ProfileChangeStream where NewCompany = &#39;LinkedIn&#39;&quot;</span>
-</code></pre></div>
 <h1 id="running-samza-sql-on-yarn">Running Samza SQL on YARN</h1>
 
 <p>The <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project is an example project designed to help you run your first Samza application. It has examples of applications using the Low Level Task API, the High Level Streams API, and Samza SQL.</p>
@@ -591,11 +603,11 @@
 
 <h2 id="get-the-hello-samza-code-and-start-the-grid">Get the hello-samza Code and Start the grid</h2>
 
-<p>Please follow the instructions from <a href="hello-samza-high-level-yarn.html">hello-samza-high-level-yarn</a> on how to build the hello-samza repository and start the yarn grid. </p>
+<p>Please follow the instructions from <a href="hello-samza-high-level-yarn.html">hello-samza-high-level-yarn</a> on how to build the hello-samza repository and start the yarn grid.</p>
 
 <h2 id="create-the-topic-and-generate-kafka-events">Create the topic and generate Kafka events</h2>
 
-<p>Please follow the steps in the sections &ldquo;Create ProfileChangeStream Kafka topic&rdquo; and &ldquo;Generate events into ProfileChangeStream topic&rdquo; above.</p>
+<p>Please follow the steps in the sections “Create ProfileChangeStream Kafka topic” and “Generate events into ProfileChangeStream topic” above.</p>
 
 <h2 id="build-a-samza-application-package">Build a Samza Application Package</h2>
 
@@ -603,33 +615,35 @@
 
 <h2 id="run-a-samza-application">Run a Samza Application</h2>
 
-<p>After you&rsquo;ve built your Samza package, you can start the app on the grid using the run-app.sh script.</p>
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./deploy/samza/bin/run-app.sh --config-path<span class="o">=</span><span class="nv">$PWD</span>/deploy/samza/config/page-view-filter-sql.properties
-</code></pre></div>
-<p>The app executes the following SQL command :
-<code>sql
-insert into kafka.NewLinkedInEmployees select Name from ProfileChangeStream where NewCompany = &#39;LinkedIn&#39;
-</code></p>
+<p>After you’ve built your Samza package, you can start the app on the grid using the run-app.sh script.</p>
+
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./deploy/samza/bin/run-app.sh <span class="nt">--config-path</span><span class="o">=</span><span class="nv">$PWD</span>/deploy/samza/config/page-view-filter-sql.properties
+</code></pre></div></div>
+
+<p>The app executes the following SQL command:</p>
+<div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">insert</span> <span class="k">into</span> <span class="n">kafka</span><span class="p">.</span><span class="n">NewLinkedInEmployees</span> <span class="k">select</span> <span class="n">Name</span> <span class="k">from</span> <span class="n">ProfileChangeStream</span> <span class="k">where</span> <span class="n">NewCompany</span> <span class="o">=</span> <span class="s1">'LinkedIn'</span>
+</code></pre></div></div>
 
 <p>This SQL performs the following:</p>
 
 <ol>
-<li>Consumes the Kafka topic ProfileChangeStream which contains the Avro serialized ProfileChangeEvent(s) </li>
-<li>Deserializes the events and keeps only the profile change events where NewCompany = &lsquo;LinkedIn&rsquo;, i.e., members who have moved to LinkedIn.</li>
-<li>Writes the Avro serialized event that contains the Id and Name of those profiles to Kafka topic NewLinkedInEmployees.</li>
+  <li>Consumes the Kafka topic ProfileChangeStream which contains the Avro serialized ProfileChangeEvent(s)</li>
+  <li>Deserializes the events and keeps only the profile change events where NewCompany = ‘LinkedIn’, i.e., members who have moved to LinkedIn.</li>
+  <li>Writes the Avro serialized event that contains the Id and Name of those profiles to Kafka topic NewLinkedInEmployees.</li>
 </ol>
 
 <p>Give the job a minute to start up, and then tail the Kafka topic:</p>
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic NewLinkedInEmployees
-</code></pre></div>
-<p>Congratulations! You&rsquo;ve now set up a local grid that includes YARN, Kafka, and ZooKeeper, and run a Samza SQL application on it.</p>
+
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./deploy/kafka/bin/kafka-console-consumer.sh  <span class="nt">--zookeeper</span> localhost:2181 <span class="nt">--topic</span> NewLinkedInEmployees
+</code></pre></div></div>
+
+<p>Congratulations! You’ve now set up a local grid that includes YARN, Kafka, and ZooKeeper, and run a Samza SQL application on it.</p>
 
 <h2 id="shutdown-and-cleanup">Shutdown and cleanup</h2>
 
-<p>To shut down the app, use the same <em>run-app.sh</em> script with an extra <em>--operation=kill</em> argument:
-<code>bash
-./deploy/samza/bin/run-app.sh --config-path=$PWD/deploy/samza/config/page-view-filter-sql.properties --operation=kill
-</code></p>
+<p>To shut down the app, use the same <em>run-app.sh</em> script with an extra <em>--operation=kill</em> argument:</p>
+<div class="language-bash highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./deploy/samza/bin/run-app.sh <span class="nt">--config-path</span><span class="o">=</span><span class="nv">$PWD</span>/deploy/samza/config/page-view-filter-sql.properties <span class="nt">--operation</span><span class="o">=</span><span class="nb">kill</span>
+</code></pre></div></div>
 
 <p>Please follow the instructions from <a href="hello-samza-high-level-yarn.html">Hello Samza High Level API - YARN Deployment</a> on how to shut down and clean up the app.</p>