Posted to commits@samza.apache.org by aj...@apache.org on 2023/01/18 19:33:31 UTC

svn commit: r1906774 [32/49] - in /samza/site: ./ archive/ blog/ case-studies/ community/ contribute/ img/latest/learn/documentation/api/ learn/documentation/latest/ learn/documentation/latest/api/ learn/documentation/latest/api/javadocs/ learn/documen...

Modified: samza/site/learn/documentation/latest/container/state-management.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/container/state-management.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/container/state-management.html (original)
+++ samza/site/learn/documentation/latest/container/state-management.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/container/state-management">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/container/state-management">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/container/state-management">1.6.0</a></li>
 
               
@@ -639,21 +653,21 @@
    limitations under the License.
 -->
 
-<p>One of the more interesting features of Samza is stateful stream processing. Tasks can store and query data through APIs provided by Samza. That data is stored on the same machine as the stream task; compared to connecting over the network to a remote database, Samza&rsquo;s local state allows you to read and write large amounts of data with better performance. Samza replicates this state across multiple machines for fault-tolerance (described in detail below).</p>
+<p>One of the more interesting features of Samza is stateful stream processing. Tasks can store and query data through APIs provided by Samza. That data is stored on the same machine as the stream task; compared to connecting over the network to a remote database, Samza’s local state allows you to read and write large amounts of data with better performance. Samza replicates this state across multiple machines for fault-tolerance (described in detail below).</p>
 
-<p>Some stream processing jobs don&rsquo;t require state: if you only need to transform one message at a time, or filter out messages based on some condition, your job can be simple. Every call to your task&rsquo;s <a href="../api/overview.html">process method</a> handles one incoming message, and each message is independent of all the other messages.</p>
+<p>Some stream processing jobs don’t require state: if you only need to transform one message at a time, or filter out messages based on some condition, your job can be simple. Every call to your task’s <a href="../api/overview.html">process method</a> handles one incoming message, and each message is independent of all the other messages.</p>
 
-<p>However, being able to maintain state opens up many possibilities for sophisticated stream processing jobs: joining input streams, grouping messages and aggregating groups of messages. By analogy to SQL, the <em>select</em> and <em>where</em> clauses of a query are usually stateless, but <em>join</em>, <em>group by</em> and aggregation functions like <em>sum</em> and <em>count</em> require state. Samza doesn&rsquo;t yet provide a higher-level SQL-like language, but it does provide lower-level primitives that you can use to implement streaming aggregation and joins.</p>
+<p>However, being able to maintain state opens up many possibilities for sophisticated stream processing jobs: joining input streams, grouping messages and aggregating groups of messages. By analogy to SQL, the <em>select</em> and <em>where</em> clauses of a query are usually stateless, but <em>join</em>, <em>group by</em> and aggregation functions like <em>sum</em> and <em>count</em> require state. Samza doesn’t yet provide a higher-level SQL-like language, but it does provide lower-level primitives that you can use to implement streaming aggregation and joins.</p>
 
 <h3 id="common-use-cases-for-stateful-processing">Common use cases for stateful processing</h3>
 
-<p>First, let&rsquo;s look at some simple examples of stateful stream processing that might be seen in the backend of a consumer website. Later in this page we&rsquo;ll discuss how to implement these applications using Samza&rsquo;s built-in key-value storage capabilities.</p>
+<p>First, let’s look at some simple examples of stateful stream processing that might be seen in the backend of a consumer website. Later in this page we’ll discuss how to implement these applications using Samza’s built-in key-value storage capabilities.</p>
 
 <h4 id="windowed-aggregation">Windowed aggregation</h4>
 
 <p><em>Example: Counting the number of page views for each user per hour</em></p>
 
-<p>In this case, your state typically consists of a number of counters which are incremented when a message is processed. The aggregation is typically limited to a time window (e.g. 1 minute, 1 hour, 1 day) so that you can observe changes of activity over time. This kind of windowed processing is common for ranking and relevance, detecting &ldquo;trending topics&rdquo;, as well as real-time reporting and monitoring.</p>
+<p>In this case, your state typically consists of a number of counters which are incremented when a message is processed. The aggregation is typically limited to a time window (e.g. 1 minute, 1 hour, 1 day) so that you can observe changes of activity over time. This kind of windowed processing is common for ranking and relevance, detecting “trending topics”, as well as real-time reporting and monitoring.</p>
 
 <p>The simplest implementation keeps this state in memory (e.g. a hash map in the task instances), and writes it to a database or output stream at the end of every time window. However, you need to consider what happens when a container fails and your in-memory state is lost. You might be able to restore it by processing all the messages in the current window again, but that might take a long time if the window covers a long period of time. Samza can speed up this recovery by making the state fault-tolerant rather than trying to recompute it.</p>
 
@@ -668,18 +682,18 @@
 <p>There are several real-life examples of data normalization which essentially work in this way:</p>
 
 <ul>
-<li>E-commerce companies like Amazon and EBay need to import feeds of merchandise from merchants, normalize them by product, and present products with all the associated merchants and pricing information.</li>
-<li>Web search requires building a crawler which creates essentially a <a href="http://labs.yahoo.com/files/YahooWebmap.pdf">table of web page contents</a> and joins on all the relevance attributes such as click-through ratio or pagerank.</li>
-<li>Social networks take feeds of user-entered text and need to normalize out entities such as companies, schools, and skills.</li>
+  <li>E-commerce companies like Amazon and EBay need to import feeds of merchandise from merchants, normalize them by product, and present products with all the associated merchants and pricing information.</li>
+  <li>Web search requires building a crawler which creates essentially a <a href="http://labs.yahoo.com/files/YahooWebmap.pdf">table of web page contents</a> and joins on all the relevance attributes such as click-through ratio or pagerank.</li>
+  <li>Social networks take feeds of user-entered text and need to normalize out entities such as companies, schools, and skills.</li>
 </ul>
 
 <p>Each of these use cases is a massively complex data normalization problem that can be thought of as constructing a materialized view over many input tables. Samza can help implement such data processing pipelines robustly.</p>
 
 <h4 id="stream-table-join">Stream-table join</h4>
 
-<p><em>Example: Augment a stream of page view events with the user&rsquo;s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
+<p><em>Example: Augment a stream of page view events with the user’s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
 
-<p>Joining side-information to a real-time feed is a classic use for stream processing. It&rsquo;s particularly common in advertising, relevance ranking, fraud detection and other domains. Activity events such as page views generally only include a small number of attributes, such as the ID of the viewer and the viewed items, but not detailed attributes of the viewer and the viewed items, such as the ZIP code of the user. If you want to aggregate the stream by attributes of the viewer or the viewed items, you need to join with the users table or the items table respectively.</p>
+<p>Joining side-information to a real-time feed is a classic use for stream processing. It’s particularly common in advertising, relevance ranking, fraud detection and other domains. Activity events such as page views generally only include a small number of attributes, such as the ID of the viewer and the viewed items, but not detailed attributes of the viewer and the viewed items, such as the ZIP code of the user. If you want to aggregate the stream by attributes of the viewer or the viewed items, you need to join with the users table or the items table respectively.</p>
 
 <p>In data warehouse terminology, you can think of the raw event stream as rows in the central fact table, which needs to be joined with dimension tables so that you can use attributes of the dimensions in your analysis.</p>
 
@@ -687,9 +701,9 @@
 
 <p><em>Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)</em></p>
 
-<p>A stream join is useful for &ldquo;nearly aligned&rdquo; streams, where you expect to receive related events on several input streams, and you want to combine them into a single output event. You cannot rely on the events arriving at the stream processor at the same time, but you can set a maximum period of time over which you allow the events to be spread out.</p>
+<p>A stream join is useful for “nearly aligned” streams, where you expect to receive related events on several input streams, and you want to combine them into a single output event. You cannot rely on the events arriving at the stream processor at the same time, but you can set a maximum period of time over which you allow the events to be spread out.</p>
 
-<p>In order to perform a join between streams, your job needs to buffer events for the time window over which you want to join. For short time windows, you can do this in memory (at the risk of losing events if the machine fails). You can also use Samza&rsquo;s state store to buffer events, which supports buffering more messages than you can fit in memory.</p>
+<p>In order to perform a join between streams, your job needs to buffer events for the time window over which you want to join. For short time windows, you can do this in memory (at the risk of losing events if the machine fails). You can also use Samza’s state store to buffer events, which supports buffering more messages than you can fit in memory.</p>
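+
+<p>As a rough illustration, here is a minimal sketch of such a buffered join using Samza’s key-value store API (introduced later on this page). The task and store names and the helper methods are hypothetical, and expiry of old buffered events is omitted:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical sketch, not a complete implementation: buffers ad impressions
+// keyed by ad ID, and joins each click against its buffered impression.
+public class AdJoinTask implements StreamTask, InitableTask {
+  private KeyValueStore&lt;String, String&gt; impressions;
+
+  public void init(Config config, TaskContext context) {
+    this.impressions = (KeyValueStore&lt;String, String&gt;) context.getStore("impressions");
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    String adId = (String) envelope.getKey();
+    if (isImpression(envelope)) {
+      // Buffer the impression until the matching click arrives (or the window expires).
+      impressions.put(adId, (String) envelope.getMessage());
+    } else {
+      String impression = impressions.get(adId);
+      if (impression != null) {
+        // Emit the joined event and remove the buffered impression.
+        collector.send(joinedEvent(adId, impression, envelope));
+        impressions.delete(adId);
+      }
+    }
+  }
+  // isImpression() and joinedEvent() are assumed helpers, omitted for brevity.
+}</code></pre></figure>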
 
 <h4 id="more">More</h4>
 
@@ -697,28 +711,28 @@
 
 <h3 id="approaches-to-managing-task-state">Approaches to managing task state</h3>
 
-<p>So how do systems support this kind of stateful processing? We&rsquo;ll lead in by describing what we have seen in other stream processing systems, and then describe what Samza does.</p>
+<p>So how do systems support this kind of stateful processing? We’ll lead in by describing what we have seen in other stream processing systems, and then describe what Samza does.</p>
 
 <h4 id="in-memory-state-with-checkpointing">In-memory state with checkpointing</h4>
 
-<p>A simple approach, common in academic stream processing systems, is to periodically save the task&rsquo;s entire in-memory data to durable storage. This approach works well if the in-memory state consists of only a few values. However, you have to store the complete task state on each checkpoint, which becomes increasingly expensive as task state grows. Unfortunately, many non-trivial use cases for joins and aggregation have large amounts of state &mdash; often many gigabytes. This makes full dumps of the state impractical.</p>
+<p>A simple approach, common in academic stream processing systems, is to periodically save the task’s entire in-memory data to durable storage. This approach works well if the in-memory state consists of only a few values. However, you have to store the complete task state on each checkpoint, which becomes increasingly expensive as task state grows. Unfortunately, many non-trivial use cases for joins and aggregation have large amounts of state — often many gigabytes. This makes full dumps of the state impractical.</p>
 
-<p>Some academic systems produce <em>diffs</em> in addition to full checkpoints, which are smaller if only some of the state has changed since the last checkpoint. <a href="../comparisons/storm.html">Storm&rsquo;s Trident abstraction</a> similarly keeps an in-memory cache of state, and periodically writes any changes to a remote store such as Cassandra. However, this optimization only helps if most of the state remains unchanged. In some use cases, such as stream joins, it is normal to have a lot of churn in the state, so this technique essentially degrades to making a remote database request for every message (see below).</p>
+<p>Some academic systems produce <em>diffs</em> in addition to full checkpoints, which are smaller if only some of the state has changed since the last checkpoint. <a href="../comparisons/storm.html">Storm’s Trident abstraction</a> similarly keeps an in-memory cache of state, and periodically writes any changes to a remote store such as Cassandra. However, this optimization only helps if most of the state remains unchanged. In some use cases, such as stream joins, it is normal to have a lot of churn in the state, so this technique essentially degrades to making a remote database request for every message (see below).</p>
 
 <h4 id="using-an-external-store">Using an external store</h4>
 
 <p>Another common pattern for stateful processing is to store the state in an external database or key-value store. Conventional database replication can be used to make that database fault-tolerant. The architecture looks something like this:</p>
 
-<p><img src="/img/latest/learn/documentation/container/stream_job_and_db.png" alt="state-kv-store"></p>
+<p><img src="/img/latest/learn/documentation/container/stream_job_and_db.png" alt="state-kv-store" /></p>
 
-<p>Samza allows this style of processing &mdash; there is nothing to stop you querying a remote database or service within your job. However, there are a few reasons why a remote database can be problematic for stateful stream processing:</p>
+<p>Samza allows this style of processing — there is nothing to stop you querying a remote database or service within your job. However, there are a few reasons why a remote database can be problematic for stateful stream processing:</p>
 
 <ol>
-<li><strong>Performance</strong>: Making database queries over a network is slow and expensive. A Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core to a stream processor, but if you need to make a remote request for every message you process, your throughput is likely to drop by 2-3 orders of magnitude. You can somewhat mitigate this with careful caching of reads and batching of writes, but then you&rsquo;re back to the problems of checkpointing, discussed above.</li>
-<li><strong>Isolation</strong>: If your database or service also serves requests to users, it can be dangerous to use the same database with a stream processor. A scalable stream processing system can run with very high throughput, and easily generates a huge amount of load (for example when catching up on a queue backlog). If you&rsquo;re not very careful, you may cause a denial-of-service attack on your own database, and cause problems for interactive requests from users.</li>
-<li><strong>Query Capabilities</strong>: Many scalable databases expose very limited query interfaces (e.g. only supporting simple key-value lookups), because the equivalent of a &ldquo;full table scan&rdquo; or rich traversal would be too expensive. Stream processes are often less latency-sensitive, so richer query capabilities would be more feasible.</li>
-<li><strong>Correctness</strong>: When a stream processor fails and needs to be restarted, how is the database state made consistent with the processing task? For this purpose, some frameworks such as <a href="../comparisons/storm.html">Storm</a> attach metadata to database entries, but it needs to be handled carefully, otherwise the stream process generates incorrect output.</li>
-<li><strong>Reprocessing</strong>: Sometimes it can be useful to re-run a stream process on a large amount of historical data, e.g. after updating your processing task&rsquo;s code. However, the issues above make this impractical for jobs that make external queries.</li>
+  <li><strong>Performance</strong>: Making database queries over a network is slow and expensive. A Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core to a stream processor, but if you need to make a remote request for every message you process, your throughput is likely to drop by 2-3 orders of magnitude. You can somewhat mitigate this with careful caching of reads and batching of writes, but then you’re back to the problems of checkpointing, discussed above.</li>
+  <li><strong>Isolation</strong>: If your database or service also serves requests to users, it can be dangerous to use the same database with a stream processor. A scalable stream processing system can run with very high throughput, and easily generates a huge amount of load (for example when catching up on a queue backlog). If you’re not very careful, you may cause a denial-of-service attack on your own database, and cause problems for interactive requests from users.</li>
+  <li><strong>Query Capabilities</strong>: Many scalable databases expose very limited query interfaces (e.g. only supporting simple key-value lookups), because the equivalent of a “full table scan” or rich traversal would be too expensive. Stream processes are often less latency-sensitive, so richer query capabilities would be more feasible.</li>
+  <li><strong>Correctness</strong>: When a stream processor fails and needs to be restarted, how is the database state made consistent with the processing task? For this purpose, some frameworks such as <a href="../comparisons/storm.html">Storm</a> attach metadata to database entries, but it needs to be handled carefully, otherwise the stream process generates incorrect output.</li>
+  <li><strong>Reprocessing</strong>: Sometimes it can be useful to re-run a stream process on a large amount of historical data, e.g. after updating your processing task’s code. However, the issues above make this impractical for jobs that make external queries.</li>
 </ol>
 
 <h3 id="local-state-in-samza">Local state in Samza</h3>
@@ -726,31 +740,31 @@
 <p>Samza allows tasks to maintain state in a way that is different from the approaches described above:</p>
 
 <ul>
-<li>The state is stored on disk, so the job can maintain more state than would fit in memory.</li>
-<li>It is stored on the same machine as the processing task, to avoid the performance problems of making database queries over the network.</li>
-<li>Each job has its own datastore, to avoid the isolation problems of a shared database (if you make an expensive query, it affects only the current task, nobody else).</li>
-<li>Different storage engines can be plugged in, enabling rich query capabilities.</li>
-<li>The state is continuously replicated, enabling fault tolerance without the problems of checkpointing large amounts of state.</li>
+  <li>The state is stored on disk, so the job can maintain more state than would fit in memory.</li>
+  <li>It is stored on the same machine as the processing task, to avoid the performance problems of making database queries over the network.</li>
+  <li>Each job has its own datastore, to avoid the isolation problems of a shared database (if you make an expensive query, it affects only the current task, nobody else).</li>
+  <li>Different storage engines can be plugged in, enabling rich query capabilities.</li>
+  <li>The state is continuously replicated, enabling fault tolerance without the problems of checkpointing large amounts of state.</li>
 </ul>
 
 <p>Imagine you take a remote database, partition it to match the number of tasks in the stream processing job, and co-locate each partition with its task. The result looks like this:</p>
 
-<p><img src="/img/latest/learn/documentation/container/stateful_job.png" alt="state-local"></p>
+<p><img src="/img/latest/learn/documentation/container/stateful_job.png" alt="state-local" /></p>
 
 <p>If a machine fails, all the tasks running on that machine and their database partitions are lost. In order to make them highly available, all writes to the database partition are replicated to a durable changelog (typically Kafka). Now, when a machine fails, we can restart the tasks on another machine, and consume this changelog in order to restore the contents of the database partition.</p>
 
-<p>Note that each task only has access to its own database partition, not to any other task&rsquo;s partition. This is important: when you scale out your job by giving it more computing resources, Samza needs to move tasks from one machine to another. By giving each task its own state, tasks can be relocated without affecting the job&rsquo;s operation. If necessary, you can repartition your streams so that all messages for a particular database partition are routed to the same task instance.</p>
+<p>Note that each task only has access to its own database partition, not to any other task’s partition. This is important: when you scale out your job by giving it more computing resources, Samza needs to move tasks from one machine to another. By giving each task its own state, tasks can be relocated without affecting the job’s operation. If necessary, you can repartition your streams so that all messages for a particular database partition are routed to the same task instance.</p>
 
 <p><a href="http://kafka.apache.org/documentation.html#compaction">Log compaction</a> runs in the background on the changelog topic, and ensures that the changelog does not grow indefinitely. If you overwrite the same value in the store many times, log compaction keeps only the most recent value, and throws away any old values in the log. If you delete an item from the store, log compaction also removes it from the log. With the right tuning, the changelog is not much bigger than the database itself.</p>
 
 <p>With this architecture, Samza allows tasks to maintain large amounts of fault-tolerant state, at a performance that is almost as good as a pure in-memory implementation. There are just a few limitations:</p>
 
 <ul>
-<li>If you have some data that you want to share between tasks (across partition boundaries), you need to go to some additional effort to repartition and distribute the data. Each task will need its own copy of the data, so this may use more space overall.</li>
-<li>When a container is restarted, it can take some time to restore the data in all of its state partitions. The time depends on the amount of data, the storage engine, your access patterns, and other factors. As a rule of thumb, 50&nbsp;MB/sec is a reasonable restore time to expect.</li>
+  <li>If you have some data that you want to share between tasks (across partition boundaries), you need to go to some additional effort to repartition and distribute the data. Each task will need its own copy of the data, so this may use more space overall.</li>
+  <li>When a container is restarted, it can take some time to restore the data in all of its state partitions. The time depends on the amount of data, the storage engine, your access patterns, and other factors. As a rule of thumb, 50 MB/sec is a reasonable restore rate to expect, so restoring a 15 GB store would take around five minutes.</li>
 </ul>
 
-<p>Nothing prevents you from using an external database if you want to, but for many use cases, Samza&rsquo;s local state is a powerful tool for enabling stateful stream processing.</p>
+<p>Nothing prevents you from using an external database if you want to, but for many use cases, Samza’s local state is a powerful tool for enabling stateful stream processing.</p>
 
 <h3 id="key-value-storage">Key-value storage</h3>
 
@@ -762,46 +776,46 @@
 
 <p>To use a key-value store in your job, add the following to your job config:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Use the key-value store implementation for a store called &quot;my-store&quot;</span>
-<span class="na">stores.my-store.factory</span><span class="o">=</span><span class="s">org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</span>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Use the key-value store implementation for a store called "my-store"
+stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 
-<span class="c"># Use the Kafka topic &quot;my-store-changelog&quot; as the changelog stream for this store.</span>
-<span class="c"># This enables automatic recovery of the store after a failure. If you don&#39;t</span>
-<span class="c"># configure this, no changelog stream will be generated.</span>
-<span class="na">stores.my-store.changelog</span><span class="o">=</span><span class="s">kafka.my-store-changelog</span>
-
-<span class="c"># Encode keys and values in the store as UTF-8 strings.</span>
-<span class="na">serializers.registry.string.class</span><span class="o">=</span><span class="s">org.apache.samza.serializers.StringSerdeFactory</span>
-<span class="na">stores.my-store.key.serde</span><span class="o">=</span><span class="s">string</span>
-<span class="na">stores.my-store.msg.serde</span><span class="o">=</span><span class="s">string</span></code></pre></figure>
+# Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
+# This enables automatic recovery of the store after a failure. If you don't
+# configure this, no changelog stream will be generated.
+stores.my-store.changelog=kafka.my-store-changelog
+
+# Encode keys and values in the store as UTF-8 strings.
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+stores.my-store.key.serde=string
+stores.my-store.msg.serde=string</code></pre></figure>
 
 <p>See the <a href="serialization.html">serialization section</a> for more information on the <em>serde</em> options.</p>
 
 <p>Here is a simple example that writes every incoming message to the store:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatefulTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span><span class="o">,</span> <span class="n">InitableTask</span> <span class="o">{</span>
-  <span class="kd">private</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">store</span><span class="o">;</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatefulTask</span> <span class="kd">implements</span> <span class="nc">StreamTask</span><span class="o">,</span> <span class="nc">InitableTask</span> <span class="o">{</span>
+  <span class="kd">private</span> <span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">store</span><span class="o">;</span>
 
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Config</span> <span class="n">config</span><span class="o">,</span> <span class="n">TaskContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
-    <span class="k">this</span><span class="o">.</span><span class="na">store</span> <span class="o">=</span> <span class="o">(</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStore</span><span class="o">(</span><span class="s">&quot;my-store&quot;</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">Config</span> <span class="n">config</span><span class="o">,</span> <span class="nc">TaskContext</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">this</span><span class="o">.</span><span class="na">store</span> <span class="o">=</span> <span class="o">(</span><span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;)</span> <span class="n">context</span><span class="o">.</span><span class="na">getStore</span><span class="o">(</span><span class="s">"my-store"</span><span class="o">);</span>
   <span class="o">}</span>
 
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
-                      <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
-                      <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">store</span><span class="o">.</span><span class="na">put</span><span class="o">((</span><span class="n">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getMessage</span><span class="o">());</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
+                      <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
+                      <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">store</span><span class="o">.</span><span class="na">put</span><span class="o">((</span><span class="nc">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="o">(</span><span class="nc">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getMessage</span><span class="o">());</span>
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
 <p>Here is the complete key-value store API:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
-  <span class="n">V</span> <span class="nf">get</span><span class="o">(</span><span class="n">K</span> <span class="n">key</span><span class="o">);</span>
-  <span class="kt">void</span> <span class="nf">put</span><span class="o">(</span><span class="n">K</span> <span class="n">key</span><span class="o">,</span> <span class="n">V</span> <span class="n">value</span><span class="o">);</span>
-  <span class="kt">void</span> <span class="nf">putAll</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Entry</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">entries</span><span class="o">);</span>
-  <span class="kt">void</span> <span class="nf">delete</span><span class="o">(</span><span class="n">K</span> <span class="n">key</span><span class="o">);</span>
-  <span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="nf">range</span><span class="o">(</span><span class="n">K</span> <span class="n">from</span><span class="o">,</span> <span class="n">K</span> <span class="n">to</span><span class="o">);</span>
-  <span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="nf">all</span><span class="o">();</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">KeyValueStore</span><span class="o">&lt;</span><span class="no">K</span><span class="o">,</span> <span class="no">V</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="no">V</span> <span class="nf">get</span><span class="o">(</span><span class="no">K</span> <span class="n">key</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">put</span><span class="o">(</span><span class="no">K</span> <span class="n">key</span><span class="o">,</span> <span class="no">V</span> <span class="n">value</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">putAll</span><span class="o">(</span><span class="nc">List</span><span class="o">&lt;</span><span class="nc">Entry</span><span class="o">&lt;</span><span class="no">K</span><span class="o">,</span><span class="no">V</span><span class="o">&gt;&gt;</span> <span class="n">entries</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">delete</span><span class="o">(</span><span class="no">K</span> <span class="n">key</span><span class="o">);</span>
+  <span class="nc">KeyValueIterator</span><span class="o">&lt;</span><span class="no">K</span><span class="o">,</span><span class="no">V</span><span class="o">&gt;</span> <span class="nf">range</span><span class="o">(</span><span class="no">K</span> <span class="n">from</span><span class="o">,</span> <span class="no">K</span> <span class="n">to</span><span class="o">);</span>
+  <span class="nc">KeyValueIterator</span><span class="o">&lt;</span><span class="no">K</span><span class="o">,</span><span class="no">V</span><span class="o">&gt;</span> <span class="nf">all</span><span class="o">();</span>
 <span class="o">}</span></code></pre></figure>
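+
+<p>Because keys are stored in sorted order, the <code class="language-plaintext highlighter-rouge">range()</code> call can be used to scan all entries that share a common key prefix. A minimal sketch (the composite “userId:timestamp” key layout is an assumption for illustration, not part of the API):</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Assumed key layout "userId:timestamp"; scan all entries for one user.
+KeyValueIterator&lt;String, String&gt; entries = store.range("user42:", "user42:\uffff");
+try {
+  while (entries.hasNext()) {
+    Entry&lt;String, String&gt; entry = entries.next();
+    // process entry.getKey() and entry.getValue() here
+  }
+} finally {
+  entries.close();  // iterators must be closed to release underlying resources
+}</code></pre></figure>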
 
 <p>Additional configuration properties for the key-value store are documented in the <a href="../jobs/configuration-table.html#keyvalue-rocksdb">configuration reference</a>.</p>
@@ -812,70 +826,70 @@
 
 <p>Currently, Samza provides a state storage tool which can recover a state store from its changelog stream into a user-specified directory, for reuse and debugging.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>samza-example/target/bin/state-storage-tool.sh <span class="se">\</span>
-  --config-path<span class="o">=</span>/path/to/job/config.properties <span class="se">\</span>
-  --path<span class="o">=</span>directory/to/put/state/stores</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">samza-example/target/bin/state-storage-tool.sh <span class="se">\</span>
+  <span class="nt">--config-path</span><span class="o">=</span>/path/to/job/config.properties <span class="se">\</span>
+  <span class="nt">--path</span><span class="o">=</span>directory/to/put/state/stores</code></pre></figure>
 
 <h4 id="read-the-value-from-a-running-rocksdb">Read the value from a running RocksDB</h4>
 
-<p>Samza also provides a tool to read the value from a running job&rsquo;s RocksDB.</p>
+<p>Samza also provides a tool to read values from a running job’s RocksDB store.</p>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>samza-example/target/bin/read-rocksdb-tool.sh <span class="se">\</span>
-  --config-path<span class="o">=</span>/path/to/job/config.properties <span class="se">\</span>
-  --db-path<span class="o">=</span>/tmp/nm-local-dir/state/test-state/Partition_0 <span class="se">\</span>
-  --db-name<span class="o">=</span>test-state <span class="se">\</span>
-  --string-key<span class="o">=</span>a,b,c</code></pre></figure>
+<figure class="highlight"><pre><code class="language-bash" data-lang="bash">samza-example/target/bin/read-rocksdb-tool.sh <span class="se">\</span>
+  <span class="nt">--config-path</span><span class="o">=</span>/path/to/job/config.properties <span class="se">\</span>
+  <span class="nt">--db-path</span><span class="o">=</span>/tmp/nm-local-dir/state/test-state/Partition_0 <span class="se">\</span>
+  <span class="nt">--db-name</span><span class="o">=</span>test-state <span class="se">\</span>
+  <span class="nt">--string-key</span><span class="o">=</span>a,b,c</code></pre></figure>
 
 <ul>
-<li><code>--config-path</code>(required): your job&rsquo;s configuration file</li>
-<li><code>--db-path</code>(required): the location of your RocksDB. This is convenient if the RocksDB is in the same machine as the tool. E.g. if you are running hello-samza in your local machine, the location maybe in
+  <li><code class="language-plaintext highlighter-rouge">--config-path</code> (required): your job’s configuration file</li>
+  <li><code class="language-plaintext highlighter-rouge">--db-path</code> (required): the location of your RocksDB. This is convenient if the RocksDB is on the same machine as the tool. E.g. if you are running hello-samza on your local machine, the location may be
 <em>/tmp/hadoop/nm-local-dir/usercache/username/appcache/applicationId/containerId/state/storeName/PartitionNumber</em></li>
-<li><code>--db-name</code>(required): if you only have one state store specified in the config file, you can ignore this one. Otherwise, you need to provide the state store name here.</li>
-<li><code>--string-key</code>: the key list. This one only works if your keys are string. There are also another two options: <code>--integer-key</code>, <code>--long-key</code>. They work for integer keys and long keys respectively.</li>
+  <li><code class="language-plaintext highlighter-rouge">--db-name</code> (required): if you only have one state store specified in the config file, you can omit this option. Otherwise, you need to provide the state store name here.</li>
+  <li><code class="language-plaintext highlighter-rouge">--string-key</code>: the list of keys to read. This option only works if your keys are strings. There are also two other options, <code class="language-plaintext highlighter-rouge">--integer-key</code> and <code class="language-plaintext highlighter-rouge">--long-key</code>, which work for integer and long keys respectively.</li>
 </ul>
 
 <p><strong>Limitations</strong>:</p>
 
 <ul>
-<li>This only works with three kinds of keys: string, integer and long. This is because we can only accept those kinds of keys from the command line (it is really tricky to accept bytes, avro, json, etc from the command line). But it is also easy to use this tool programmatically (The key and value both are deserialized.)</li>
+  <li>This only works with three kinds of keys: string, integer and long, because only these can reasonably be accepted from the command line (accepting bytes, Avro, JSON, etc. there is really tricky). It is also easy to use this tool programmatically, in which case both the key and the value are deserialized, as the snippet below shows.</li>
 </ul>
 
-<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>RocksDbKeyValueReader <span class="nv">kvReader</span> <span class="o">=</span> new RocksDbKeyValueReader<span class="o">(</span>dbName, pathOfdb, config<span class="o">)</span>
-Object <span class="nv">value</span> <span class="o">=</span> kvReader.get<span class="o">(</span>key<span class="o">)</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, pathOfdb, config);
+Object value = kvReader.get(key);</code></pre></figure>
 
 <ul>
-<li>Because Samza job has some caches and buffers, you may not be able to see expected values (or even not be able to see any value, if all the data is buffered). Some of the related configuration are <code>stores.store-name.container.write.buffer.size.bytes</code>, <code>stores.store-name.write.batch.size</code>, <code>stores.store-name.object.cache.size</code>. You may want to set them to very small for testing.</li>
-<li>Since RocksDB memtable is not flushed to disk immediately on every write, you may not be able to see the expected values until it is written to the SST file on disk. For more details on RocksDb, you can refer the docs <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Basics">here</a>.</li>
+  <li>Because a Samza job has some caches and buffers, you may not see the expected values (or any value at all, if the data is still buffered). Some of the related configuration properties are <code class="language-plaintext highlighter-rouge">stores.store-name.container.write.buffer.size.bytes</code>, <code class="language-plaintext highlighter-rouge">stores.store-name.write.batch.size</code>, <code class="language-plaintext highlighter-rouge">stores.store-name.object.cache.size</code>. You may want to set them to very small values for testing.</li>
+  <li>Since the RocksDB memtable is not flushed to disk immediately on every write, you may not see the expected values until they are written to the SST files on disk. For more details on RocksDB, you can refer to the docs <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Basics">here</a>.</li>
 </ul>
 
 <h4 id="known-issues">Known Issues</h4>
 
-<p>RocksDB has several rough edges. It&rsquo;s recommended that you read the RocksDB <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">tuning guide</a>. Some other notes to be aware of are:</p>
+<p>RocksDB has several rough edges. It’s recommended that you read the RocksDB <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">tuning guide</a>. Some other notes to be aware of are:</p>
 
 <ol>
-<li>RocksDB is heavily optimized to run with SSD hard disks. Performance on non-SSDs degrades significantly.</li>
-<li>Samza&rsquo;s KeyValueStorageEngine.putAll() method does not currently use RocksDB&rsquo;s batching-put API because it&rsquo;s <a href="https://github.com/facebook/rocksdb/issues/262">non-functional in Java</a>.</li>
-<li>Calling iterator.seekToFirst() is very slow <a href="https://github.com/facebook/rocksdb/issues/261">if there are a lot of deletes in the store</a>.</li>
+  <li>RocksDB is heavily optimized to run with SSD hard disks. Performance on non-SSDs degrades significantly.</li>
+  <li>Samza’s KeyValueStorageEngine.putAll() method does not currently use RocksDB’s batching-put API because it’s <a href="https://github.com/facebook/rocksdb/issues/262">non-functional in Java</a>.</li>
+  <li>Calling iterator.seekToFirst() is very slow <a href="https://github.com/facebook/rocksdb/issues/261">if there are a lot of deletes in the store</a>.</li>
 </ol>
 
 <h3 id="implementing-common-use-cases-with-the-key-value-store">Implementing common use cases with the key-value store</h3>
 
-<p>Earlier in this section we discussed some example use cases for stateful stream processing. Let&rsquo;s look at how each of these could be implemented using a key-value storage engine such as Samza&rsquo;s RocksDB store.</p>
+<p>Earlier in this section we discussed some example use cases for stateful stream processing. Let’s look at how each of these could be implemented using a key-value storage engine such as Samza’s RocksDB store.</p>
 
-<h4 id="windowed-aggregation">Windowed aggregation</h4>
+<h4 id="windowed-aggregation-1">Windowed aggregation</h4>
 
 <p><em>Example: Counting the number of page views for each user per hour</em></p>
 
 <p>Implementation: You need two processing stages.</p>
 
 <ol>
-<li>The first one re-partitions the input data by user ID, so that all the events for a particular user are routed to the same stream task. If the input stream is already partitioned by user ID, you can skip this.</li>
-<li>The second stage does the counting, using a key-value store that maps a user ID to the running count. For each new event, the job reads the current count for the appropriate user from the store, increments it, and writes it back. When the window is complete (e.g. at the end of an hour), the job iterates over the contents of the store and emits the aggregates to an output stream.</li>
+  <li>The first one re-partitions the input data by user ID, so that all the events for a particular user are routed to the same stream task. If the input stream is already partitioned by user ID, you can skip this.</li>
+  <li>The second stage does the counting, using a key-value store that maps a user ID to the running count. For each new event, the job reads the current count for the appropriate user from the store, increments it, and writes it back. When the window is complete (e.g. at the end of an hour), the job iterates over the contents of the store and emits the aggregates to an output stream (a sketch of this stage follows the list).</li>
 </ol>
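+
+<p>A hedged sketch of the second stage is shown below. It assumes a store named “page-view-counts” configured with an integer value serde, a <code class="language-plaintext highlighter-rouge">task.window.ms</code> setting of one hour, and an <code class="language-plaintext highlighter-rouge">OUTPUT_STREAM</code> constant naming the output stream; resetting the counters between windows is omitted:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical sketch: counts page views per user, and emits the totals from
+// window(), which Samza calls at the configured window interval.
+public class PageViewCounterTask implements StreamTask, WindowableTask, InitableTask {
+  private KeyValueStore&lt;String, Integer&gt; counts;
+
+  public void init(Config config, TaskContext context) {
+    this.counts = (KeyValueStore&lt;String, Integer&gt;) context.getStore("page-view-counts");
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    String userId = (String) envelope.getKey();
+    Integer count = counts.get(userId);                 // read the current count
+    counts.put(userId, count == null ? 1 : count + 1);  // increment and write back
+  }
+
+  public void window(MessageCollector collector, TaskCoordinator coordinator) {
+    // Scan the whole store and emit one aggregate per user.
+    // OUTPUT_STREAM is an assumed SystemStream constant, e.g. new SystemStream("kafka", "counts").
+    KeyValueIterator&lt;String, Integer&gt; all = counts.all();
+    try {
+      while (all.hasNext()) {
+        Entry&lt;String, Integer&gt; entry = all.next();
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, entry.getKey(), entry.getValue()));
+      }
+    } finally {
+      all.close();
+    }
+  }
+}</code></pre></figure>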
 
 <p>Note that this job effectively pauses at the hour mark to output its results. This is totally fine for Samza, as scanning over the contents of the key-value store is quite fast. The input stream is buffered while the job is doing this hourly work.</p>
 
-<h4 id="table-table-join">Table-table join</h4>
+<h4 id="table-table-join-1">Table-table join</h4>
 
 <p><em>Example: Join a table of user profiles to a table of user settings by user_id and emit the joined stream</em></p>
 
@@ -883,13 +897,13 @@ Object <span class="nv">value</span> <sp
 
 <h4 id="table-stream-join">Table-stream join</h4>
 
-<p><em>Example: Augment a stream of page view events with the user&rsquo;s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
+<p><em>Example: Augment a stream of page view events with the user’s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
 
-<p>Implementation: The job subscribes to the stream of user profile updates and the stream of page view events. Both streams must be partitioned by user_id. The job maintains a key-value store where the key is the user_id and the value is the user&rsquo;s ZIP code. Every time the job receives a profile update, it extracts the user&rsquo;s new ZIP code from the profile update and writes it to the store. Every time it receives a page view event, it reads the zip code for that user from the store, and emits the page view event with an added ZIP code field.</p>
+<p>Implementation: The job subscribes to the stream of user profile updates and the stream of page view events. Both streams must be partitioned by user_id. The job maintains a key-value store where the key is the user_id and the value is the user’s ZIP code. Every time the job receives a profile update, it extracts the user’s new ZIP code from the profile update and writes it to the store. Every time it receives a page view event, it reads the zip code for that user from the store, and emits the page view event with an added ZIP code field.</p>
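+
+<p>A minimal sketch of this join is shown below. The store and stream names and the helper methods are hypothetical:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Hypothetical sketch: keeps the latest ZIP code per user and augments page views.
+public class ZipCodeJoinTask implements StreamTask, InitableTask {
+  private KeyValueStore&lt;String, String&gt; zipCodes;  // user_id -&gt; ZIP code
+
+  public void init(Config config, TaskContext context) {
+    this.zipCodes = (KeyValueStore&lt;String, String&gt;) context.getStore("zip-codes");
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    String userId = (String) envelope.getKey();
+    if ("profile-updates".equals(envelope.getSystemStreamPartition().getStream())) {
+      // Profile update: remember the user's latest ZIP code.
+      zipCodes.put(userId, extractZipCode(envelope));
+    } else {
+      // Page view: look up the ZIP code and emit the augmented event.
+      collector.send(withZipCode(envelope, zipCodes.get(userId)));
+    }
+  }
+  // extractZipCode() and withZipCode() are assumed helpers, omitted here.
+}</code></pre></figure>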
 
-<p>If the next stage needs to aggregate by ZIP code, the ZIP code can be used as the partitioning key of the job&rsquo;s output stream. That ensures that all the events for the same ZIP code are sent to the same stream partition.</p>
+<p>If the next stage needs to aggregate by ZIP code, the ZIP code can be used as the partitioning key of the job’s output stream. That ensures that all the events for the same ZIP code are sent to the same stream partition.</p>
 
-<h4 id="stream-stream-join">Stream-stream join</h4>
+<h4 id="stream-stream-join-1">Stream-stream join</h4>
 
 <p><em>Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)</em></p>
 
@@ -899,19 +913,19 @@ Object <span class="nv">value</span> <sp
 
 <h3 id="other-storage-engines">Other storage engines</h3>
 
-<p>Samza&rsquo;s fault-tolerance mechanism (sending a local store&rsquo;s writes to a replicated changelog) is completely decoupled from the storage engine&rsquo;s data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the <a href="../api/javadocs/org/apache/samza/storage/StorageEngine.html">StorageEngine</a> interface. Samza&rsquo;s model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task.</p>
+<p>Samza’s fault-tolerance mechanism (sending a local store’s writes to a replicated changelog) is completely decoupled from the storage engine’s data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the <a href="../api/javadocs/org/apache/samza/storage/StorageEngine.html">StorageEngine</a> interface. Samza’s model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task.</p>
 
-<p>Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), <a href="http://infolab.stanford.edu/%7Eullman/mmds/ch4.pdf">approximate algorithms</a> such as <a href="http://en.wikipedia.org/wiki/Bloom_filter">bloom filters</a> and <a href="http://research.google.com/pubs/pub40671.html">hyperloglog</a>, or full-text indexes such as <a href="http://lucene.apache.org">Lucene</a>. (Patches welcome!)</p>
+<p>Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), <a href="http://infolab.stanford.edu/~ullman/mmds/ch4.pdf">approximate algorithms</a> such as <a href="http://en.wikipedia.org/wiki/Bloom_filter">bloom filters</a> and <a href="http://research.google.com/pubs/pub40671.html">hyperloglog</a>, or full-text indexes such as <a href="http://lucene.apache.org">Lucene</a>. (Patches welcome!)</p>
 
 <h3 id="fault-tolerance-semantics-with-state">Fault tolerance semantics with state</h3>
 
-<p>As discussed in the section on <a href="checkpointing.html">checkpointing</a>, Samza currently only supports at-least-once delivery guarantees in the presence of failure (this is sometimes referred to as &ldquo;guaranteed delivery&rdquo;). This means that if a task fails, no messages are lost, but some messages may be redelivered.</p>
+<p>As discussed in the section on <a href="checkpointing.html">checkpointing</a>, Samza currently only supports at-least-once delivery guarantees in the presence of failure (this is sometimes referred to as “guaranteed delivery”). This means that if a task fails, no messages are lost, but some messages may be redelivered.</p>
 
 <p>For many of the stateful processing use cases discussed above, this is not a problem: if the effect of a message on state is idempotent, it is safe for the same message to be processed more than once. For example, if the store contains the ZIP code for each user, then processing the same profile update twice has no effect, because the duplicate update does not change the ZIP code.</p>
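+
+<p>For instance, reusing the hypothetical stores from the sketches above:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">// Idempotent under redelivery: writing the same ZIP code twice changes nothing.
+zipCodes.put(userId, zipCode);
+
+// Not idempotent: reprocessing the same message double-counts the page view.
+Integer count = counts.get(userId);
+counts.put(userId, count == null ? 1 : count + 1);</code></pre></figure>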
 
 <p>However, for non-idempotent operations such as counting, at-least-once delivery guarantees can give incorrect results. If a Samza task fails and is restarted, it may double-count some messages that were processed shortly before the failure. We are planning to address this limitation in a future release of Samza.</p>
 
-<h2 id="windowing"><a href="windowing.html">Windowing &raquo;</a></h2>
+<h2 id="windowing-"><a href="windowing.html">Windowing »</a></h2>
 
            
         </div>

Modified: samza/site/learn/documentation/latest/container/streams.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/container/streams.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/container/streams.html (original)
+++ samza/site/learn/documentation/latest/container/streams.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/container/streams">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/container/streams">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/container/streams">1.6.0</a></li>
 
               
@@ -641,27 +655,27 @@
 
 <p>The <a href="samza-container.html">samza container</a> reads and writes messages using the <a href="../api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a> and <a href="../api/javadocs/org/apache/samza/system/SystemProducer.html">SystemProducer</a> interfaces. You can integrate any message broker with Samza by implementing these two interfaces.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">SystemConsumer</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">SystemConsumer</span> <span class="o">{</span>
   <span class="kt">void</span> <span class="nf">start</span><span class="o">();</span>
 
   <span class="kt">void</span> <span class="nf">stop</span><span class="o">();</span>
 
   <span class="kt">void</span> <span class="nf">register</span><span class="o">(</span>
-      <span class="n">SystemStreamPartition</span> <span class="n">systemStreamPartition</span><span class="o">,</span>
-      <span class="n">String</span> <span class="n">lastReadOffset</span><span class="o">);</span>
+      <span class="nc">SystemStreamPartition</span> <span class="n">systemStreamPartition</span><span class="o">,</span>
+      <span class="nc">String</span> <span class="n">lastReadOffset</span><span class="o">);</span>
 
-  <span class="n">List</span><span class="o">&lt;</span><span class="n">IncomingMessageEnvelope</span><span class="o">&gt;</span> <span class="nf">poll</span><span class="o">(</span>
-      <span class="n">Map</span><span class="o">&lt;</span><span class="n">SystemStreamPartition</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">systemStreamPartitions</span><span class="o">,</span>
+  <span class="nc">List</span><span class="o">&lt;</span><span class="nc">IncomingMessageEnvelope</span><span class="o">&gt;</span> <span class="nf">poll</span><span class="o">(</span>
+      <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">SystemStreamPartition</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">systemStreamPartitions</span><span class="o">,</span>
       <span class="kt">long</span> <span class="n">timeout</span><span class="o">)</span>
-    <span class="kd">throws</span> <span class="n">InterruptedException</span><span class="o">;</span>
+    <span class="kd">throws</span> <span class="nc">InterruptedException</span><span class="o">;</span>
 <span class="o">}</span>
 
 <span class="kd">public</span> <span class="kd">class</span> <span class="nc">IncomingMessageEnvelope</span> <span class="o">{</span>
-  <span class="kd">public</span> <span class="n">Object</span> <span class="nf">getMessage</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+  <span class="kd">public</span> <span class="nc">Object</span> <span class="nf">getMessage</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 
-  <span class="kd">public</span> <span class="n">Object</span> <span class="nf">getKey</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+  <span class="kd">public</span> <span class="nc">Object</span> <span class="nf">getKey</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 
-  <span class="kd">public</span> <span class="n">SystemStreamPartition</span> <span class="nf">getSystemStreamPartition</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+  <span class="kd">public</span> <span class="nc">SystemStreamPartition</span> <span class="nf">getSystemStreamPartition</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 <span class="o">}</span>
 
 <span class="kd">public</span> <span class="kd">interface</span> <span class="nc">SystemProducer</span> <span class="o">{</span>
@@ -669,29 +683,29 @@
 
   <span class="kt">void</span> <span class="nf">stop</span><span class="o">();</span>
 
-  <span class="kt">void</span> <span class="nf">register</span><span class="o">(</span><span class="n">String</span> <span class="n">source</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">register</span><span class="o">(</span><span class="nc">String</span> <span class="n">source</span><span class="o">);</span>
 
-  <span class="kt">void</span> <span class="nf">send</span><span class="o">(</span><span class="n">String</span> <span class="n">source</span><span class="o">,</span> <span class="n">OutgoingMessageEnvelope</span> <span class="n">envelope</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">send</span><span class="o">(</span><span class="nc">String</span> <span class="n">source</span><span class="o">,</span> <span class="nc">OutgoingMessageEnvelope</span> <span class="n">envelope</span><span class="o">);</span>
 
-  <span class="kt">void</span> <span class="nf">flush</span><span class="o">(</span><span class="n">String</span> <span class="n">source</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">flush</span><span class="o">(</span><span class="nc">String</span> <span class="n">source</span><span class="o">);</span>
 <span class="o">}</span>
 
 <span class="kd">public</span> <span class="kd">class</span> <span class="nc">OutgoingMessageEnvelope</span> <span class="o">{</span>
   <span class="o">...</span>
-  <span class="kd">public</span> <span class="n">Object</span> <span class="nf">getKey</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+  <span class="kd">public</span> <span class="nc">Object</span> <span class="nf">getKey</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 
-  <span class="kd">public</span> <span class="n">Object</span> <span class="nf">getMessage</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+  <span class="kd">public</span> <span class="nc">Object</span> <span class="nf">getMessage</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
 <p>Out of the box, Samza supports Kafka (KafkaSystemConsumer and KafkaSystemProducer). However, any message bus system can be plugged in, as long as it can provide the semantics required by Samza, as described in the <a href="../api/javadocs/org/apache/samza/system/SystemConsumer.html">javadoc</a>.</p>
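 <p>A system is wired in through configuration: you pick a name for it and point Samza at the factory class that instantiates its consumer and producer. For example, the built-in Kafka system is declared like this (the name “kafka” is just a label you choose):</p>

 <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory</code></pre></figure>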
 
-<p>SystemConsumers and SystemProducers may read and write messages of any data type. It&rsquo;s ok if they only support byte arrays &mdash; Samza has a separate <a href="serialization.html">serialization layer</a> which converts to and from objects that application code can use. Samza does not prescribe any particular data model or serialization format.</p>
+<p>SystemConsumers and SystemProducers may read and write messages of any data type. It’s ok if they only support byte arrays — Samza has a separate <a href="serialization.html">serialization layer</a> which converts to and from objects that application code can use. Samza does not prescribe any particular data model or serialization format.</p>
 
 <p>The job configuration file can include properties that are specific to a particular consumer and producer implementation. For example, the configuration would typically indicate the hostname and port of the message broker to use, and perhaps connection options.</p>
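 <p>For example, a Kafka system definition might include settings along these lines (the hostnames and ports are illustrative):</p>

 <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">systems.kafka.consumer.zookeeper.connect=localhost:2181
 systems.kafka.producer.bootstrap.servers=localhost:9092</code></pre></figure>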
 
 <h3 id="how-streams-are-processed">How streams are processed</h3>
 
-<p>If a job is consuming messages from more than one input stream, and all input streams have messages available, messages are processed in a round robin fashion by default. For example, if a job is consuming AdImpressionEvent and AdClickEvent, the task instance&rsquo;s process() method is called with a message from AdImpressionEvent, then a message from AdClickEvent, then another message from AdImpressionEvent, &hellip; and continues to alternate between the two.</p>
+<p>If a job is consuming messages from more than one input stream, and all input streams have messages available, messages are processed in round-robin fashion by default. For example, if a job is consuming AdImpressionEvent and AdClickEvent, the task instance’s process() method is called with a message from AdImpressionEvent, then a message from AdClickEvent, then another message from AdImpressionEvent, … and continues to alternate between the two.</p>
 
 <p>If one of the input streams has no new messages available (the most recent message has already been consumed), that stream is skipped, and the job continues to consume from the other inputs. Samza keeps checking the skipped stream and resumes consuming from it as soon as new messages arrive.</p>
 
@@ -699,38 +713,38 @@
 
 <p>When a Samza container has several incoming messages on different stream partitions, how does it decide which to process first? The behavior is determined by a <a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooser.html">MessageChooser</a>. The default chooser is RoundRobinChooser, but you can override it by implementing a custom chooser.</p>
 
-<p>To plug in your own message chooser, you need to implement the <a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html">MessageChooserFactory</a> interface, and set the &ldquo;task.chooser.class&rdquo; configuration to the fully-qualified class name of your implementation:</p>
+<p>To plug in your own message chooser, you need to implement the <a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html">MessageChooserFactory</a> interface, and set the “task.chooser.class” configuration to the fully-qualified class name of your implementation:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">task.chooser.class</span><span class="o">=</span><span class="s">com.example.samza.YourMessageChooserFactory</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">task.chooser.class=com.example.samza.YourMessageChooserFactory</code></pre></figure>
 
 <h4 id="prioritizing-input-streams">Prioritizing input streams</h4>
 
-<p>There are certain times when messages from one stream should be processed with higher priority than messages from another stream. For example, some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. In this case, it&rsquo;s useful to prioritize the real-time stream over the batch stream, so that the real-time processing doesn&rsquo;t slow down if there is a sudden burst of data on the batch stream.</p>
+<p>There are certain times when messages from one stream should be processed with higher priority than messages from another stream. For example, some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. In this case, it’s useful to prioritize the real-time stream over the batch stream, so that the real-time processing doesn’t slow down if there is a sudden burst of data on the batch stream.</p>
 
 <p>Samza provides a mechanism to prioritize one stream over another by setting this configuration parameter: systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.priority=&lt;number&gt;. For example:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.kafka.streams.my-real-time-stream.samza.priority</span><span class="o">=</span><span class="s">2</span>
-<span class="na">systems.kafka.streams.my-batch-stream.samza.priority</span><span class="o">=</span><span class="s">1</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">systems.kafka.streams.my-real-time-stream.samza.priority=2
+systems.kafka.streams.my-batch-stream.samza.priority=1</code></pre></figure>
 
-<p>This declares that my-real-time-stream&rsquo;s messages should be processed with higher priority than my-batch-stream&rsquo;s messages. If my-real-time-stream has any messages available, they are processed first. Only if there are no messages currently waiting on my-real-time-stream, the Samza job continues processing my-batch-stream.</p>
+<p>This declares that my-real-time-stream’s messages should be processed with higher priority than my-batch-stream’s messages. If my-real-time-stream has any messages available, they are processed first. Only if there are no messages currently waiting on my-real-time-stream does the Samza job continue processing my-batch-stream.</p>
 
-<p>Each priority level gets its own MessageChooser. It is valid to define two streams with the same priority. If messages are available from two streams at the same priority level, it&rsquo;s up to the MessageChooser for that priority level to decide which message should be processed first.</p>
+<p>Each priority level gets its own MessageChooser. It is valid to define two streams with the same priority. If messages are available from two streams at the same priority level, it’s up to the MessageChooser for that priority level to decide which message should be processed first.</p>
 
-<p>It&rsquo;s also valid to only define priorities for some streams. All non-prioritized streams are treated as the lowest priority, and share a MessageChooser.</p>
+<p>It’s also valid to only define priorities for some streams. All non-prioritized streams are treated as the lowest priority, and share a MessageChooser.</p>
 
 <h4 id="bootstrapping">Bootstrapping</h4>
 
-<p>Sometimes, a Samza job needs to fully consume a stream (from offset 0 up to the most recent message) before it processes messages from any other stream. This is useful in situations where the stream contains some prerequisite data that the job needs, and it doesn&rsquo;t make sense to process messages from other streams until the job has loaded that prerequisite data. Samza supports this use case with <em>bootstrap streams</em>.</p>
+<p>Sometimes, a Samza job needs to fully consume a stream (from offset 0 up to the most recent message) before it processes messages from any other stream. This is useful in situations where the stream contains some prerequisite data that the job needs, and it doesn’t make sense to process messages from other streams until the job has loaded that prerequisite data. Samza supports this use case with <em>bootstrap streams</em>.</p>
 
-<p>A bootstrap stream seems similar to a stream with a high priority, but is subtly different. Before allowing any other stream to be processed, a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed. Until then, the bootstrap stream is the exclusive input to the job: even if a network issue or some other factor causes the bootstrap stream consumer to slow down, other inputs can&rsquo;t sneak their messages in.</p>
+<p>A bootstrap stream seems similar to a stream with a high priority, but is subtly different. Before allowing any other stream to be processed, a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed. Until then, the bootstrap stream is the exclusive input to the job: even if a network issue or some other factor causes the bootstrap stream consumer to slow down, other inputs can’t sneak their messages in.</p>
 
-<p>Another difference between a bootstrap stream and a high-priority stream is that the bootstrap stream&rsquo;s special treatment is temporary: when it has been fully consumed (we say it has &ldquo;caught up&rdquo;), its priority drops to be the same as all the other input streams.</p>
+<p>Another difference between a bootstrap stream and a high-priority stream is that the bootstrap stream’s special treatment is temporary: when it has been fully consumed (we say it has “caught up”), its priority drops to be the same as all the other input streams.</p>
 
-<p>To configure a stream called &ldquo;my-bootstrap-stream&rdquo; to be a fully-consumed bootstrap stream, use the following settings:</p>
+<p>To configure a stream called “my-bootstrap-stream” to be a fully-consumed bootstrap stream, use the following settings:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">systems.kafka.streams.my-bootstrap-stream.samza.bootstrap</span><span class="o">=</span><span class="s">true</span>
-<span class="na">systems.kafka.streams.my-bootstrap-stream.samza.reset.offset</span><span class="o">=</span><span class="s">true</span>
-<span class="na">systems.kafka.streams.my-bootstrap-stream.samza.offset.default</span><span class="o">=</span><span class="s">oldest</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">systems.kafka.streams.my-bootstrap-stream.samza.bootstrap=true
+systems.kafka.streams.my-bootstrap-stream.samza.reset.offset=true
+systems.kafka.streams.my-bootstrap-stream.samza.offset.default=oldest</code></pre></figure>
 
 <p>The bootstrap=true parameter enables the bootstrap behavior (prioritization over other streams). The combination of reset.offset=true and offset.default=oldest tells Samza to always start reading the stream from the oldest offset, every time a container starts up (rather than starting to read from the most recent checkpoint).</p>
 
@@ -742,11 +756,11 @@
 
 <p>For example, if you want to read 100 messages in a row from each stream partition (regardless of the MessageChooser), you can use this configuration parameter:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">task.consumer.batch.size</span><span class="o">=</span><span class="s">100</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">task.consumer.batch.size=100</code></pre></figure>
 
-<p>With this setting, Samza tries to read a message from the most recently used <a href="../api/javadocs/org/apache/samza/system/SystemStreamPartition.html">SystemStreamPartition</a>. This behavior continues either until no more messages are available for that SystemStreamPartition, or until the batch size has been reached. When that happens, Samza defers to the MessageChooser to determine the next message to process. It then again tries to continue consume from the chosen message&rsquo;s SystemStreamPartition until the batch size is reached.</p>
+<p>With this setting, Samza tries to read a message from the most recently used <a href="../api/javadocs/org/apache/samza/system/SystemStreamPartition.html">SystemStreamPartition</a>. This behavior continues either until no more messages are available for that SystemStreamPartition, or until the batch size has been reached. When that happens, Samza defers to the MessageChooser to determine the next message to process. It then tries to keep consuming from the chosen message’s SystemStreamPartition until the batch size is reached.</p>
 
-<h2 id="serialization"><a href="serialization.html">Serialization &raquo;</a></h2>
+<h2 id="serialization-"><a href="serialization.html">Serialization »</a></h2>
 
            
         </div>

Modified: samza/site/learn/documentation/latest/container/windowing.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/container/windowing.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/container/windowing.html (original)
+++ samza/site/learn/documentation/latest/container/windowing.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/container/windowing">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/container/windowing">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/container/windowing">1.6.0</a></li>
 
               
@@ -641,40 +655,40 @@
 
 <p>Sometimes a stream processing job needs to do something in regular time intervals, regardless of how many incoming messages the job is processing. For example, say you want to report the number of page views per minute. To do this, you increment a counter every time you see a page view event. Once per minute, you send the current counter value to an output stream and reset the counter to zero.</p>
 
-<p>Samza&rsquo;s <em>windowing</em> feature provides a way for tasks to do something in regular time intervals, for example once per minute. To enable windowing, you just need to set one property in your job configuration:</p>
+<p>Samza’s <em>windowing</em> feature provides a way for tasks to do something in regular time intervals, for example once per minute. To enable windowing, you just need to set one property in your job configuration:</p>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># Call the window() method every 60 seconds</span>
-<span class="na">task.window.ms</span><span class="o">=</span><span class="s">60000</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># Call the window() method every 60 seconds
+task.window.ms=60000</code></pre></figure>
 
 <p>Next, your stream task needs to implement the <a href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a> interface. This interface defines a window() method, which Samza calls at the regular interval you configured.</p>
 
 <p>For example, this is how you would implement a basic per-minute event counter:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">EventCounterTask</span> <span class="kd">implements</span> <span class="n">StreamTask</span><span class="o">,</span> <span class="n">WindowableTask</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">EventCounterTask</span> <span class="kd">implements</span> <span class="nc">StreamTask</span><span class="o">,</span> <span class="nc">WindowableTask</span> <span class="o">{</span>
 
-  <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">SystemStream</span> <span class="n">OUTPUT_STREAM</span> <span class="o">=</span>
-    <span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="s">&quot;kafka&quot;</span><span class="o">,</span> <span class="s">&quot;events-per-minute&quot;</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="nc">SystemStream</span> <span class="no">OUTPUT_STREAM</span> <span class="o">=</span>
+    <span class="k">new</span> <span class="nf">SystemStream</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">,</span> <span class="s">"events-per-minute"</span><span class="o">);</span>
 
   <span class="kd">private</span> <span class="kt">int</span> <span class="n">eventsSeen</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
 
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
-                      <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
-                      <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
+                      <span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
+                      <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
     <span class="n">eventsSeen</span><span class="o">++;</span>
   <span class="o">}</span>
 
-  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">window</span><span class="o">(</span><span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
-                     <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="n">OUTPUT_STREAM</span><span class="o">,</span> <span class="n">eventsSeen</span><span class="o">));</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">window</span><span class="o">(</span><span class="nc">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
+                     <span class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span class="no">OUTPUT_STREAM</span><span class="o">,</span> <span class="n">eventsSeen</span><span class="o">));</span>
     <span class="n">eventsSeen</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>If you need to send messages to output streams, you can use the <a href="../api/javadocs/org/apache/samza/task/MessageCollector.html">MessageCollector</a> object passed to the window() method. Please only use that MessageCollector object for sending messages, and don&rsquo;t use it outside of the call to window().</p>
+<p>If you need to send messages to output streams, you can use the <a href="../api/javadocs/org/apache/samza/task/MessageCollector.html">MessageCollector</a> object passed to the window() method. Please only use that MessageCollector object for sending messages, and don’t use it outside of the call to window().</p>
 
-<p>Note that Samza uses <a href="event-loop.html">single-threaded execution</a>, so the window() call can never happen concurrently with a process() call. This has the advantage that you don&rsquo;t need to worry about thread safety in your code (no need to synchronize anything), but the downside that the window() call may be delayed if your process() method takes a long time to return.</p>
+<p>Note that Samza uses <a href="event-loop.html">single-threaded execution</a>, so the window() call can never happen concurrently with a process() call. This has the advantage that you don’t need to worry about thread safety in your code (no need to synchronize anything), but the downside that the window() call may be delayed if your process() method takes a long time to return.</p>
 
-<h2 id="coordinator-stream"><a href="coordinator-stream.html">Coordinator Stream &raquo;</a></h2>
+<h2 id="coordinator-stream-"><a href="coordinator-stream.html">Coordinator Stream »</a></h2>
 
            
         </div>