Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [43/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/documen...

Modified: samza/site/learn/tutorials/latest/hello-samza-high-level-code.html
URL: http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/hello-samza-high-level-code.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/hello-samza-high-level-code.html (original)
+++ samza/site/learn/tutorials/latest/hello-samza-high-level-code.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -545,57 +551,54 @@
    limitations under the License.
 -->
 
-<p>This tutorial introduces the high level API by showing you how to build wikipedia application from the <a href="hello-samza-high-level-yarn.html">hello-samza high level API Yarn tutorial</a>. Upon completion of this tutorial, you&rsquo;ll know how to implement and configure a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>. Along the way, you&rsquo;ll see how to use some of the basic operators as well as how to leverage key-value stores and metrics in an app.</p>
+<p>This tutorial introduces the high level API by showing you how to build the Wikipedia application from the <a href="hello-samza-high-level-yarn.html">hello-samza high level API Yarn tutorial</a>. Upon completion of this tutorial, you’ll know how to implement and configure a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>. Along the way, you’ll see how to use some of the basic operators as well as how to leverage key-value stores and metrics in an app.</p>
 
-<p>The same <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project is used for this tutorial as for many of the others. You will clone that project and by the end of the tutorial, you will have implemented a duplicate of the <code>WikipediaApplication</code>.</p>
+<p>The same <a href="https://github.com/apache/samza-hello-samza">hello-samza</a> project is used for this tutorial as for many of the others. You will clone that project and by the end of the tutorial, you will have implemented a duplicate of the <code class="language-plaintext highlighter-rouge">WikipediaApplication</code>.</p>
 
-<p>Let&rsquo;s get started.</p>
+<p>Let’s get started.</p>
 
 <h3 id="get-the-code">Get the Code</h3>
 
 <p>Check out the hello-samza project:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>git clone https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
-<span class="nb">cd</span> hello-samza
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">git clone https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
+<span class="nb">cd </span>hello-samza
 git checkout latest</code></pre></figure>
 
-<p>This project already contains implementations of the wikipedia application using both the Low Level Task API and the High Level Streams API. The Low Level Task API implementations are in the <code>samza.examples.wikipedia.task</code> package. The High Level Streams API implementation is in the <code>samza.examples.wikipedia.application</code> package.</p>
+<p>This project already contains implementations of the wikipedia application using both the Low Level Task API and the High Level Streams API. The Low Level Task API implementations are in the <code class="language-plaintext highlighter-rouge">samza.examples.wikipedia.task</code> package. The High Level Streams API implementation is in the <code class="language-plaintext highlighter-rouge">samza.examples.wikipedia.application</code> package.</p>
 
 <p>This tutorial will provide step by step instructions to recreate the existing wikipedia application.</p>
 
 <h3 id="introduction-to-wikipedia-consumer">Introduction to Wikipedia Consumer</h3>
+<p>In order to consume events from Wikipedia, the hello-samza project includes a <code class="language-plaintext highlighter-rouge">WikipediaSystemFactory</code> implementation of the Samza <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemFactory.html">SystemFactory</a> that provides a <code class="language-plaintext highlighter-rouge">WikipediaConsumer</code>.</p>
 
-<p>In order to consume events from Wikipedia, the hello-samza project includes a <code>WikipediaSystemFactory</code> implementation of the Samza <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemFactory.html">SystemFactory</a> that provides a <code>WikipediaConsumer</code>.</p>
-
-<p>The WikipediaConsumer is an implementation of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a> that can consume events from Wikipedia. It is also a listener for events from the <code>WikipediaFeed</code>. It&rsquo;s important to note that the events received in <code>onEvent</code> are of the type <code>WikipediaFeedEvent</code>, so we will expect that type for messages on our input streams. For other systems, the messages may come in the form of <code>byte[]</code>. In that case you may want to configure a samza <a href="/learn/documentation/latest/container/serialization.html">serde</a> and the application should expect the output type of that serde.</p>
+<p>The WikipediaConsumer is an implementation of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a> that can consume events from Wikipedia. It is also a listener for events from the <code class="language-plaintext highlighter-rouge">WikipediaFeed</code>. It’s important to note that the events received in <code class="language-plaintext highlighter-rouge">onEvent</code> are of the type <code class="language-plaintext highlighter-rouge">WikipediaFeedEvent</code>, so we will expect that type for messages on our input streams. For other systems, the messages may come in the form of <code class="language-plaintext highlighter-rouge">byte[]</code>. In that case you may want to configure a samza <a href="/learn/documentation/latest/container/serialization.html">serde</a> and the application should expect the output type of that serde.</p>
 
-<p>Now that we understand the Wikipedia system and the types of inputs we&rsquo;ll be processing, we can proceed with creating our application.</p>
+<p>Now that we understand the Wikipedia system and the types of inputs we’ll be processing, we can proceed with creating our application.</p>
 
 <h3 id="create-the-initial-config">Create the Initial Config</h3>
-
 <p>In the hello-samza project, configs are kept in the <em>src/main/config/</em> path. This is where we will add the config for our application.
 Create a new file named <em>my-wikipedia-application.properties</em> in this location.</p>
 
 <h4 id="core-configuration">Core Configuration</h4>
+<p>Let’s start by adding some of the core properties to the file:</p>
 
-<p>Let&rsquo;s start by adding some of the core properties to the file:</p>
-
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
-<span class="c1"># or more contributor license agreements.  See the NOTICE file</span>
-<span class="c1"># distributed with this work for additional information</span>
-<span class="c1"># regarding copyright ownership.  The ASF licenses this file</span>
-<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
-<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
-<span class="c1"># with the License.  You may obtain a copy of the License at</span>
-<span class="c1">#</span>
-<span class="c1">#   http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
-<span class="c1"># software distributed under the License is distributed on an</span>
-<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
-<span class="c1"># KIND, either express or implied.  See the License for the</span>
-<span class="c1"># specific language governing permissions and limitations</span>
-<span class="c1"># under the License.</span>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="c"># Licensed to the Apache Software Foundation (ASF) under one</span>
+<span class="c"># or more contributor license agreements.  See the NOTICE file</span>
+<span class="c"># distributed with this work for additional information</span>
+<span class="c"># regarding copyright ownership.  The ASF licenses this file</span>
+<span class="c"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c"># "License"); you may not use this file except in compliance</span>
+<span class="c"># with the License.  You may obtain a copy of the License at</span>
+<span class="c">#</span>
+<span class="c">#   http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c">#</span>
+<span class="c"># Unless required by applicable law or agreed to in writing,</span>
+<span class="c"># software distributed under the License is distributed on an</span>
+<span class="c"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c"># KIND, either express or implied.  See the License for the</span>
+<span class="c"># specific language governing permissions and limitations</span>
+<span class="c"># under the License.</span>
 
 app.class<span class="o">=</span>samza.examples.wikipedia.application.MyWikipediaApplication
 app.runner.class<span class="o">=</span>org.apache.samza.runtime.RemoteApplicationRunner
@@ -604,37 +607,36 @@ job.factory.class<span class="o">=</span
 job.name<span class="o">=</span>my-wikipedia-application
 job.default.system<span class="o">=</span>kafka
 
-yarn.package.path<span class="o">=</span>file://<span class="si">${</span><span class="nv">basedir</span><span class="si">}</span>/target/<span class="si">${</span><span class="nv">project</span><span class="p">.artifactId</span><span class="si">}</span>-<span class="si">${</span><span class="nv">pom</span><span class="p">.version</span><span class="si">}</span>-dist.tar.gz</code></pre></figure>
+yarn.package.path<span class="o">=</span>file://<span class="k">${</span><span class="nv">basedir</span><span class="k">}</span>/target/<span class="k">${</span><span class="nv">project</span><span class="p">.artifactId</span><span class="k">}</span>-<span class="k">${</span><span class="nv">pom</span><span class="p">.version</span><span class="k">}</span><span class="nt">-dist</span>.tar.gz</code></pre></figure>
 
-<p>Be sure to include the Apache header. The project will not compile without it. </p>
+<p>Be sure to include the Apache header. The project will not compile without it.</p>
 
-<p>Here&rsquo;s a brief summary of what we configured so far.</p>
+<p>Here’s a brief summary of what we configured so far.</p>
 
 <ul>
-<li><strong>app.class</strong>: the class that defines the application logic. We will create this class later.</li>
-<li><strong>app.runner.class</strong>: the runner implementation which will launch our application. Since we are using YARN, we use <code>RemoteApplicationRunner</code> which is required for any cluster-based deployment.</li>
-<li><strong>job.factory.class</strong>: the <a href="/learn/documentation/latest/jobs/job-runner.html">factory</a> that will create the runtime instances of our jobs. Since we are using YARN, we want each job to be created as a <a href="/learn/documentation/latest/jobs/yarn-jobs.html">YARN job</a>, so we use <code>YarnJobFactory</code></li>
-<li><strong>job.name</strong>: the primary identifier for the job.</li>
-<li><strong>job.default.system</strong>: the default system to use for input, output, and internal metadata streams. This can be overridden on a per-stream basis. The <em>kafka</em> system will be defined in the next section.</li>
-<li><strong>yarn.package.path</strong>: tells YARN where to find the <a href="/learn/documentation/latest/jobs/packaging.html">job package</a> so the Node Managers can download it.</li>
+  <li><strong>app.class</strong>: the class that defines the application logic. We will create this class later.</li>
+  <li><strong>app.runner.class</strong>: the runner implementation which will launch our application. Since we are using YARN, we use <code class="language-plaintext highlighter-rouge">RemoteApplicationRunner</code> which is required for any cluster-based deployment.</li>
+  <li><strong>job.factory.class</strong>: the <a href="/learn/documentation/latest/jobs/job-runner.html">factory</a> that will create the runtime instances of our jobs. Since we are using YARN, we want each job to be created as a <a href="/learn/documentation/latest/jobs/yarn-jobs.html">YARN job</a>, so we use <code class="language-plaintext highlighter-rouge">YarnJobFactory</code>.</li>
+  <li><strong>job.name</strong>: the primary identifier for the job.</li>
+  <li><strong>job.default.system</strong>: the default system to use for input, output, and internal metadata streams. This can be overridden on a per-stream basis. The <em>kafka</em> system will be defined in the next section.</li>
+  <li><strong>yarn.package.path</strong>: tells YARN where to find the <a href="/learn/documentation/latest/jobs/packaging.html">job package</a> so the Node Managers can download it.</li>
 </ul>
 
 <p>These basic configurations are enough to launch the application on YARN but we haven’t defined any streaming systems for Samza to use, so the application would not process anything.</p>
 
-<p>Next, let&rsquo;s define the streaming systems with which the application will interact. </p>
+<p>Next, let’s define the streaming systems with which the application will interact.</p>
 
 <h4 id="define-systems">Define Systems</h4>
-
 <p>This Wikipedia application will consume events from Wikipedia and produce stats to a Kafka topic. We need to define those systems in config before Samza can use them. Add the following lines to the config:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>systems.wikipedia.samza.factory<span class="o">=</span>samza.examples.wikipedia.system.WikipediaSystemFactory
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">systems.wikipedia.samza.factory<span class="o">=</span>samza.examples.wikipedia.system.WikipediaSystemFactory
 systems.wikipedia.host<span class="o">=</span>irc.wikimedia.org
-systems.wikipedia.port<span class="o">=</span><span class="m">6667</span>
+systems.wikipedia.port<span class="o">=</span>6667
 
 systems.kafka.samza.factory<span class="o">=</span>org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.consumer.zookeeper.connect<span class="o">=</span>localhost:2181/
 systems.kafka.producer.bootstrap.servers<span class="o">=</span>localhost:9092
-systems.kafka.default.stream.replication.factor<span class="o">=</span><span class="m">1</span></code></pre></figure>
+systems.kafka.default.stream.replication.factor<span class="o">=</span>1</code></pre></figure>
 
 <p>The above configuration defines 2 systems: one called <em>wikipedia</em> and one called <em>kafka</em>.</p>
 
@@ -643,31 +645,30 @@ systems.kafka.default.stream.replication
 <p>For the <em>kafka</em> system, we set the default replication factor to 1 for all streams because this application is intended for a demo deployment which utilizes a Kafka cluster with only 1 broker, so a replication factor larger than 1 is invalid.</p>
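
The way system names are embedded in the config keys can be illustrated with a small standalone sketch. It uses only the JDK (java.util.Properties), not Samza's own config classes, and the class and method names (SystemsConfigSketch, parse, systemNames) are hypothetical helpers invented for this illustration. It assumes each system is identified by the segment that follows the "systems." prefix, as in the lines above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Samza: lists the system names declared in a config.
final class SystemsConfigSketch {
  private static final String PREFIX = "systems.";

  // Parses properties-formatted text; wraps the checked exception for convenience.
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Collects the segment between "systems." and the next '.' for every matching key.
  static Set<String> systemNames(Properties config) {
    Set<String> names = new TreeSet<>();
    for (String key : config.stringPropertyNames()) {
      if (key.startsWith(PREFIX)) {
        int end = key.indexOf('.', PREFIX.length());
        if (end > PREFIX.length()) {
          names.add(key.substring(PREFIX.length(), end));
        }
      }
    }
    return names;
  }

  public static void main(String[] args) {
    String config = String.join("\n",
        "systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory",
        "systems.wikipedia.host=irc.wikimedia.org",
        "systems.wikipedia.port=6667",
        "systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory",
        "systems.kafka.default.stream.replication.factor=1");
    System.out.println(systemNames(parse(config)));
  }
}
```

Running a check like this against my-wikipedia-application.properties is one way to confirm that both systems are declared before launching the job.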
 
 <h4 id="configure-streams">Configure Streams</h4>
+<p>Samza identifies streams using a unique stream ID. In most cases, the stream ID is the same as the actual stream name. However, if a stream has a name that doesn’t match the pattern <code class="language-plaintext highlighter-rouge">[A-Za-z0-9_-]+</code>, we need to configure a separate <em>physical.name</em> to associate the actual stream name with a legal stream ID. The Wikipedia channels we will consume have a ‘#’ character in the names. So for each of them we must pick a legal stream ID and then configure the physical name to match the channel.</p>
 
-<p>Samza identifies streams using a unique stream ID. In most cases, the stream ID is the same as the actual stream name. However, if a stream has a name that doesn&rsquo;t match the pattern <code>[A-Za-z0-9_-]+</code>, we need to configure a separate <em>physical.name</em> to associate the actual stream name with a legal stream ID. The Wikipedia channels we will consume have a &lsquo;#&rsquo; character in the names. So for each of them we must pick a legal stream ID and then configure the physical name to match the channel.</p>
-
-<p>Samza uses the <em>job.default.system</em> for any streams that do not explicitly specify a system. In the previous sections, we defined 2 systems, <em>wikipedia</em> and <em>kafka</em>, and we configured <em>kafka</em> as the default. To understand why, let&rsquo;s look at the streams and how Samza will use them.</p>
+<p>Samza uses the <em>job.default.system</em> for any streams that do not explicitly specify a system. In the previous sections, we defined 2 systems, <em>wikipedia</em> and <em>kafka</em>, and we configured <em>kafka</em> as the default. To understand why, let’s look at the streams and how Samza will use them.</p>
 
 <p>For this app, Samza will:</p>
 
 <ol>
-<li>Consume from input streams</li>
-<li>Produce to an output stream and a metrics stream</li>
-<li>Both produce and consume from job-coordination, checkpoint, and changelog streams</li>
+  <li>Consume from input streams</li>
+  <li>Produce to an output stream and a metrics stream</li>
+  <li>Both produce and consume from job-coordination, checkpoint, and changelog streams</li>
 </ol>
 
-<p>While the <em>wikipedia</em> system is necessary for case 1, it does not support producers (we can&rsquo;t write Samza output to Wikipedia), which are needed for cases 2-3. So it is more convenient to use <em>kafka</em> as the default system. We can then explicitly configure the input streams to use the <em>wikipedia</em> system.</p>
+<p>While the <em>wikipedia</em> system is necessary for case 1, it does not support producers (we can’t write Samza output to Wikipedia), which are needed for cases 2-3. So it is more convenient to use <em>kafka</em> as the default system. We can then explicitly configure the input streams to use the <em>wikipedia</em> system.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>streams.en-wikipedia.samza.system<span class="o">=</span>wikipedia
-streams.en-wikipedia.samza.physical.name<span class="o">=</span><span class="c1">#en.wikipedia</span>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">streams.en-wikipedia.samza.system<span class="o">=</span>wikipedia
+streams.en-wikipedia.samza.physical.name<span class="o">=</span><span class="c">#en.wikipedia</span>
 
 streams.en-wiktionary.samza.system<span class="o">=</span>wikipedia
-streams.en-wiktionary.samza.physical.name<span class="o">=</span><span class="c1">#en.wiktionary</span>
+streams.en-wiktionary.samza.physical.name<span class="o">=</span><span class="c">#en.wiktionary</span>
 
 streams.en-wikinews.samza.system<span class="o">=</span>wikipedia
-streams.en-wikinews.samza.physical.name<span class="o">=</span><span class="c1">#en.wikinews</span></code></pre></figure>
+streams.en-wikinews.samza.physical.name<span class="o">=</span><span class="c">#en.wikinews</span></code></pre></figure>
 
-<p>The above configurations declare 3 streams with IDs, <em>en-wikipedia</em>, <em>en-wiktionary</em>, and <em>en-wikinews</em>. It associates each stream with the <em>wikipedia</em> system we defined earlier and set the physical name to the corresponding Wikipedia channel. </p>
+<p>The above configurations declare 3 streams with the IDs <em>en-wikipedia</em>, <em>en-wiktionary</em>, and <em>en-wikinews</em>. They associate each stream with the <em>wikipedia</em> system we defined earlier and set the physical name to the corresponding Wikipedia channel.</p>
 
 <p>Since all the Kafka streams for cases 2-3 are on the default system and do not include special characters in their names, we do not need to configure them explicitly.</p>
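
The stream ID rule described in this section can be sketched in plain Java. The pattern is the one quoted in the text; everything else is an assumption made for illustration: the class StreamIdSketch and its methods are hypothetical, and the naming convention (drop a leading '#', replace other illegal characters with '-') is just one choice that happens to reproduce the IDs used in this tutorial. Samza itself does not derive IDs this way; you pick them yourself in config.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Samza: checks and derives legal stream IDs.
final class StreamIdSketch {
  // Pattern for legal stream IDs, as quoted in the tutorial text.
  private static final Pattern LEGAL_STREAM_ID = Pattern.compile("[A-Za-z0-9_-]+");

  // True when the whole name consists only of legal stream ID characters.
  static boolean isLegalStreamId(String name) {
    return LEGAL_STREAM_ID.matcher(name).matches();
  }

  // Assumed convention: strip a leading '#', then replace every remaining illegal character with '-'.
  static String toStreamId(String physicalName) {
    String trimmed = physicalName.startsWith("#") ? physicalName.substring(1) : physicalName;
    return trimmed.replaceAll("[^A-Za-z0-9_-]", "-");
  }

  public static void main(String[] args) {
    // Prints the per-stream config lines for the three channels used in this tutorial.
    for (String channel : new String[] {"#en.wikipedia", "#en.wiktionary", "#en.wikinews"}) {
      String id = toStreamId(channel);
      System.out.println("streams." + id + ".samza.system=wikipedia");
      System.out.println("streams." + id + ".samza.physical.name=" + channel);
    }
  }
}
```

The channel names fail the check because of both the '#' and the '.', which is why each one needs a separate physical.name entry.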
 
@@ -676,38 +677,37 @@ streams.en-wikinews.samza.physical.name<
 <p>With the core configuration settled, we turn our attention to code.</p>
 
 <h3 id="define-application-logic">Define Application Logic</h3>
+<p>Let’s create the application class we configured above. The next 8 sections walk you through writing the code for the Wikipedia application.</p>
 
-<p>Let&rsquo;s create the application class we configured above. The next 8 sections walk you through writing the code for the Wikipedia application.</p>
+<p>Create a new class named <code class="language-plaintext highlighter-rouge">MyWikipediaApplication</code> in the <code class="language-plaintext highlighter-rouge">samza.examples.wikipedia.application</code> package. The class must implement <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> and should look like this:</p>
 
-<p>Create a new class named <code>MyWikipediaApplication</code> in the <code>samza.examples.wikipedia.application</code> package. The class must implement <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> and should look like this:</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="cm">/*</span>
-<span class="cm"> * Licensed to the Apache Software Foundation (ASF) under one</span>
-<span class="cm"> * or more contributor license agreements.  See the NOTICE file</span>
-<span class="cm"> * distributed with this work for additional information</span>
-<span class="cm"> * regarding copyright ownership.  The ASF licenses this file</span>
-<span class="cm"> * to you under the Apache License, Version 2.0 (the</span>
-<span class="cm"> * &quot;License&quot;); you may not use this file except in compliance</span>
-<span class="cm"> * with the License.  You may obtain a copy of the License at</span>
-<span class="cm"> *</span>
-<span class="cm"> *   http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="cm"> *</span>
-<span class="cm"> * Unless required by applicable law or agreed to in writing,</span>
-<span class="cm"> * software distributed under the License is distributed on an</span>
-<span class="cm"> * &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
-<span class="cm"> * KIND, either express or implied.  See the License for the</span>
-<span class="cm"> * specific language governing permissions and limitations</span>
-<span class="cm"> * under the License.</span>
-<span class="cm"> */</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */</span>
 <span class="kn">package</span> <span class="nn">samza.examples.wikipedia.application</span><span class="o">;</span>
 
 <span class="kn">import</span> <span class="nn">org.apache.samza.application.StreamApplication</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.samza.config.Config</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.samza.operators.StreamGraph</span><span class="o">;</span>
 
-<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyWikipediaApplication</span> <span class="kd">implements</span> <span class="n">StreamApplication</span><span class="o">{</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyWikipediaApplication</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span><span class="o">{</span>
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">StreamGraph</span> <span class="n">streamGraph</span><span class="o">,</span> <span class="n">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">StreamGraph</span> <span class="n">streamGraph</span><span class="o">,</span> <span class="nc">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span>
     
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
@@ -719,63 +719,59 @@ streams.en-wikinews.samza.physical.name<
 <p>Next, we will declare the input streams for the Wikipedia application.</p>
 
 <h4 id="inputs">Inputs</h4>
+<p>The Wikipedia application consumes events from three channels. Let’s declare each of those channels as an input stream via the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html">StreamGraph</a> in the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method.</p>
 
-<p>The Wikipedia application consumes events from three channels. Let&rsquo;s declare each of those channels as an input streams via the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html">StreamGraph</a> in the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method.</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wikipediaEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">&quot;en-wikipedia&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o">&lt;&gt;());</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wiktionaryEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">&quot;en-wiktionary&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o">&lt;&gt;());</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wikiNewsEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">&quot;en-wikinews&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o">&lt;&gt;());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wikipediaEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"en-wikipedia"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;&gt;());</span>
+<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wiktionaryEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"en-wiktionary"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;&gt;());</span>
+<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">wikiNewsEvents</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"en-wikinews"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;&gt;());</span></code></pre></figure>
 
 <p>The first argument to the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-org.apache.samza.serializers.Serde-">getInputStream</a> method is the stream ID. Each ID must match the corresponding stream IDs we configured earlier.
-The second argument is the <code>Serde</code> used to deserialize the message. We&rsquo;ve set this to a <code>NoOpSerde</code> since our <code>wikipedia</code> system already returns <code>WikipediaFeedEvent</code>s and there is no need for further deserialization.</p>
+The second argument is the <code class="language-plaintext highlighter-rouge">Serde</code> used to deserialize the message. We’ve set this to a <code class="language-plaintext highlighter-rouge">NoOpSerde</code> since our <code class="language-plaintext highlighter-rouge">wikipedia</code> system already returns <code class="language-plaintext highlighter-rouge">WikipediaFeedEvent</code>s and there is no need for further deserialization.</p>
 
 <p>Note the streams are all MessageStreams of type WikipediaFeedEvent. <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html">MessageStream</a> is the in-memory representation of a stream in Samza. It uses generics to ensure type safety across the streams and operations.</p>
 
 <h4 id="merge">Merge</h4>
-
-<p>We&rsquo;d like to use the same processing logic for all three input streams, so we will use the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a> operator to merge them together. Note: this is not the same as a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-">join</a> because we are not associating events by key. We are simply combining three streams into one, like a union.</p>
+<p>We’d like to use the same processing logic for all three input streams, so we will use the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a> operator to merge them together. Note: this is not the same as a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-">join</a> because we are not associating events by key. We are simply combining three streams into one, like a union.</p>
 
 <p>Add the following snippet to the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method. It merges all the input streams into a new one called <em>allWikipediaEvents</em>.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">WikipediaFeed</span><span class="o">.</span><span class="na">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">allWikipediaEvents</span> <span class="o">=</span> <span class="n">MessageStream</span><span class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">wikipediaEvents</span><span class="o">,</span> <span class="n">wiktionaryEvents</span><span class="o">,</span> <span class="n">wikiNewsEvents</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WikipediaFeed</span><span class="o">.</span><span class="na">WikipediaFeedEvent</span><span class="o">&gt;</span> <span class="n">allWikipediaEvents</span> <span class="o">=</span> <span class="nc">MessageStream</span><span class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span class="nc">ImmutableList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">wikipediaEvents</span><span class="o">,</span> <span class="n">wiktionaryEvents</span><span class="o">,</span> <span class="n">wikiNewsEvents</span><span class="o">));</span></code></pre></figure>
 
 <p>Note there is a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#merge-java.util.Collection-">merge</a> operator instance method on MessageStream, but the static <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a> method is a more convenient alternative if you need to merge many streams.</p>
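<p>The union-versus-join distinction can be sketched in plain Java. This is only an analogy: <code class="language-plaintext highlighter-rouge">MergeSketch</code> and its <code class="language-plaintext highlighter-rouge">mergeAll</code> helper are hypothetical names working on in-memory lists, not the Samza API, and they do not model how a real stream interleaves messages as they arrive.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Plain-Java analogy only; none of these names are part of the Samza API.
final class MergeSketch {

  // Combines every input list into one list without matching elements by key,
  // which is the "union" behaviour the text describes for mergeAll.
  static <T> List<T> mergeAll(Collection<List<T>> inputs) {
    List<T> merged = new ArrayList<>();
    for (List<T> input : inputs) {
      merged.addAll(input);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> wikipedia = Arrays.asList("wp-edit-1", "wp-edit-2");
    List<String> wiktionary = Arrays.asList("wt-edit-1");
    List<String> wikinews = Arrays.asList("wn-edit-1");

    // All events end up in one collection; nothing is paired up or dropped.
    List<String> all = mergeAll(Arrays.asList(wikipedia, wiktionary, wikinews));
    System.out.println(all.size() + " events: " + all);
  }
}
```

<p>A join, by contrast, would emit an output only when events from two inputs share a key, so its output type usually differs from both input types.</p>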
 
 <h4 id="parse">Parse</h4>
+<p>The next step is to parse the events and extract some information. We will use the pre-existing <code class="language-plaintext highlighter-rouge">WikipediaParser.parseEvent()</code> method to do this. The parser extracts some flags we want to monitor as well as some metadata about the event. Inspect the method signature. The input is a <code class="language-plaintext highlighter-rouge">WikipediaFeedEvent</code> and the output is a <code class="language-plaintext highlighter-rouge">Map&lt;String, Object&gt;</code>. These types will be reflected in the types of the streams before and after the operation.</p>
 
-<p>The next step is to parse the events and extract some information. We will use the pre-existing `WikipediaParser.parseEvent()&lsquo; method to do this. The parser extracts some flags we want to monitor as well as some metadata about the event. Inspect the method signature. The input is a WikipediaFeedEvents and the output is a Map<String, Object>. These types will be reflected in the types of the streams before and after the operation.</p>
+<p>In the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method, invoke the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a> operation on <code class="language-plaintext highlighter-rouge">allWikipediaEvents</code>, passing the <code class="language-plaintext highlighter-rouge">WikipediaParser::parseEvent</code> method reference as follows:</p>
 
-<p>In the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method, invoke the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a> operation on <code>allWikipediaEvents</code>, passing the <code>WikipediaParser::parseEvent</code> method reference as follows:</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">WikipediaParser</span><span class="o">::</span><span class="n">parseEvent</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">WikipediaParser:</span><span class="o">:</span><span class="n">parseEvent</span><span class="o">);</span></code></pre></figure>
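<p>To see how a method reference determines the element type after a map, here is a small plain-Java sketch using <code class="language-plaintext highlighter-rouge">java.util.stream</code> rather than Samza. <code class="language-plaintext highlighter-rouge">FeedEvent</code>, the <code class="language-plaintext highlighter-rouge">title|bytes</code> line format, and this <code class="language-plaintext highlighter-rouge">parseEvent</code> are hypothetical stand-ins chosen for illustration; the real parser's fields are not reproduced here.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy only; the classes below are assumptions, not Samza types.
final class MapSketch {

  // Hypothetical stand-in for WikipediaFeedEvent.
  static final class FeedEvent {
    final String rawLine;

    FeedEvent(String rawLine) {
      this.rawLine = rawLine;
    }
  }

  // Hypothetical parser with the same shape as the tutorial's:
  // one event in, one Map<String, Object> out.
  static Map<String, Object> parseEvent(FeedEvent event) {
    String[] parts = event.rawLine.split("\\|", 2); // assumed "title|diffBytes" format
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("title", parts[0]);
    parsed.put("diff-bytes", Integer.parseInt(parts[1]));
    return parsed;
  }

  // The element type changes from FeedEvent to Map<String, Object>
  // purely because of the signature of the referenced method.
  static List<Map<String, Object>> parseAll(List<FeedEvent> events) {
    return events.stream().map(MapSketch::parseEvent).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<FeedEvent> events = Arrays.asList(new FeedEvent("Samza|42"), new FeedEvent("Kafka|-7"));
    System.out.println(parseAll(events));
  }
}
```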
 
 <h4 id="window">Window</h4>
-
-<p>Now that we have the relevant information extracted, let&rsquo;s perform some aggregations over a 10-second <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a>.</p>
+<p>Now that we have the relevant information extracted, let’s perform some aggregations over a 10-second <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a>.</p>
 
 <p>First, we need a container class for statistics we want to track. Add the following static class after the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">private</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WikipediaStats</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WikipediaStats</span> <span class="o">{</span>
   <span class="kt">int</span> <span class="n">edits</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
   <span class="kt">int</span> <span class="n">byteDiff</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-  <span class="n">Set</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">titles</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
-  <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;();</span>
+  <span class="nc">Set</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">titles</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashSet</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;();</span>
+  <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;();</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Now we need to define the logic to aggregate the stats over the duration of the window. To do this, we implement <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a> by adding the following class after the <code>WikipediaStats</code> class:</p>
+<p>Now we need to define the logic to aggregate the stats over the duration of the window. To do this, we implement <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a> by adding the following class after the <code class="language-plaintext highlighter-rouge">WikipediaStats</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">private</span> <span class="kd">class</span> <span class="nc">WikipediaStatsAggregator</span> <span class="kd">implements</span> <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;,</span> <span class="n">WikipediaStats</span><span class="o">&gt;</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">class</span> <span class="nc">WikipediaStatsAggregator</span> <span class="kd">implements</span> <span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Object</span><span class="o">&gt;,</span> <span class="nc">WikipediaStats</span><span class="o">&gt;</span> <span class="o">{</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="n">WikipediaStats</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">edit</span><span class="o">,</span> <span class="n">WikipediaStats</span> <span class="n">stats</span><span class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="nc">WikipediaStats</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Object</span><span class="o">&gt;</span> <span class="n">edit</span><span class="o">,</span> <span class="nc">WikipediaStats</span> <span class="n">stats</span><span class="o">)</span> <span class="o">{</span>
     <span class="c1">// Update window stats</span>
     <span class="n">stats</span><span class="o">.</span><span class="na">edits</span><span class="o">++;</span>
-    <span class="n">stats</span><span class="o">.</span><span class="na">byteDiff</span> <span class="o">+=</span> <span class="o">(</span><span class="n">Integer</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;diff-bytes&quot;</span><span class="o">);</span>
-    <span class="n">stats</span><span class="o">.</span><span class="na">titles</span><span class="o">.</span><span class="na">add</span><span class="o">((</span><span class="n">String</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;title&quot;</span><span class="o">));</span>
+    <span class="n">stats</span><span class="o">.</span><span class="na">byteDiff</span> <span class="o">+=</span> <span class="o">(</span><span class="nc">Integer</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"diff-bytes"</span><span class="o">);</span>
+    <span class="n">stats</span><span class="o">.</span><span class="na">titles</span><span class="o">.</span><span class="na">add</span><span class="o">((</span><span class="nc">String</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"title"</span><span class="o">));</span>
 
-    <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Boolean</span><span class="o">&gt;</span> <span class="n">flags</span> <span class="o">=</span> <span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Boolean</span><span class="o">&gt;)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;flags&quot;</span><span class="o">);</span>
-    <span class="k">for</span> <span class="o">(</span><span class="n">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Boolean</span><span class="o">&gt;</span> <span class="n">flag</span> <span class="o">:</span> <span class="n">flags</span><span class="o">.</span><span class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
-      <span class="k">if</span> <span class="o">(</span><span class="n">Boolean</span><span class="o">.</span><span class="na">TRUE</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">flag</span><span class="o">.</span><span class="na">getValue</span><span class="o">()))</span> <span class="o">{</span>
+    <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Boolean</span><span class="o">&gt;</span> <span class="n">flags</span> <span class="o">=</span> <span class="o">(</span><span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Boolean</span><span class="o">&gt;)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"flags"</span><span class="o">);</span>
+    <span class="k">for</span> <span class="o">(</span><span class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Boolean</span><span class="o">&gt;</span> <span class="n">flag</span> <span class="o">:</span> <span class="n">flags</span><span class="o">.</span><span class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
+      <span class="k">if</span> <span class="o">(</span><span class="nc">Boolean</span><span class="o">.</span><span class="na">TRUE</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">flag</span><span class="o">.</span><span class="na">getValue</span><span class="o">()))</span> <span class="o">{</span>
         <span class="n">stats</span><span class="o">.</span><span class="na">counts</span><span class="o">.</span><span class="na">compute</span><span class="o">(</span><span class="n">flag</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="o">(</span><span class="n">k</span><span class="o">,</span> <span class="n">v</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">v</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">?</span> <span class="mi">1</span> <span class="o">:</span> <span class="n">v</span> <span class="o">+</span> <span class="mi">1</span><span class="o">);</span>
       <span class="o">}</span>
     <span class="o">}</span>
@@ -784,29 +780,28 @@ The second argument is the <code>Serde</
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Note: the type parameters for <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a> reflect the upstream data type and the window value type, respectively. In our case, the upstream type is the output of the parser and the window value is our <code>WikipediaStats</code> class.</p>
+<p>Note: the type parameters for <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a> reflect the upstream data type and the window value type, respectively. In our case, the upstream type is the output of the parser and the window value is our <code class="language-plaintext highlighter-rouge">WikipediaStats</code> class.</p>
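<p>The role of the two type parameters may be easier to see in a plain-Java fold. The sketch below is an assumption-laden analogy, not Samza code: <code class="language-plaintext highlighter-rouge">FoldSketch</code>, <code class="language-plaintext highlighter-rouge">foldLeft</code>, <code class="language-plaintext highlighter-rouge">Stats</code> and the <code class="language-plaintext highlighter-rouge">edit</code> helper are hypothetical names, and a <code class="language-plaintext highlighter-rouge">BiFunction</code> stands in for the aggregator.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Plain-Java stand-in for a fold-left aggregation; not the Samza interface.
final class FoldSketch {

  // Simplified window value type (hypothetical, modelled on the tutorial's stats class).
  static final class Stats {
    int edits = 0;
    int byteDiff = 0;
    Set<String> titles = new HashSet<>();
  }

  // M is the upstream message type; WV is the accumulated window value type.
  static <M, WV> WV foldLeft(List<M> messages, WV initial, BiFunction<M, WV, WV> fn) {
    WV accumulated = initial;
    for (M message : messages) {
      accumulated = fn.apply(message, accumulated);
    }
    return accumulated;
  }

  // Folds one parsed edit into the running stats and returns the updated value.
  static Stats update(Map<String, Object> edit, Stats stats) {
    stats.edits++;
    stats.byteDiff += (Integer) edit.get("diff-bytes");
    stats.titles.add((String) edit.get("title"));
    return stats;
  }

  // Helper that builds a parsed edit with only the two fields used above.
  static Map<String, Object> edit(String title, int diffBytes) {
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("title", title);
    parsed.put("diff-bytes", diffBytes);
    return parsed;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> edits = Arrays.asList(edit("A", 10), edit("B", -4), edit("A", 7));
    Stats stats = foldLeft(edits, new Stats(), FoldSketch::update);
    System.out.println(stats.edits + " edits, " + stats.byteDiff + " bytes, "
        + stats.titles.size() + " unique titles");
  }
}
```

<p>Here <code class="language-plaintext highlighter-rouge">M</code> is bound to <code class="language-plaintext highlighter-rouge">Map&lt;String, Object&gt;</code> and <code class="language-plaintext highlighter-rouge">WV</code> to <code class="language-plaintext highlighter-rouge">Stats</code>, matching the order in which the note lists the upstream type and the window value type.</p>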
 
 <p>Finally, we can define our <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a> back in the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a> method by chaining the result of the parser:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">WikipediaParser</span><span class="o">::</span><span class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="n">WikipediaStats</span><span class="o">::</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="n">WikipediaStatsAggregator</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">WikipediaStats</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">WikipediaParser:</span><span class="o">:</span><span class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="nl">WikipediaStats:</span><span class="o">:</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="nc">WikipediaStatsAggregator</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">WikipediaStats</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span></code></pre></figure>
 
-<p>This defines an unkeyed <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Windows.html">tumbling window</a> that spans 10s, which instantiates a new <code>WikipediaStats</code> object at the beginning of each window and aggregates the stats using <code>WikipediaStatsAggregator</code>.</p>
+<p>This defines an unkeyed <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Windows.html">tumbling window</a> that spans 10s, which instantiates a new <code class="language-plaintext highlighter-rouge">WikipediaStats</code> object at the beginning of each window and aggregates the stats using <code class="language-plaintext highlighter-rouge">WikipediaStatsAggregator</code>.</p>
 
-<p>The output of the window is a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/WindowPane.html">WindowPane</a> with a key and value. Since we used an unkeyed tumbling window, the key is <code>Void</code>. The value is our <code>WikipediaStats</code> object.</p>
+<p>The output of the window is a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/WindowPane.html">WindowPane</a> with a key and value. Since we used an unkeyed tumbling window, the key is <code class="language-plaintext highlighter-rouge">Void</code>. The value is our <code class="language-plaintext highlighter-rouge">WikipediaStats</code> object.</p>
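<p>As a rough mental model of a tumbling window, each message falls into exactly one fixed-size, non-overlapping bucket. The plain-Java sketch below assigns timestamps to 10-second buckets and counts them. It assumes explicit millisecond timestamps purely for illustration; <code class="language-plaintext highlighter-rouge">TumblingWindowSketch</code> and its methods are hypothetical, and how Samza actually decides when a window opens and fires is handled by the runtime and not modelled here.</p>

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of tumbling-window bucketing; not Samza code.
final class TumblingWindowSketch {

  // Start of the fixed-size, non-overlapping window that contains the timestamp.
  static long windowStart(long timestampMs, long windowSizeMs) {
    return Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;
  }

  // Counts how many timestamps fall into each window, keyed by window start.
  static Map<Long, Integer> countPerWindow(long[] timestampsMs, long windowSizeMs) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long timestampMs : timestampsMs) {
      counts.merge(windowStart(timestampMs, windowSizeMs), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    long[] timestampsMs = {1_000L, 9_999L, 10_000L, 25_000L};
    // Each entry is one window: its start time and the number of messages in it.
    System.out.println(countPerWindow(timestampsMs, 10_000L));
  }
}
```

<p>Because the window is unkeyed, there is a single sequence of buckets for the whole stream; a keyed window would keep one such sequence per key.</p>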
 
 <h4 id="output">Output</h4>
+<p>We will do a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a> at the end to format our window output. Let’s begin by defining a simple container class for our formatted output.</p>
 
-<p>We will do a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a> at the end to format our window output. Let&rsquo;s begin by defining a simple container class for our formatted output.</p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WikipediaStatsOutput</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WikipediaStatsOutput</span> <span class="o">{</span>
     <span class="kd">public</span> <span class="kt">int</span> <span class="n">edits</span><span class="o">;</span>
     <span class="kd">public</span> <span class="kt">int</span> <span class="n">bytesAdded</span><span class="o">;</span>
     <span class="kd">public</span> <span class="kt">int</span> <span class="n">uniqueTitles</span><span class="o">;</span>
-    <span class="kd">public</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">counts</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">counts</span><span class="o">;</span>
 
     <span class="kd">public</span> <span class="nf">WikipediaStatsOutput</span><span class="o">(</span><span class="kt">int</span> <span class="n">edits</span><span class="o">,</span> <span class="kt">int</span> <span class="n">bytesAdded</span><span class="o">,</span> <span class="kt">int</span> <span class="n">uniqueTitles</span><span class="o">,</span>
-        <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">counts</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">counts</span><span class="o">)</span> <span class="o">{</span>
       <span class="k">this</span><span class="o">.</span><span class="na">edits</span> <span class="o">=</span> <span class="n">edits</span><span class="o">;</span>
       <span class="k">this</span><span class="o">.</span><span class="na">bytesAdded</span> <span class="o">=</span> <span class="n">bytesAdded</span><span class="o">;</span>
       <span class="k">this</span><span class="o">.</span><span class="na">uniqueTitles</span> <span class="o">=</span> <span class="n">uniqueTitles</span><span class="o">;</span>
@@ -816,46 +811,45 @@ The second argument is the <code>Serde</
 
 <p>Paste the following after the aggregator class:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>  <span class="kd">private</span> <span class="n">WikipediaStatsOutput</span> <span class="nf">formatOutput</span><span class="o">(</span><span class="n">WindowPane</span><span class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span class="n">WikipediaStats</span><span class="o">&gt;</span> <span class="n">statsWindowPane</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">WikipediaStats</span> <span class="n">stats</span> <span class="o">=</span> <span class="n">statsWindowPane</span><span class="o">.</span><span class="na">getMessage</span><span class="o">();</span>
-    <span class="k">return</span> <span class="k">new</span> <span class="n">WikipediaStatsOutput</span><span class="o">(</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  <span class="kd">private</span> <span class="nc">WikipediaStatsOutput</span> <span class="nf">formatOutput</span><span class="o">(</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">,</span> <span class="nc">WikipediaStats</span><span class="o">&gt;</span> <span class="n">statsWindowPane</span><span class="o">)</span> <span class="o">{</span>
+    <span class="nc">WikipediaStats</span> <span class="n">stats</span> <span class="o">=</span> <span class="n">statsWindowPane</span><span class="o">.</span><span class="na">getMessage</span><span class="o">();</span>
+    <span class="k">return</span> <span class="k">new</span> <span class="nf">WikipediaStatsOutput</span><span class="o">(</span>
         <span class="n">stats</span><span class="o">.</span><span class="na">edits</span><span class="o">,</span> <span class="n">stats</span><span class="o">.</span><span class="na">byteDiff</span><span class="o">,</span> <span class="n">stats</span><span class="o">.</span><span class="na">titles</span><span class="o">.</span><span class="na">size</span><span class="o">(),</span> <span class="n">stats</span><span class="o">.</span><span class="na">counts</span><span class="o">);</span>
   <span class="o">}</span></code></pre></figure>
 
 <p>Now, we can invoke the method by adding another <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a> operation to the chain in <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>. The operator chain should now look like this:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">WikipediaParser</span><span class="o">::</span><span class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="n">WikipediaStats</span><span class="o">::</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="n">WikipediaStatsAggregator</span><span class="o">()))</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">WikipediaParser:</span><span class="o">:</span><span class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="nl">WikipediaStats:</span><span class="o">:</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="nc">WikipediaStatsAggregator</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">WikipediaStats</span><span class="o">.</span><span class="na">class</span><span class="o">)))</span>
         <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">formatOutput</span><span class="o">);</span></code></pre></figure>
 
 <p>Next we need to get the output stream to which we will send the stats. Insert the following line below the creation of the 3 input streams:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>  <span class="n">OutputStream</span><span class="o">&lt;</span><span class="n">WikipediaStatsOutput</span><span class="o">&gt;</span> <span class="n">wikipediaStats</span> <span class="o">=</span>
-     <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">&quot;wikipedia-stats&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">WikipediaStatsOutput</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  <span class="nc">OutputStream</span><span class="o">&lt;</span><span class="nc">WikipediaStatsOutput</span><span class="o">&gt;</span> <span class="n">wikipediaStats</span> <span class="o">=</span>
+     <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">"wikipedia-stats"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">WikipediaStatsOutput</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure>
 
 <p>The <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/OutputStream.html">OutputStream</a> is parameterized by the type of the output.</p>
 
-<p>The first parameter of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-">getOutputStream</a> is the output stream ID. We will use <em>wikipedia-stats</em> and since it contains no special characters, we won&rsquo;t bother configuring a physical name so Samza will use the stream ID as the topic name.
-The second parameter is the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/Serde.html">Serde</a> to serialize the outgoing message. We will set it to <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html">JsonSerdeV2</a> to serialize our <code>WikipediaStatsOutput</code> as a JSON string.</p>
+<p>The first parameter of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-">getOutputStream</a> is the output stream ID. We will use <em>wikipedia-stats</em>. Since it contains no special characters, we don’t need to configure a physical name, and Samza will use the stream ID as the topic name.
+The second parameter is the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/Serde.html">Serde</a> used to serialize the outgoing message. We will set it to <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html">JsonSerdeV2</a> to serialize our <code class="language-plaintext highlighter-rouge">WikipediaStatsOutput</code> as a JSON string.</p>
 
 <p>Finally, we can send our output to the output stream using the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a> operator:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">WikipediaParser</span><span class="o">::</span><span class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="n">WikipediaStats</span><span class="o">::</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="n">WikipediaStatsAggregator</span><span class="o">()))</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">allWikipediaEvents</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">WikipediaParser:</span><span class="o">:</span><span class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="nl">WikipediaStats:</span><span class="o">:</span><span class="k">new</span><span class="o">,</span> <span class="k">new</span> <span class="nc">WikipediaStatsAggregator</span><span class="o">()))</span>
         <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">formatOutput</span><span class="o">)</span>
         <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">wikipediaStats</span><span class="o">);</span></code></pre></figure>
 
-<p>Tip: Because the MessageStream type information is preserved in the operator chain, it is often easier to define the OutputStream inline with the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a> operator and then refactor it for readability. That way you don&rsquo;t have to hunt down the types.</p>
+<p>Tip: Because the MessageStream type information is preserved in the operator chain, it is often easier to define the OutputStream inline with the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a> operator and then refactor it for readability. That way you don’t have to hunt down the types.</p>
 
 <h4 id="kvstore">KVStore</h4>
-
 <p>We now have an operational Wikipedia application which provides stats aggregated over a 10 second interval. One of those stats is a count of the number of edits within the 10s window. But what if we want to keep an additional durable counter of the total edits?</p>
 
 <p>We will do this by keeping a separate count outside the window and persisting it in a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html">KeyValueStore</a>.</p>
 
 <p>We start by defining the store in the config file:</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>serializers.registry.string.class<span class="o">=</span>org.apache.samza.serializers.StringSerdeFactory
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">serializers.registry.string.class<span class="o">=</span>org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.integer.class<span class="o">=</span>org.apache.samza.serializers.IntegerSerdeFactory
 
 stores.wikipedia-stats.factory<span class="o">=</span>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
@@ -863,40 +857,39 @@ stores.wikipedia-stats.changelog<span cl
 stores.wikipedia-stats.key.serde<span class="o">=</span>string
 stores.wikipedia-stats.msg.serde<span class="o">=</span>integer</code></pre></figure>
 
-<p>These properties declare a <a href="http://rocksdb.org/">RocksDB</a> key-value store named &ldquo;wikipedia-stats&rdquo;. The store is replicated to a changelog stream called &ldquo;wikipedia-stats-changelog&rdquo; on the <em>kafka</em> system for durability. It uses the <em>string</em> and <em>integer</em> serdes for keys and values respectively.</p>
+<p>These properties declare a <a href="http://rocksdb.org/">RocksDB</a> key-value store named “wikipedia-stats”. The store is replicated to a changelog stream called “wikipedia-stats-changelog” on the <em>kafka</em> system for durability. It uses the <em>string</em> and <em>integer</em> serdes for keys and values respectively.</p>
 
-<p>Next, we add a total count member variable to the <code>WikipediaStats</code> class, and to the <code>WikipediaStatsOutput</code> class:</p>
+<p>Next, we add a total count member variable to the <code class="language-plaintext highlighter-rouge">WikipediaStats</code> class, and to the <code class="language-plaintext highlighter-rouge">WikipediaStatsOutput</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kt">int</span> <span class="n">totalEdits</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kt">int</span> <span class="n">totalEdits</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span></code></pre></figure>
 
-<p>To use the store in the application, we need to get it from the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/TaskContext.html">TaskContext</a>. Also, since we want to emit the total edit count along with the window edit count, it&rsquo;s easiest to update both of them in our aggregator. Declare the store as a member variable of the <code>WikipediaStatsAggregator</code> class:</p>
+<p>To use the store in the application, we need to get it from the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/TaskContext.html">TaskContext</a>. Also, since we want to emit the total edit count along with the window edit count, it’s easiest to update both of them in our aggregator. Declare the store as a member variable of the <code class="language-plaintext highlighter-rouge">WikipediaStatsAggregator</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">private</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">store</span><span class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">store</span><span class="o">;</span></code></pre></figure>
 
-<p>Then override the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-">init</a> method in <code>WikipediaStatsAggregator</code> to initialize the store.</p>
+<p>Then override the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-">init</a> method in <code class="language-plaintext highlighter-rouge">WikipediaStatsAggregator</code> to initialize the store.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="nd">@Override</span>
-<span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Config</span> <span class="n">config</span><span class="o">,</span> <span class="n">TaskContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
-  <span class="n">store</span> <span class="o">=</span> <span class="o">(</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStore</span><span class="o">(</span><span class="s">&quot;wikipedia-stats&quot;</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nd">@Override</span>
+<span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Config</span> <span class="n">config</span><span class="o">,</span> <span class="nc">TaskContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+  <span class="n">store</span> <span class="o">=</span> <span class="o">(</span><span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStore</span><span class="o">(</span><span class="s">"wikipedia-stats"</span><span class="o">);</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Update and persist the counter in the <code>apply</code> method.</p>
+<p>Update and persist the counter in the <code class="language-plaintext highlighter-rouge">apply</code> method.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">Integer</span> <span class="n">editsAllTime</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;count-edits-all-time&quot;</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">Integer</span> <span class="n">editsAllTime</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"count-edits-all-time"</span><span class="o">);</span>
 <span class="k">if</span> <span class="o">(</span><span class="n">editsAllTime</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="n">editsAllTime</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
 <span class="n">editsAllTime</span><span class="o">++;</span>
-<span class="n">store</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;count-edits-all-time&quot;</span><span class="o">,</span> <span class="n">editsAllTime</span><span class="o">);</span>
+<span class="n">store</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"count-edits-all-time"</span><span class="o">,</span> <span class="n">editsAllTime</span><span class="o">);</span>
 <span class="n">stats</span><span class="o">.</span><span class="na">totalEdits</span> <span class="o">=</span> <span class="n">editsAllTime</span><span class="o">;</span></code></pre></figure>
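<p>The read-increment-write pattern above can be tried in isolation. The following is a minimal sketch, assuming a plain <code class="language-plaintext highlighter-rouge">java.util.Map</code> as a stand-in for the Samza <code class="language-plaintext highlighter-rouge">KeyValueStore</code>; the class name <code class="language-plaintext highlighter-rouge">TotalEditCounterSketch</code> and the method <code class="language-plaintext highlighter-rouge">recordEdit</code> are hypothetical and not part of the tutorial project.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone illustration; not part of hello-samza.
public class TotalEditCounterSketch {
    // Same key the tutorial uses for the all-time edit counter.
    static final String KEY = "count-edits-all-time";

    // Reads the current total, treats a missing entry as zero,
    // increments it, writes it back, and returns the new total.
    // The Map is an assumed stand-in for KeyValueStore<String, Integer>.
    static int recordEdit(Map<String, Integer> store) {
        Integer editsAllTime = store.get(KEY);
        if (editsAllTime == null) {
            editsAllTime = 0;
        }
        editsAllTime++;
        store.put(KEY, editsAllTime);
        return editsAllTime;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        recordEdit(store);
        recordEdit(store);
        System.out.println("Total edits: " + recordEdit(store));
    }
}
```

<p>The null check matters on the first edit, when the key has not been written yet. In the real application the store is backed by RocksDB and a changelog, so the total survives restarts, which an in-memory map does not.</p>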
 
-<p>Finally, update the <code>MyWikipediaApplication#formatOutput</code> method to include the total counter in its <code>WikipediaStatsOutput</code>.</p>
+<p>Finally, update the <code class="language-plaintext highlighter-rouge">MyWikipediaApplication#formatOutput</code> method to include the total counter in its <code class="language-plaintext highlighter-rouge">WikipediaStatsOutput</code>.</p>
 
 <h4 id="metrics">Metrics</h4>
-
-<p>Lastly, let&rsquo;s add a metric to the application which counts the number of repeat edits each topic within the window interval.</p>
+<p>Lastly, let’s add a metric to the application which counts the number of repeat edits to each topic within the window interval.</p>
 
 <p>As with the key-value store, we must first define the metrics reporters in the config file.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>metrics.reporters<span class="o">=</span>snapshot,jmx
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">metrics.reporters<span class="o">=</span>snapshot,jmx
 metrics.reporter.snapshot.class<span class="o">=</span>org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
 metrics.reporter.snapshot.stream<span class="o">=</span>kafka.metrics
 metrics.reporter.jmx.class<span class="o">=</span>org.apache.samza.metrics.reporter.JmxReporterFactory</code></pre></figure>
@@ -905,28 +898,26 @@ metrics.reporter.jmx.class<span class="o
 
 <p>In the WikipediaStatsAggregator, declare a counter member variable.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">private</span> <span class="n">Counter</span> <span class="n">repeatEdits</span><span class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="nc">Counter</span> <span class="n">repeatEdits</span><span class="o">;</span></code></pre></figure>
 
-<p>Then add the following to the <code>WikipediaStatsAggregator#init</code> method to initialize the counter.</p>
+<p>Then add the following to the <code class="language-plaintext highlighter-rouge">WikipediaStatsAggregator#init</code> method to initialize the counter.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">repeatEdits</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getMetricsRegistry</span><span class="o">().</span><span class="na">newCounter</span><span class="o">(</span><span class="s">&quot;edit-counters&quot;</span><span class="o">,</span> <span class="s">&quot;repeat-edits&quot;</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">repeatEdits</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getMetricsRegistry</span><span class="o">().</span><span class="na">newCounter</span><span class="o">(</span><span class="s">"edit-counters"</span><span class="o">,</span> <span class="s">"repeat-edits"</span><span class="o">);</span></code></pre></figure>
 
-<p>Update and persist the counter from the <code>apply</code> method.</p>
+<p>Update the counter from the <code class="language-plaintext highlighter-rouge">apply</code> method.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kt">boolean</span> <span class="n">newTitle</span> <span class="o">=</span> <span class="n">stats</span><span class="o">.</span><span class="na">titles</span><span class="o">.</span><span class="na">add</span><span class="o">((</span><span class="n">String</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;title&quot;</span><span class="o">));</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kt">boolean</span> <span class="n">newTitle</span> <span class="o">=</span> <span class="n">stats</span><span class="o">.</span><span class="na">titles</span><span class="o">.</span><span class="na">add</span><span class="o">((</span><span class="nc">String</span><span class="o">)</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"title"</span><span class="o">));</span>
 
 <span class="k">if</span> <span class="o">(!</span><span class="n">newTitle</span><span class="o">)</span> <span class="o">{</span>
   <span class="n">repeatEdits</span><span class="o">.</span><span class="na">inc</span><span class="o">();</span>
-  <span class="n">log</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">&quot;Frequent edits for title: {}&quot;</span><span class="o">,</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;title&quot;</span><span class="o">));</span>
+  <span class="n">log</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"Frequent edits for title: {}"</span><span class="o">,</span> <span class="n">edit</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"title"</span><span class="o">));</span>
 <span class="o">}</span></code></pre></figure>
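<p>The repeat detection relies on <code class="language-plaintext highlighter-rouge">Set.add</code> returning <code class="language-plaintext highlighter-rouge">false</code> when the element is already present. The following is a minimal sketch of that idea, assuming a plain <code class="language-plaintext highlighter-rouge">long</code> in place of the Samza <code class="language-plaintext highlighter-rouge">Counter</code>; the class name <code class="language-plaintext highlighter-rouge">RepeatEditSketch</code> and the method <code class="language-plaintext highlighter-rouge">countRepeatEdits</code> are hypothetical.</p>

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone illustration; not part of hello-samza.
public class RepeatEditSketch {
    // Counts how many titles in one window were already seen earlier
    // in that same window. A long stands in for the metrics Counter.
    static long countRepeatEdits(List<String> titlesInWindow) {
        Set<String> seenTitles = new HashSet<>();
        long repeatEdits = 0;
        for (String title : titlesInWindow) {
            // Set.add returns false if the title was already in the set.
            boolean newTitle = seenTitles.add(title);
            if (!newTitle) {
                repeatEdits++;
            }
        }
        return repeatEdits;
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("Kafka", "Samza", "Kafka", "Kafka");
        System.out.println("Repeat edits: " + countRepeatEdits(titles));
    }
}
```

<p>In the tutorial the set of titles lives in the per-window <code class="language-plaintext highlighter-rouge">WikipediaStats</code> object, so the comparison is scoped to a single window, while the metric itself keeps growing across windows.</p>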
 
 <h4 id="run-and-view-plan">Run and View Plan</h4>
-
-<p>You can set up the grid and run the application using the same instructions from the <a href="hello-samza-high-level-yarn.html">hello samza high level API Yarn tutorial</a>. The only difference is to replace the <code>wikipedia-application.properties</code> config file in the <em>config-path</em> command line parameter with <code>my-wikipedia-application.properties</code></p>
+<p>You can set up the grid and run the application using the same instructions from the <a href="hello-samza-high-level-yarn.html">hello-samza high level API Yarn tutorial</a>. The only difference is to replace the <code class="language-plaintext highlighter-rouge">wikipedia-application.properties</code> config file in the <em>config-path</em> command line parameter with <code class="language-plaintext highlighter-rouge">my-wikipedia-application.properties</code>.</p>
 
 <h3 id="summary">Summary</h3>
-
-<p>Congratulations! You have built and executed a Wikipedia stream application on Samza using the high level API. The final application should be directly comparable to the pre-existing <code>WikipediaApplication</code> in the project.</p>
+<p>Congratulations! You have built and executed a Wikipedia stream application on Samza using the high level API. The final application should be directly comparable to the pre-existing <code class="language-plaintext highlighter-rouge">WikipediaApplication</code> in the project.</p>
 
 <p>You can provide feedback on this tutorial in the <a href="mailto:dev@samza.apache.org">dev mailing list</a>.</p>