Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [29/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/documen...

Modified: samza/site/learn/documentation/latest/connectors/eventhubs.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/eventhubs.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/eventhubs.html (original)
+++ samza/site/learn/documentation/latest/connectors/eventhubs.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/connectors/eventhubs">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/connectors/eventhubs">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/connectors/eventhubs">1.6.0</a></li>
 
               
@@ -639,118 +653,122 @@
    limitations under the License.
 -->
 
-<h3 id="eventhubs-i-o-quickstart">EventHubs I/O: QuickStart</h3>
+<h3 id="eventhubs-io-quickstart">EventHubs I/O: QuickStart</h3>
 
-<p>The Samza EventHubs connector provides access to <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features">Azure EventHubs</a>, Microsoft’s data streaming service on Azure. An eventhub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data">EventData</a>. </p>
+<p>The Samza EventHubs connector provides access to <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features">Azure EventHubs</a>, Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data">EventData</a>.</p>
 
 <p>The <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project includes an <a href="../../../tutorials/versioned/samza-event-hubs-standalone.html">example</a> of reading and writing to EventHubs.</p>
 
 <h3 id="concepts">Concepts</h3>
 
-<h4 id="eventhubssystemdescriptor">EventHubsSystemDescriptor</h4>
+<h4 id="eventhubssystemdescriptor">EventHubsSystemDescriptor</h4>
 
-<p>Samza refers to any IO source (eg: Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code>SystemDescriptor</code>. The <code>EventHubsSystemDescriptor</code> allows you to configure various properties for the <code>EventHubsClient</code> used by Samza.</p>
+<p>Samza refers to any IO source (e.g. Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code class="language-plaintext highlighter-rouge">SystemDescriptor</code>. The <code class="language-plaintext highlighter-rouge">EventHubsSystemDescriptor</code> allows you to configure various properties for the <code class="language-plaintext highlighter-rouge">EventHubsClient</code> used by Samza.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="mi">1</span>  <span class="n">EventHubsSystemDescriptor</span> <span class="n">eventHubsSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">).</span><span class="na">withNumClientThreads</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"> <span class="mi">1</span>  <span class="nc">EventHubsSystemDescriptor</span> <span class="n">eventHubsSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">).</span><span class="na">withNumClientThreads</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span></code></pre></figure>
 
-<h4 id="eventhubsinputdescriptor">EventHubsInputDescriptor</h4>
+<h4 id="eventhubsinputdescriptor">EventHubsInputDescriptor</h4>
 
 <p>The EventHubsInputDescriptor allows you to specify the properties of each EventHubs stream your application should read from. For each of your input streams, you should create a corresponding instance of EventHubsInputDescriptor by providing a topic-name and a serializer.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span> 
-        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="n">streamId</span><span class="o">,</span> <span class="s">&quot;eventhubs-namespace&quot;</span><span class="o">,</span> <span class="s">&quot;eventhubs-name&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">())</span>
-          <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="s">&quot;secretkey&quot;</span><span class="o">)</span>
-          <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="s">&quot;sasToken-123&quot;</span><span class="o">)</span>
-          <span class="o">.</span><span class="na">withConsumerGroup</span><span class="o">(</span><span class="s">&quot;$notdefault&quot;</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span> 
+        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="n">streamId</span><span class="o">,</span> <span class="s">"eventhubs-namespace"</span><span class="o">,</span> <span class="s">"eventhubs-name"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">())</span>
+          <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(</span><span class="s">"secretkey"</span><span class="o">)</span>
+          <span class="o">.</span><span class="na">withSasKey</span><span class="o">(</span><span class="s">"sasToken-123"</span><span class="o">)</span>
+          <span class="o">.</span><span class="na">withConsumerGroup</span><span class="o">(</span><span class="s">"$notdefault"</span><span class="o">);</span></code></pre></figure>
 
-<p>By default, messages are sent and received as byte arrays. Samza then de-serializes them to typed objects using your provided Serde. For example, the above uses a <code>StringSerde</code> to de-serialize messages.</p>
+<p>By default, messages are sent and received as byte arrays. Samza then de-serializes them to typed objects using your provided Serde. For example, the above uses a <code class="language-plaintext highlighter-rouge">StringSerde</code> to de-serialize messages.</p>
 
-<h4 id="eventhubsoutputdescriptor">EventHubsOutputDescriptor</h4>
+<h4 id="eventhubsoutputdescriptor">EventHubsOutputDescriptor</h4>
 
-<p>Similarly, the <code>EventHubsOutputDescriptor</code> allows you to specify the output streams for your application. For each output stream you write to in EventHubs, you should create an instance of <code>EventHubsOutputDescriptor</code>.</p>
+<p>Similarly, the <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> allows you to specify the output streams for your application. For each output stream you write to in EventHubs, you should create an instance of <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="n">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">();)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="no">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">())</span>
             <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withSasKey</span><span class="o">(..);</span></code></pre></figure>
 
-<h4 id="security-model">Security Model</h4>
-
-<p>Each EventHubs stream is scoped to a container called a <em>namespace</em>, which uniquely identifies an EventHubs in a region. EventHubs&rsquo;s <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview">security model</a> is based on Shared Access Signatures(SAS). 
-Hence, you should also provide your SAS keys and tokens to access the stream. You can generate your SAS tokens using the </p>
-
-<h4 id="data-model">Data Model</h4>
+<h4 id="security-model">Security Model</h4>
+
+<p>Each EventHubs stream is scoped to a container called a <em>namespace</em>, which uniquely identifies an EventHubs in a region. EventHubs’s <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview">security model</a> is based on Shared Access Signatures (SAS). 
+Hence, you should also provide your SAS keys and tokens to access the stream. You can generate your SAS keys and tokens from the Azure portal.</p>
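<p>As a hedged illustration (not connector code), the sketch below derives an SAS token from a shared access key using Azure's publicly documented <code>SharedAccessSignature</code> format: HMAC-SHA256 over the URL-encoded resource URI plus the expiry timestamp. The namespace, key name, and key shown are placeholders.</p>

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: builds an Event Hubs SAS token from a shared access key,
// following Azure's documented "SharedAccessSignature" token format.
// All inputs are hypothetical placeholders, not values from this page.
public class SasTokenSketch {
    public static String generateSasToken(String resourceUri, String keyName,
                                          String key, long expiryEpochSeconds) {
        try {
            String encodedUri = URLEncoder.encode(resourceUri, StandardCharsets.UTF_8);
            // The signed string is "<url-encoded-uri>\n<expiry>"
            String stringToSign = encodedUri + "\n" + expiryEpochSeconds;
            Mac hmac = Mac.getInstance("HmacSHA256");
            hmac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            String signature = Base64.getEncoder()
                .encodeToString(hmac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)));
            return "SharedAccessSignature sr=" + encodedUri
                + "&sig=" + URLEncoder.encode(signature, StandardCharsets.UTF_8)
                + "&se=" + expiryEpochSeconds + "&skn=" + keyName;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```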
 
-<p>Each event produced and consumed from an EventHubs stream is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data">EventData</a>, which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an <code>EventData</code> payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde. </p>
+<h4 id="data-model">Data Model</h4>
+
+<p>Each event produced and consumed from an EventHubs stream is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data">EventData</a>, which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an <code class="language-plaintext highlighter-rouge">EventData</code> payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde.</p>
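<p>To make the byte-array round trip concrete, here is a minimal self-contained sketch of what a string serde does on either side of the wire (illustrative only, not the connector's internals):</p>

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics what a StringSerde does to an EventData body.
public class PayloadRoundTrip {
    // on produce: typed object -> byte-array payload
    public static byte[] toBytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }
    // on consume: byte-array payload -> typed object
    public static String fromBytes(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```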
 
 <h3 id="configuration">Configuration</h3>
 
-<h4 id="producer-partitioning">Producer partitioning</h4>
+<h4 id="producer-partitioning">Producer partitioning</h4>
 
-<p>You can use <code>#withPartitioningMethod</code> to control how outgoing messages are partitioned. The following partitioning schemes are supported:</p>
+<p>You can use <code class="language-plaintext highlighter-rouge">#withPartitioningMethod</code> to control how outgoing messages are partitioned. The following partitioning schemes are supported:</p>
 
 <ol>
-<li><p>EVENT_HUB_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.</p></li>
-<li><p>PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than the number of partitions, a modulo operation will be performed on the key. Similar to EVENT_HUB_HASHING, the key in the message is used if the partition key is not specified.</p></li>
-<li><p>ROUND_ROBIN: In this method, outgoing messages are distributed in a round-robin across all partitions. The key and the partition key in the message are ignored.</p></li>
+  <li>
+    <p>EVENT_HUB_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.</p>
+  </li>
+  <li>
+    <p>PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than the number of partitions, a modulo operation will be performed on the key. Similar to EVENT_HUB_HASHING, the key in the message is used if the partition key is not specified.</p>
+  </li>
+  <li>
+    <p>ROUND_ROBIN: In this method, outgoing messages are distributed in a round-robin fashion across all partitions. The key and the partition key in the message are ignored.</p>
+  </li>
 </ol>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">withPartitioningMethod</span><span class="o">(</span><span class="n">PartitioningMethod</span><span class="o">.</span><span class="na">EVENT_HUB_HASHING</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">withPartitioningMethod</span><span class="o">(</span><span class="nc">PartitioningMethod</span><span class="o">.</span><span class="na">EVENT_HUB_HASHING</span><span class="o">);</span></code></pre></figure>
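<p>The wrap-around described for PARTITION_KEY_AS_PARTITION can be sketched in plain Java (a hypothetical helper, not the connector's actual implementation):</p>

```java
// Hypothetical helper illustrating the PARTITION_KEY_AS_PARTITION fallback:
// an integer partition key greater than the number of partitions is reduced
// modulo the partition count, so it always maps to a valid partition.
public class PartitionSketch {
    public static int partitionFor(int partitionKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative keys
        return Math.floorMod(partitionKey, numPartitions);
    }
}
```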
 
 <h4 id="consumer-groups">Consumer groups</h4>
 
-<p>Event Hubs supports the notion of <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups">consumer groups</a> which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job using <code>withConsumerGroup</code>.</p>
+<p>Event Hubs supports the notion of <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups">consumer groups</a> which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job using <code class="language-plaintext highlighter-rouge">withConsumerGroup</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">);</span>
-<span class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="n">INPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">withConsumerGroup</span><span class="o">(</span><span class="s">&quot;my-group&quot;</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">);</span>
+<span class="nc">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="no">INPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">withConsumerGroup</span><span class="o">(</span><span class="s">"my-group"</span><span class="o">);</span></code></pre></figure>
 
 <h4 id="consumer-buffer-size">Consumer buffer size</h4>
 
 <p>When the consumer reads messages from EventHubs, it appends them to a shared producer-consumer queue corresponding to their partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves higher throughput at the expense of increased on-heap memory.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"> <span class="nc">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">)</span>
         <span class="o">.</span><span class="na">withReceiveQueueSize</span><span class="o">(</span><span class="mi">10</span><span class="o">);</span></code></pre></figure>
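<p>The per-partition buffer behaves like a bounded producer-consumer queue; the following self-contained sketch (not the connector's code) shows how a fixed capacity caps the number of messages held on-heap:</p>

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: a bounded queue standing in for the consumer's per-partition
// buffer. offer() starts failing once the configured capacity is reached,
// which is how a smaller receive queue bounds on-heap memory usage.
public class BufferSketch {
    public static int fillToCapacity(int capacity) {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        while (queue.offer(new byte[0])) { // non-blocking; false when full
            accepted++;
        }
        return accepted;
    }
}
```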
 
 <h3 id="code-walkthrough">Code walkthrough</h3>
 
-<p>In this section, we will walk through a simple pipeline that reads from one EventHubs stream and copies each message to another output stream. </p>
+<p>In this section, we will walk through a simple pipeline that reads from one EventHubs stream and copies each message to another output stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="mi">1</span>    <span class="n">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span class="o">).</span><span class="na">withNumClientThreads</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="mi">1</span>    <span class="nc">EventHubsSystemDescriptor</span> <span class="n">systemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span class="o">(</span><span class="s">"eventhubs"</span><span class="o">).</span><span class="na">withNumClientThreads</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
 
-<span class="mi">2</span>    <span class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="n">INPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">())</span>
+<span class="mi">2</span>    <span class="nc">EventHubsInputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="no">INPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">())</span>
             <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(..)</span>
            <span class="o">.</span><span class="na">withSasKey</span><span class="o">(..);</span>
 
-<span class="mi">3</span>    <span class="n">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="n">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="n">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
+<span class="mi">3</span>    <span class="nc">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="no">OUTPUT_STREAM_ID</span><span class="o">,</span> <span class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span class="no">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span class="n">serde</span><span class="o">)</span>
            <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(..)</span>
            <span class="o">.</span><span class="na">withSasKey</span><span class="o">(..);</span>
 
-<span class="mi">4</span>    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubInput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
-<span class="mi">5</span>    <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubOutput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
+<span class="mi">4</span>    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubInput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
+<span class="mi">5</span>    <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">eventhubOutput</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
 
     <span class="c1">// Define the execution flow with the High Level Streams API</span>
 <span class="mi">6</span>    <span class="n">eventhubInput</span>
 <span class="mi">7</span>        <span class="o">.</span><span class="na">map</span><span class="o">((</span><span class="n">message</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
-<span class="mi">8</span>          <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Received Key: &quot;</span> <span class="o">+</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">());</span>
-<span class="mi">9</span>          <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Received Message: &quot;</span> <span class="o">+</span> <span class="n">message</span><span class="o">.</span><span class="na">getValue</span><span class="o">());</span>
+<span class="mi">8</span>          <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Received Key: "</span> <span class="o">+</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">());</span>
+<span class="mi">9</span>          <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Received Message: "</span> <span class="o">+</span> <span class="n">message</span><span class="o">.</span><span class="na">getValue</span><span class="o">());</span>
 <span class="mi">10</span>          <span class="k">return</span> <span class="n">message</span><span class="o">;</span>
 <span class="mi">11</span>        <span class="o">})</span>
 <span class="mi">12</span>        <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">eventhubOutput</span><span class="o">);</span></code></pre></figure>
 
-<p>-Line 1 instantiates an <code>EventHubsSystemDescriptor</code> configuring an EventHubsClient with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors. </p>
+<p>Line 1 instantiates an <code class="language-plaintext highlighter-rouge">EventHubsSystemDescriptor</code> configuring an EventHubsClient with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors.</p>
 
-<p>-Line 2 creates an <code>EventHubsInputDescriptor</code> with a String serde for its values. Recall that Samza follows a KV data-model for input messages. In the case of EventHubs, the key is a string which is set to the <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__">partitionKey</a> in the message. Hence, no separate key serde is required. </p>
+<p>Line 2 creates an <code class="language-plaintext highlighter-rouge">EventHubsInputDescriptor</code> with a String serde for its values. Recall that Samza follows a KV data-model for input messages. In the case of EventHubs, the key is a string which is set to the <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__">partitionKey</a> in the message. Hence, no separate key serde is required.</p>
 
-<p>-Line 3 creates an <code>EventHubsOutputDescriptor</code> to write to an EventHubs stream with the given credentials.</p>
+<p>-Line 3 creates an <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> to write to an EventHubs stream with the given credentials.</p>
 
-<p>-Line 4 obtains a <code>MessageStream</code> from the input descriptor that you can later chain operations on. </p>
+<p>-Line 4 obtains a <code class="language-plaintext highlighter-rouge">MessageStream</code> from the input descriptor that you can later chain operations on.</p>
 
-<p>-Line 5 creates an <code>OutputStream</code> with the previously defined <code>EventHubsOutputDescriptor</code> that you can send messages to.</p>
+<p>-Line 5 creates an <code class="language-plaintext highlighter-rouge">OutputStream</code> with the previously defined <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> that you can send messages to.</p>
 
 <p>-Lines 7-12 define a simple pipeline that copies messages from one EventHubs stream to another.</p>
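
<p>For reference, the pipeline described by lines 1-12 looks roughly like the following sketch. The namespace, entity, and SAS-key constants are placeholders; the descriptor method names follow the Samza EventHubs connector API, but verify them against the javadocs for your Samza version.</p>

<figure class="highlight"><pre><code class="language-java" data-lang="java">// Illustrative sketch only -- constant names are placeholders.
EventHubsSystemDescriptor systemDescriptor =
    new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);

EventHubsInputDescriptor&lt;KV&lt;String, String&gt;&gt; inputDescriptor =
    systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE,
            EVENTHUBS_INPUT_ENTITY, new StringSerde())
        .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
        .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);

EventHubsOutputDescriptor&lt;KV&lt;String, String&gt;&gt; outputDescriptor =
    systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE,
            EVENTHUBS_OUTPUT_ENTITY, new StringSerde())
        .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
        .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);

MessageStream&lt;KV&lt;String, String&gt;&gt; eventhubInput =
    appDescriptor.getInputStream(inputDescriptor);
OutputStream&lt;KV&lt;String, String&gt;&gt; eventhubOutput =
    appDescriptor.getOutputStream(outputDescriptor);

// Copy every message from the input entity to the output entity.
eventhubInput
    .map((message) -&gt; {
      System.out.println("Received Message: " + message.getValue());
      return message;
    })
    .sendTo(eventhubOutput);</code></pre></figure>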
 

Modified: samza/site/learn/documentation/latest/connectors/hdfs.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/hdfs.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/hdfs.html (original)
+++ samza/site/learn/documentation/latest/connectors/hdfs.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/connectors/hdfs">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/connectors/hdfs">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/connectors/hdfs">1.6.0</a></li>
 
               
@@ -645,83 +659,87 @@
 To interact with HDFS, Samza requires your job to run on the same YARN cluster.</p>
 
 <h3 id="consuming-from-hdfs">Consuming from HDFS</h3>
-
 <h4 id="input-partitioning">Input Partitioning</h4>
 
-<p>Partitioning works at the level of individual directories and files. Each directory is treated as its own stream and each of its files is treated as a <em>partition</em>. For example, Samza creates 5 partitions when it&rsquo;s reading from a directory containing 5 files. There is no way to parallelize the consumption when reading from a single file - you can only have one container to process the file.</p>
+<p>Partitioning works at the level of individual directories and files. Each directory is treated as its own stream and each of its files is treated as a <em>partition</em>. For example, Samza creates 5 partitions when it’s reading from a directory containing 5 files. There is no way to parallelize the consumption when reading from a single file - you can only have one container to process the file.</p>
 
 <h4 id="input-event-format">Input Event format</h4>
-
-<p>Samza supports avro natively, and it&rsquo;s easy to extend to other serialization formats. Each avro record read from HDFS is wrapped into a message-envelope. The <a href="../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html">envelope</a> contains these 3 fields:</p>
+<p>Samza supports avro natively, and it’s easy to extend to other serialization formats. Each avro record read from HDFS is wrapped into a message-envelope. The <a href="../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html">envelope</a> contains these 3 fields:</p>
 
 <ul>
-<li><p>The key, which is empty</p></li>
-<li><p>The value, which is set to the avro <a href="https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html">GenericRecord</a></p></li>
-<li><p>The partition, which is set to the name of the HDFS file</p></li>
+  <li>
+    <p>The key, which is empty</p>
+  </li>
+  <li>
+    <p>The value, which is set to the avro <a href="https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html">GenericRecord</a></p>
+  </li>
+  <li>
+    <p>The partition, which is set to the name of the HDFS file</p>
+  </li>
 </ul>
 
 <p>To support non-avro input formats, you can implement the <a href="https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java">SingleFileHdfsReader</a> interface.</p>
 
 <h4 id="endofstream">EndOfStream</h4>
 
-<p>While streaming sources like Kafka are unbounded, files on HDFS have finite data and have a notion of EOF. When reading from HDFS, your Samza job automatically exits after consuming all the data. You can implement <a href="../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html">EndOfStreamListenerTask</a> to get a callback once EOF has been reached. </p>
+<p>While streaming sources like Kafka are unbounded, files on HDFS have finite data and have a notion of EOF. When reading from HDFS, your Samza job automatically exits after consuming all the data. You can implement <a href="../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html">EndOfStreamListenerTask</a> to get a callback once EOF has been reached.</p>
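+
+<p>A minimal sketch of wiring this callback into a task follows. The class name and method bodies are illustrative; the callback signature is from the <code class="language-plaintext highlighter-rouge">EndOfStreamListenerTask</code> interface.</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Illustrative sketch: a task notified once all HDFS partitions hit EOF.
public class ClickstreamTask implements StreamTask, EndOfStreamListenerTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // each envelope wraps one avro GenericRecord read from HDFS
  }

  @Override
  public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
    // invoked after the files assigned to this task are fully consumed,
    // e.g. to flush results before the job exits
  }
}</code></pre></figure>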
 
 <h4 id="defining-streams">Defining streams</h4>
 
-<p>In Samza high level API, you can use <code>HdfsSystemDescriptor</code> to create a HDFS system. The stream name should be set to the name of the directory on HDFS.</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">);</span>
-<span class="n">HdfsInputDescriptor</span> <span class="n">hid</span> <span class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;/data/clickstream/2016/09/11&quot;</span><span class="o">);</span></code></pre></figure>
+<p>In Samza high level API, you can use <code class="language-plaintext highlighter-rouge">HdfsSystemDescriptor</code> to create a HDFS system. The stream name should be set to the name of the directory on HDFS.</p>
 
-<p>The above example defines a stream called <code>hdfs-clickstream</code> that reads data from the <code>/data/clickstream/2016/09/11</code> directory. </p>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">);</span>
+<span class="nc">HdfsInputDescriptor</span> <span class="n">hid</span> <span class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"/data/clickstream/2016/09/11"</span><span class="o">);</span></code></pre></figure>
 
-<h4 id="whitelists-blacklists">Whitelists &amp; Blacklists</h4>
+<p>The above example defines a stream called <code class="language-plaintext highlighter-rouge">hdfs-clickstream</code> that reads data from the <code class="language-plaintext highlighter-rouge">/data/clickstream/2016/09/11</code> directory.</p>
 
-<p>If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the <em>whitelist</em> selects the files to be filtered and the <em>blacklist</em> is later applied on its results. </p>
+<h4 id="whitelists--blacklists">Whitelists &amp; Blacklists</h4>
+<p>If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist certain files. When both are specified, the <em>whitelist</em> first selects the matching files and the <em>blacklist</em> is then applied to filter its results.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withConsumerWhiteList</span><span class="o">(</span><span class="s">&quot;.*avro&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withConsumerBlackList</span><span class="o">(</span><span class="s">&quot;somefile.avro&quot;</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withConsumerWhiteList</span><span class="o">(</span><span class="s">".*avro"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withConsumerBlackList</span><span class="o">(</span><span class="s">"somefile.avro"</span><span class="o">);</span></code></pre></figure>
 
 <h3 id="producing-to-hdfs">Producing to HDFS</h3>
 
 <h4 id="output-format">Output format</h4>
 
-<p>Samza allows writing your output results to HDFS in AVRO format. You can either use avro&rsquo;s GenericRecords or have Samza automatically infer the schema for your object using reflection. </p>
+<p>Samza allows writing your output results to HDFS in AVRO format. You can either use avro’s GenericRecords or have Samza automatically infer the schema for your object using reflection.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="n">AvroDataFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="nc">AvroDataFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure>
 
-<p>If your output is non-avro, use <code>TextSequenceFileHdfsWriter</code>.</p>
+<p>If your output is non-avro, use <code class="language-plaintext highlighter-rouge">TextSequenceFileHdfsWriter</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="n">TextSequenceFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withWriterClassName</span><span class="o">(</span><span class="nc">TextSequenceFileHdfsWriter</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span></code></pre></figure>
 
 <h4 id="output-directory-structure">Output directory structure</h4>
 
-<p>Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter. </p>
+<p>Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withOutputBaseDir</span><span class="o">(</span><span class="s">&quot;/user/me/analytics/clickstream_data&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span class="na">withDatePathFormat</span><span class="o">(</span><span class="s">&quot;yyyy_MM_dd&quot;</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withOutputBaseDir</span><span class="o">(</span><span class="s">"/user/me/analytics/clickstream_data"</span><span class="o">)</span>
+                                        <span class="o">.</span><span class="na">withDatePathFormat</span><span class="o">(</span><span class="s">"yyyy_MM_dd"</span><span class="o">);</span></code></pre></figure>
 
 <p>You can configure the maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span class="s">"hdfs-clickstream"</span><span class="o">)</span>
                                         <span class="o">.</span><span class="na">withWriteBatchSizeBytes</span><span class="o">(</span><span class="mi">134217728</span><span class="o">)</span>
                                         <span class="o">.</span><span class="na">withWriteBatchSizeRecords</span><span class="o">(</span><span class="mi">10000</span><span class="o">);</span></code></pre></figure>
 
 <h3 id="security">Security</h3>
 
-<p>You can access Kerberos-enabled HDFS clusters by providing your principal and the path to your key-tab file. Samza takes care of automatically creating and renewing your Kerberos tokens periodically. </p>
+<p>You can access Kerberos-enabled HDFS clusters by providing your principal and the path to your key-tab file. Samza takes care of automatically creating and renewing your Kerberos tokens periodically.</p>
+
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">job.security.manager.factory</span><span class="o">=</span><span class="s">org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory</span>
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
 
-<span class="c"># Kerberos principal</span>
-<span class="na">yarn.kerberos.principal</span><span class="o">=</span><span class="s">your-principal-name</span>
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab</code></pre></figure>
 
-<span class="c"># Path of the keytab file (local path)</span>
-<span class="na">yarn.kerberos.keytab</span><span class="o">=</span><span class="s">/tmp/keytab</span></code></pre></figure>
 
            
         </div>

Modified: samza/site/learn/documentation/latest/connectors/kafka.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/kafka.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/kafka.html (original)
+++ samza/site/learn/documentation/latest/connectors/kafka.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/connectors/kafka">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/connectors/kafka">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/connectors/kafka">1.6.0</a></li>
 
               
@@ -639,112 +653,125 @@
    limitations under the License.
 -->
 
-<h3 id="kafka-i-o-quickstart">Kafka I/O : QuickStart</h3>
-
+<h3 id="kafka-io--quickstart">Kafka I/O: QuickStart</h3>
 <p>Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.</p>
 
-<p>The <code>hello-samza</code> project includes multiple examples on interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run them and view results. </p>
+<p>The <code class="language-plaintext highlighter-rouge">hello-samza</code> project includes multiple examples on interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run them and view results.</p>
 
 <ul>
-<li><p><a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java">High Level Streams API Example</a> with a corresponding <a href="/learn/documentation/latest/deployment/yarn.html#starting-your-application-on-yarn">tutorial</a></p></li>
-<li><p><a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java">Low Level Task API Example</a> with a corresponding <a href="https://github.com/apache/samza-hello-samza#hello-samza">tutorial</a></p></li>
+  <li>
+    <p><a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java">High Level Streams API Example</a> with a corresponding <a href="/learn/documentation/latest/deployment/yarn.html#starting-your-application-on-yarn">tutorial</a></p>
+  </li>
+  <li>
+    <p><a href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java">Low Level Task API Example</a> with a corresponding <a href="https://github.com/apache/samza-hello-samza#hello-samza">tutorial</a></p>
+  </li>
 </ul>
 
 <h3 id="concepts">Concepts</h3>
 
-<h4 id="kafkasystemdescriptor">KafkaSystemDescriptor</h4>
+<h4 id="kafkasystemdescriptor">KafkaSystemDescriptor</h4>
 
-<p>Samza refers to any IO source (eg: Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code>SystemDescriptor</code>. The <code>KafkaSystemDescriptor</code> allows you to describe the Kafka cluster you are interacting with and specify its properties. </p>
+<p>Samza refers to any I/O source (e.g., Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code class="language-plaintext highlighter-rouge">SystemDescriptor</code>. The <code class="language-plaintext highlighter-rouge">KafkaSystemDescriptor</code> allows you to describe the Kafka cluster you are interacting with and specify its properties.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="n">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="n">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">withDefaultStreamConfigs</span><span class="o">(</span><span class="n">KAFKA_DEFAULT_STREAM_CONFIGS</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="no">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="no">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">withDefaultStreamConfigs</span><span class="o">(</span><span class="no">KAFKA_DEFAULT_STREAM_CONFIGS</span><span class="o">);</span></code></pre></figure>
 
-<h4 id="kafkainputdescriptor">KafkaInputDescriptor</h4>
+<h4 id="kafkainputdescriptor">KafkaInputDescriptor</h4>
 
-<p>A Kafka cluster usually has multiple topics (a.k.a <em>streams</em>). The <code>KafkaInputDescriptor</code> allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of <code>KafkaInputDescriptor</code>
+<p>A Kafka cluster usually has multiple topics (a.k.a <em>streams</em>). The <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor</code> allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor</code>
 by providing a topic-name and a serializer.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViewStreamDescriptor</span> <span class="o">=</span> <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;page-view-topic&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViewStreamDescriptor</span> <span class="o">=</span> <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-view-topic"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
 
-<p>The above example describes an input Kafka stream from the &ldquo;page-view-topic&rdquo; which Samza de-serializes into a JSON payload. Samza provides default serializers for common data-types like string, avro, bytes, integer etc.</p>
+<p>The above example describes an input Kafka stream from the “page-view-topic” which Samza deserializes into a JSON payload. Samza provides default serializers for common data types like string, avro, bytes, integer, etc.</p>
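+
+<p>As an illustration, a hypothetical topic carrying plain string payloads could use the built-in string serde instead (the topic name here is a placeholder):</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Illustrative sketch: consume a topic of plain strings with the
// built-in StringSerde rather than a JSON serde.
KafkaInputDescriptor&lt;String&gt; rawStreamDescriptor =
    kafkaSystemDescriptor.getInputDescriptor("raw-events-topic", new StringSerde());</code></pre></figure>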
 
-<h4 id="kafkaoutputdescriptor">KafkaOutputDescriptor</h4>
+<h4 id="kafkaoutputdescriptor">KafkaOutputDescriptor</h4>
 
-<p>Similarly, the <code>KafkaOutputDescriptor</code> allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of <code>KafkaOutputDescriptor</code>.</p>
+<p>Similarly, the <code class="language-plaintext highlighter-rouge">KafkaOutputDescriptor</code> allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of <code class="language-plaintext highlighter-rouge">KafkaOutputDescriptor</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="n">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">decoratedPageView</span> <span class="o">=</span> <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;my-output-topic&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">decoratedPageView</span> <span class="o">=</span> <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"my-output-topic"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
 
 <h3 id="configuration">Configuration</h3>
 
-<h5 id="configuring-kafka-producer-and-consumer">Configuring Kafka producer and consumer</h5>
+<h5 id="configuring-kafka-producer-and-consumer">Configuring Kafka producer and consumer</h5>
 
-<p>The <code>KafkaSystemDescriptor</code> allows you to specify any <a href="https://kafka.apache.org/documentation/#producerconfigs">Kafka producer</a> or <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka consumer</a>) property which are directly passed over to the underlying Kafka client. This allows for 
-precise control over the KafkaProducer and KafkaConsumer used by Samza. </p>
+<p>The <code class="language-plaintext highlighter-rouge">KafkaSystemDescriptor</code> allows you to specify any <a href="https://kafka.apache.org/documentation/#producerconfigs">Kafka producer</a> or <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka consumer</a> property, which is passed directly to the underlying Kafka client. This allows for
+precise control over the KafkaProducer and KafkaConsumer used by Samza.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(..)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withConsumerConfigs</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withProducerConfigs</span><span class="o">(..)</span></code></pre></figure>
 
-<h4 id="accessing-an-offset-which-is-out-of-range">Accessing an offset which is out-of-range</h4>
-
-<p>This setting determines the behavior if a consumer attempts to read an offset that is outside of the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers. </p>
+<h4 id="accessing-an-offset-which-is-out-of-range">Accessing an offset which is out-of-range</h4>
+
+<p>This setting determines the behavior if a consumer attempts to read an offset that is outside of the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(..)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">).</span><span class="na">withConsumerZkConnect</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(..)</span>
-            <span class="o">.</span><span class="na">withConsumerAutoOffsetReset</span><span class="o">(</span><span class="s">&quot;largest&quot;</span><span class="o">)</span></code></pre></figure>
-
-<h5 id="ignoring-checkpointed-offsets">Ignoring checkpointed offsets</h5>
+            <span class="o">.</span><span class="na">withConsumerAutoOffsetReset</span><span class="o">(</span><span class="s">"largest"</span><span class="o">)</span></code></pre></figure>
 
-<p>Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can over-ride this behavior and configure Samza to ignore checkpoints with <code>KafkaInputDescriptor#shouldResetOffset()</code>.
-Once there are no checkpoints for a stream, the <code>#withOffsetDefault(..)</code> determines whether we start consumption from the oldest or newest offset. </p>
+<h5 id="ignoring-checkpointed-offsets">Ignoring checkpointed offsets</h5>
+
+<p>Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor#shouldResetOffset()</code>.
+Once there are no checkpoints for a stream, <code class="language-plaintext highlighter-rouge">#withOffsetDefault(..)</code> determines whether consumption starts from the oldest or newest offset.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViewStreamDescriptor</span> <span class="o">=</span> 
-    <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;page-view-topic&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">))</span> 
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViewStreamDescriptor</span> <span class="o">=</span> 
+    <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-view-topic"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">))</span> 
         <span class="o">.</span><span class="na">shouldResetOffset</span><span class="o">()</span>
-        <span class="o">.</span><span class="na">withOffsetDefault</span><span class="o">(</span><span class="n">OffsetType</span><span class="o">.</span><span class="na">OLDEST</span><span class="o">);</span></code></pre></figure>
+        <span class="o">.</span><span class="na">withOffsetDefault</span><span class="o">(</span><span class="nc">OffsetType</span><span class="o">.</span><span class="na">OLDEST</span><span class="o">);</span></code></pre></figure>
 
-<p>The above example configures Samza to ignore checkpointed offsets for <code>page-view-topic</code> and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using <code>KafkaSystemDescriptor#withDefaultStreamOffsetDefault</code>.</p>
+<p>The above example configures Samza to ignore checkpointed offsets for <code class="language-plaintext highlighter-rouge">page-view-topic</code> and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using <code class="language-plaintext highlighter-rouge">KafkaSystemDescriptor#withDefaultStreamOffsetDefault</code>.</p>
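+
+<p>For example, a sketch (hedged: assuming the same descriptor methods shown earlier and placeholder <code class="language-plaintext highlighter-rouge">(..)</code> arguments) that defaults every topic on this cluster to the oldest available offset when no checkpoint exists might look like:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    // Hypothetical sketch: apply an OLDEST offset default to all streams on this system
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withDefaultStreamOffsetDefault(OffsetType.OLDEST);</code></pre></figure>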
 
 <h3 id="code-walkthrough-high-level-streams-api">Code walkthrough: High Level Streams API</h3>
 
 <p>In this section, we walk through a complete example that reads from a Kafka topic, filters a few messages and writes them to another topic.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Define coordinates of the Kafka cluster using the KafkaSystemDescriptor</span>
-<span class="mi">1</span>    <span class="n">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">)</span>
-<span class="mi">2</span>        <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="n">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
-<span class="mi">3</span>        <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="n">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Define coordinates of the Kafka cluster using the KafkaSystemDescriptor</span>
+<span class="mi">1</span>    <span class="nc">KafkaSystemDescriptor</span> <span class="n">kafkaSystemDescriptor</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
+<span class="mi">2</span>        <span class="o">.</span><span class="na">withConsumerZkConnect</span><span class="o">(</span><span class="no">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
+<span class="mi">3</span>        <span class="o">.</span><span class="na">withProducerBootstrapServers</span><span class="o">(</span><span class="no">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
 
 <span class="c1">// Create a KafkaInputDescriptor for your input topic and a KafkaOutputDescriptor for the output topic </span>
-<span class="mi">4</span>    <span class="n">KVSerde</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;</span> <span class="n">serde</span> <span class="o">=</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
-<span class="mi">5</span>    <span class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
-<span class="mi">6</span>        <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">&quot;page-views&quot;</span><span class="o">,</span> <span class="n">serde</span><span class="o">);</span>
-<span class="mi">7</span>    <span class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
-<span class="mi">8</span>        <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;filtered-page-views&quot;</span><span class="o">,</span> <span class="n">serde</span><span class="o">);</span>
+<span class="mi">4</span>    <span class="nc">KVSerde</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">serde</span> <span class="o">=</span> <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+<span class="mi">5</span>    <span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">inputDescriptor</span> <span class="o">=</span>
+<span class="mi">6</span>        <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="n">serde</span><span class="o">);</span>
+<span class="mi">7</span>    <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">outputDescriptor</span> <span class="o">=</span>
+<span class="mi">8</span>        <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"filtered-page-views"</span><span class="o">,</span> <span class="n">serde</span><span class="o">);</span>
 
 
 <span class="c1">// Obtain a message stream from the input topic</span>
-<span class="mi">9</span>    <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
+<span class="mi">9</span>    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">inputDescriptor</span><span class="o">);</span>
 
 <span class="c1">// Obtain an output stream for the topic    </span>
-<span class="mi">10</span>    <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;&gt;</span> <span class="n">filteredPageViews</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
+<span class="mi">10</span>    <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">filteredPageViews</span> <span class="o">=</span> <span class="n">appDescriptor</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputDescriptor</span><span class="o">);</span>
 
 <span class="c1">// write results to the output topic</span>
 <span class="mi">11</span>    <span class="n">pageViews</span>
-<span class="mi">12</span>       <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">kv</span> <span class="o">-&gt;</span> <span class="o">!</span><span class="n">INVALID_USER_ID</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">kv</span><span class="o">.</span><span class="na">value</span><span class="o">.</span><span class="na">userId</span><span class="o">))</span>
+<span class="mi">12</span>       <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">kv</span> <span class="o">-&gt;</span> <span class="o">!</span><span class="no">INVALID_USER_ID</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">kv</span><span class="o">.</span><span class="na">value</span><span class="o">.</span><span class="na">userId</span><span class="o">))</span>
 <span class="mi">13</span>       <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">filteredPageViews</span><span class="o">);</span></code></pre></figure>
 
 <ul>
-<li><p>Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster</p></li>
-<li><p>Lines 4-6 defines a KafkaInputDescriptor for our input topic - <code>page-views</code></p></li>
-<li><p>Lines 7-9 defines a KafkaOutputDescriptor for our output topic - <code>filtered-page-views</code></p></li>
-<li><p>Line 9 creates a MessageStream for the input topic so that you can chain operations on it later</p></li>
-<li><p>Line 10 creates an OuputStream for the output topic</p></li>
-<li><p>Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream</p></li>
+  <li>
+    <p>Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster</p>
+  </li>
+  <li>
+    <p>Lines 4-6 define a KafkaInputDescriptor for our input topic - <code class="language-plaintext highlighter-rouge">page-views</code></p>
+  </li>
+  <li>
+    <p>Lines 7-8 define a KafkaOutputDescriptor for our output topic - <code class="language-plaintext highlighter-rouge">filtered-page-views</code></p>
+  </li>
+  <li>
+    <p>Line 9 creates a MessageStream for the input topic so that you can chain operations on it later</p>
+  </li>
+  <li>
+    <p>Line 10 creates an OutputStream for the output topic</p>
+  </li>
+  <li>
+    <p>Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream</p>
+  </li>
 </ul>