Posted to commits@samza.apache.org by ma...@apache.org on 2014/06/09 22:35:58 UTC

svn commit: r1601502 [2/3] - in /incubator/samza/site: ./ css/ img/0.7.0/learn/documentation/container/ learn/documentation/0.7.0/ learn/documentation/0.7.0/api/ learn/documentation/0.7.0/comparisons/ learn/documentation/0.7.0/container/ learn/document...

Modified: incubator/samza/site/learn/documentation/0.7.0/container/state-management.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/container/state-management.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/container/state-management.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/container/state-management.html Mon Jun  9 20:35:57 2014
@@ -72,202 +72,154 @@
           <div class="content">
             <h2>State Management</h2>
 
-<p>One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries on this data.</p>
+<p>One of the more interesting features of Samza is stateful stream processing. Tasks can store and query data through APIs provided by Samza. That data is stored on the same machine as the stream task; compared to connecting over the network to a remote database, Samza&#39;s local state allows you to read and write large amounts of data with better performance. Samza replicates this state across multiple machines for fault-tolerance (described in detail below).</p>
 
-<p>Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don&#39;t usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require accumulating state between rows. Samza doesn&#39;t provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.</p>
+<p>Some stream processing jobs don&#39;t require state: if you only need to transform one message at a time, or filter out messages based on some condition, your job can be simple. Every call to your task&#39;s <a href="../api/overview.html">process method</a> handles one incoming message, and each message is independent of all the other messages.</p>
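+
+<p>For example, a purely stateless job might drop messages that don&#39;t match some condition, and pass the rest through unchanged. A minimal sketch of such a task might look like this (the output system and stream names are illustrative, and a string serde is assumed for keys and messages):</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class MessageFilterTask implements StreamTask {
+  // Illustrative output stream; any configured system and stream would do.
+  private static final SystemStream OUTPUT_STREAM =
+      new SystemStream(&quot;kafka&quot;, &quot;filtered-events&quot;);
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    // Each message is handled on its own: no state is kept between calls.
+    String message = (String) envelope.getMessage();
+    if (message != null &amp;&amp; !message.contains(&quot;spam&quot;)) {
+      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, envelope.getKey(), message));
+    }
+  }
+}
+</code></pre></div>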
 
-<p>Let&#39;s dive into how this works and why it is useful.</p>
+<p>However, being able to maintain state opens up many possibilities for sophisticated stream processing jobs: joining input streams, grouping messages and aggregating groups of messages. By analogy to SQL, the <em>select</em> and <em>where</em> clauses of a query are usually stateless, but <em>join</em>, <em>group by</em> and aggregation functions like <em>sum</em> and <em>count</em> require state. Samza doesn&#39;t yet provide a higher-level SQL-like language, but it does provide lower-level primitives that you can use to implement streaming aggregation and joins.</p>
 
 <h3>Common use cases for stateful processing</h3>
 
-<p>First, let&#39;s look at some simplistic examples of stateful stream processing that might be seen on a consumer website. Later in this document we&#39;ll go through specific details of using Samza&#39;s built-in key-value storage capabilities to implement each of these applications, but for now it is enough just to see some examples of the kind of applications that tend to need to manage state.</p>
+<p>First, let&#39;s look at some simple examples of stateful stream processing that might be seen in the backend of a consumer website. Later in this page we&#39;ll discuss how to implement these applications using Samza&#39;s built-in key-value storage capabilities.</p>
 
-<h5>Windowed aggregation</h5>
+<h4>Windowed aggregation</h4>
 
-<p>Example: Counting the number of page views for each user per hour</p>
+<p><em>Example: Counting the number of page views for each user per hour</em></p>
 
-<p>This kind of windowed processing is common for ranking and relevance, detecting &quot;trending topics&quot;, as well as simple real-time reporting and monitoring. For small windows one can just maintain the aggregate in memory and manually commit the task position only at window boundaries. However this means we have to recover up to a full window on fail-over, which will be very slow for large windows due to the amount of reprocessing. For large (or infinite!) windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it.</p>
+<p>In this case, your state typically consists of a number of counters which are incremented when a message is processed. The aggregation is typically limited to a time window (e.g. 1 minute, 1 hour, 1 day) so that you can observe changes of activity over time. This kind of windowed processing is common for ranking and relevance, detecting &quot;trending topics&quot;, as well as real-time reporting and monitoring.</p>
 
-<h5>Table-table join</h5>
+<p>The simplest implementation keeps this state in memory (e.g. a hash map in the task instances), and writes it to a database or output stream at the end of every time window. However, you need to consider what happens when a container fails and your in-memory state is lost. You might be able to restore it by processing all the messages in the current window again, but that might take a long time if the window covers a long period. Samza can speed up this recovery by making the state fault-tolerant rather than trying to recompute it.</p>
 
-<p>Example: Join a table of user profiles to a table of user_settings by user_id and emit a &quot;materialized view&quot; of the joined stream</p>
+<h4>Table-table join</h4>
 
-<p>This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However real-life examples are often far more complex then what would normally be considered the domain of materialized views over tables. Consider a few examples of real-time data normalization:</p>
+<p><em>Example: Join a table of user profiles to a table of user settings by user_id and emit the joined stream</em></p>
+
+<p>You might wonder: does it make sense to join two tables in a stream processing system? It does if your database can supply a log of all the changes in the database. There is a <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">duality between a database and a changelog stream</a>: you can publish every data change to a stream, and if you consume the entire stream from beginning to end, you can reconstruct the entire contents of the database. Samza is designed for data processing jobs that follow this philosophy.</p>
+
+<p>If you have changelog streams for several database tables, you can write a stream processing job which keeps the latest state of each table in a local key-value store, where you can access it much faster than by making queries to the original database. Now, whenever data in one table changes, you can join it with the latest data for the same key in the other table, and output the joined result.</p>
+
+<p>There are several real-life examples of data normalization which essentially work in this way:</p>
 
 <ul>
 <li>E-commerce companies like Amazon and EBay need to import feeds of merchandise from merchants, normalize them by product, and present products with all the associated merchants and pricing information.</li>
-<li>Web search requires building a crawler which creates essentially a <a href="http://labs.yahoo.com/files/YahooWebmap.pdf">table of web page contents</a> and joins on all the relevance attributes such as page CTR or pagerank.</li>
+<li>Web search requires building a crawler which creates essentially a <a href="http://labs.yahoo.com/files/YahooWebmap.pdf">table of web page contents</a> and joins on all the relevance attributes such as click-through ratio or pagerank.</li>
 <li>Social networks take feeds of user-entered text and need to normalize out entities such as companies, schools, and skills.</li>
 </ul>
 
-<p>Each of these use cases is a massively complex data normalization problem that can be thought of as constructing a very complex materialized view over many input tables.</p>
+<p>Each of these use cases is a massively complex data normalization problem that can be thought of as constructing a materialized view over many input tables. Samza can help implement such data processing pipelines robustly.</p>
+
+<h4>Stream-table join</h4>
 
-<h5>Stream-table join</h5>
+<p><em>Example: Augment a stream of page view events with the user&#39;s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
 
-<p>Example: Join user region information on to a stream of page views to create an augmented stream of page view with region.</p>
+<p>Joining side-information to a real-time feed is a classic use for stream processing. It&#39;s particularly common in advertising, relevance ranking, fraud detection and other domains. Activity events such as page views generally include only a few attributes, such as the IDs of the viewer and the viewed item, but not detailed attributes like the user&#39;s ZIP code. If you want to aggregate the stream by attributes of the viewer or the viewed items, you need to join with the users table or the items table respectively.</p>
 
-<p>Joining side-information to a real-time feed is a classic use for stream processing. It&#39;s particularly common in advertising, relevance ranking, fraud detection and other domains. Activity data such as page views are generally captured with only a few primary keys, the additional attributes about the viewer and viewed items that are needed for processing need to joined on after-the-fact.</p>
+<p>In data warehouse terminology, you can think of the raw event stream as rows in the central fact table, which needs to be joined with dimension tables so that you can use attributes of the dimensions in your analysis.</p>
 
-<h5>Stream-stream join</h5>
+<h4>Stream-stream join</h4>
 
-<p>Example: Join a stream of ad clicks to a stream of ad views to link the ad view that lead to the click</p>
+<p><em>Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)</em></p>
 
-<p>This is the classic stream join for &quot;nearly aligned&quot; streams. If the events that need to be joined arrive in a limited window it may be possible to buffer unjoined events in memory. Obviously this will be only approximate: any in-flight items will be lost if a machine crashes. However for more exact results, or to handle a very large window of misalignment, stateful processing is needed.</p>
+<p>A stream join is useful for &quot;nearly aligned&quot; streams, where you expect to receive related events on several input streams, and you want to combine them into a single output event. You cannot rely on the events arriving at the stream processor at the same time, but you can set a maximum period of time over which you allow the events to be spread out.</p>
 
-<h5>More</h5>
+<p>In order to perform a join between streams, your job needs to buffer events for the time window over which you want to join. For short time windows, you can do this in memory (at the risk of losing events if the machine fails). Alternatively, you can buffer the events in Samza&#39;s state store, which can hold more messages than fit in memory.</p>
 
-<p>Of course there are infinite variations on joins and aggregations, but most amount to essentially variations and combinations of the above patterns.</p>
+<h4>More</h4>
+
+<p>There are many variations of joins and aggregations, but most are essentially variations and combinations of the above patterns.</p>
 
 <h3>Approaches to managing task state</h3>
 
-<p>So how do systems support this kind of stateful processing? We&#39;ll lead in by describing what we have seen in other systems and then describe what Samza does.</p>
+<p>So how do systems support this kind of stateful processing? We&#39;ll lead in by describing what we have seen in other stream processing systems, and then describe what Samza does.</p>
 
 <h4>In-memory state with checkpointing</h4>
 
-<p>A simple approach, common in academic stream processing systems, is to periodically save out the state of the task&#39;s in-memory data. S4&#39;s <a href="http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance">state management</a> implements this approach&mdash;tasks implement Java&#39;s serializable interface and are periodically serialized using java serialization to save out copies of the processor state.</p>
+<p>A simple approach, common in academic stream processing systems, is to periodically save the task&#39;s entire in-memory data to durable storage. This approach works well if the in-memory state consists of only a few values. However, you have to store the complete task state on each checkpoint, which becomes increasingly expensive as task state grows. Unfortunately, many non-trivial use cases for joins and aggregation have large amounts of state &mdash; often many gigabytes. This makes full dumps of the state impractical.</p>
 
-<p>This approach works well enough if the in-memory state consists of only a few values. However since you have to save out the complete task state on each save this will become increasingly expensive as task state grows. Unfortunately most use cases we have seen revolve around joins and aggregation and so have large amounts of state&mdash;often many gigabytes. This makes periodic full dumps extremely impractical. Some academic systems handle this case by having the tasks produce &quot;diffs&quot; in addition to full checkpoints. However this requires a great deal of complexity in the task to track what has changed and efficiently produce a compact diff of changes.</p>
+<p>Some academic systems produce <em>diffs</em> in addition to full checkpoints, which are smaller if only some of the state has changed since the last checkpoint. <a href="../comparisons/storm.html">Storm&#39;s Trident abstraction</a> similarly keeps an in-memory cache of state, and periodically writes any changes to a remote store such as Cassandra. However, this optimization only helps if most of the state remains unchanged. In some use cases, such as stream joins, it is normal to have a lot of churn in the state, so this technique essentially degrades to making a remote database request for every message (see below).</p>
 
 <h4>Using an external store</h4>
 
-<p>In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store. The database holds aggregates or the dataset being queried to enrich the incoming stream. You get something that looks like this:</p>
+<p>Another common pattern for stateful processing is to store the state in an external database or key-value store. Conventional database replication can be used to make that database fault-tolerant. The architecture looks something like this:</p>
 
 <p><img src="/img/0.7.0/learn/documentation/container/stream_job_and_db.png" alt="state-kv-store"></p>
 
-<p>Samza allows this style of processing (nothing will stop you from querying a remote database or service from your job) but also supports stateful processing natively in a way we think is often superior.</p>
-
-<h4>The problems of remote stores</h4>
-
-<p>To understand why this is useful let&#39;s first understand some of the drawbacks of making remote queries in a stream processing job:</p>
+<p>Samza allows this style of processing &mdash; there is nothing to stop you querying a remote database or service within your job. However, there are a few reasons why a remote database can be problematic for stateful stream processing:</p>
 
 <ol>
-<li><strong>Performance</strong>: The first major drawback of making remote queries is that they are slow and expensive. For example, a Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn&#39;t match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.</li>
-<li><strong>Isolation</strong>: If your database or service is also running live processing, mixing in asynchronous processing can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing, when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render mobile UI or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.</li>
-<li><strong>Query Capabilities</strong>: Many scalable databases expose very limited query interfaces--only supporting simple key-value lookups. Doing the equivalent of a &quot;full table scan&quot; or rich traversal may not be practical in this model.</li>
-<li><strong>Correctness</strong>: If your task keeps counts or otherwise modifies state in a remote store how is this rolled back if the task fails? </li>
+<li><strong>Performance</strong>: Making database queries over a network is slow and expensive. A Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core to a stream processor, but if you need to make a remote request for every message you process, your throughput is likely to drop by 2-3 orders of magnitude. You can somewhat mitigate this with careful caching of reads and batching of writes, but then you&#39;re back to the problems of checkpointing, discussed above.</li>
+<li><strong>Isolation</strong>: If your database or service also serves requests to users, it can be dangerous to use the same database with a stream processor. A scalable stream processing system can run with very high throughput, and easily generates a huge amount of load (for example when catching up on a queue backlog). If you&#39;re not very careful, you may cause a denial-of-service attack on your own database, and cause problems for interactive requests from users.</li>
+<li><strong>Query Capabilities</strong>: Many scalable databases expose very limited query interfaces (e.g. only supporting simple key-value lookups), because the equivalent of a &quot;full table scan&quot; or rich traversal would be too expensive. Stream processes are often less latency-sensitive, so richer query capabilities would be more feasible.</li>
+<li><strong>Correctness</strong>: When a stream processor fails and needs to be restarted, how is the database state made consistent with the processing task? For this purpose, some frameworks such as <a href="../comparisons/storm.html">Storm</a> attach metadata to database entries, but it needs to be handled carefully, otherwise the stream process generates incorrect output.</li>
+<li><strong>Reprocessing</strong>: Sometimes it can be useful to re-run a stream process on a large amount of historical data, e.g. after updating your processing task&#39;s code. However, the issues above make this impractical for jobs that make external queries.</li>
 </ol>
 
-<p>Where these issues become particularly problematic is when you need to reprocess data. Your output, after all, is a combination of your code and your input&mdash;when you change your code you often want to reprocess input to recreate the output state with the new improved code. This is generally quite reasonable for pure stream processing jobs, but generally impractical for performance and isolation reasons for jobs that make external queries.</p>
-
 <h3>Local state in Samza</h3>
 
-<p>Samza allows tasks to maintain persistent, mutable, queryable state that is physically co-located with each task. The state is highly available: in the event of a task failure it will be restored when the task fails over to another machine.</p>
+<p>Samza allows tasks to maintain state in a way that is different from the approaches described above:</p>
 
-<p>You can think of this as taking the remote table out of the remote database and physically partitioning it up and co-locating these partitions with the tasks. This looks something like this:</p>
-
-<p><img src="/img/0.7.0/learn/documentation/container/stateful_job.png" alt="state-local"></p>
-
-<p>Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: you just repartition on the key by which you want to split your processing and then you have full local access to the data within storage in that partition.</p>
+<ul>
+<li>The state is stored on disk, so the job can maintain more state than would fit in memory.</li>
+<li>It is stored on the same machine as the processing task, to avoid the performance problems of making database queries over the network.</li>
+<li>Each job has its own datastore, to avoid the isolation problems of a shared database (if you make an expensive query, it affects only the current task, nobody else).</li>
+<li>Different storage engines can be plugged in, enabling rich query capabilities.</li>
+<li>The state is continuously replicated, enabling fault tolerance without the problems of checkpointing large amounts of state.</li>
+</ul>
 
-<p>Let&#39;s look at how this addresses the problems of the remote store:</p>
+<p>Imagine you take a remote database, partition it to match the number of tasks in the stream processing job, and co-locate each partition with its task. The result looks like this:</p>
 
-<ol>
-<li>This fixes the performance issues of remote queries because the data is now local, what would otherwise be a remote query may now just be a lookup against local memory or disk (we ship a <a href="https://code.google.com/p/leveldb">LevelDB</a>-based store which is described in detail below).</li>
-<li>The isolation issue goes away as well as the queries are executed against the same servers the job runs against and this computation is not intermixed with live service calls.</li>
-<li>Data is now local so any kind of data-intensive processing, scans, and filtering is now possible.</li>
-<li>Since the state changes are themselves modeled as a stream the store can abide by the same delivery and fault-tolerance guarantees that Samza gives tasks.</li>
-</ol>
+<p><img src="/img/0.7.0/learn/documentation/container/stateful_job.png" alt="state-local"></p>
 
-<p>This isn&#39;t always the right pattern to follow, it has a few drawbacks too.
-1. If the data is very large then storing it with each task that uses the data may use more space.
-1. As the per-container data size grows so too will the restore time for a failed task (50Mb/sec is a reasonable restore time to expect).</p>
+<p>If a machine fails, all the tasks running on that machine and their database partitions are lost. In order to make them highly available, all writes to the database partition are replicated to a durable changelog (typically Kafka). Now, when a machine fails, we can restart the tasks on another machine, and consume this changelog in order to restore the contents of the database partition.</p>
 
-<p>However we find that the local state approach is the best more often than not, and, of course, nothing prevents the use of external storage when needed.</p>
+<p>Note that each task only has access to its own database partition, not to any other task&#39;s partition. This is important: when you scale out your job by giving it more computing resources, Samza needs to move tasks from one machine to another. By giving each task its own state, tasks can be relocated without affecting the job&#39;s operation. If necessary, you can repartition your streams so that all messages for a particular database partition are routed to the same task instance.</p>
 
-<h3>Databases as input streams</h3>
+<p><a href="http://kafka.apache.org/documentation.html#compaction">Log compaction</a> runs in the background on the changelog topic, and ensures that the changelog does not grow indefinitely. If you overwrite the same value in the store many times, log compaction keeps only the most recent value, and throws away any old values in the log. If you delete an item from the store, log compaction also removes it from the log. With the right tuning, the changelog is not much bigger than the database itself.</p>
 
-<p>In cases where we were querying the external database on each input message we can transform this to local processing by instead transforming the database into a stream of row changes. These changes can be taken as input by a task, stored, and queried against just as the remote database would be.</p>
+<p>With this architecture, Samza allows tasks to maintain large amounts of fault-tolerant state, with performance that is almost as good as a pure in-memory implementation. There are just a few limitations:</p>
 
-<p>But how can you get such a stream? Many databases such Oracle, HBase, MySQL, and MongoDB offer built-in support for directly capturing changes. If not this can be done by publishing a stream of changes to Kafka or by implementing our <a href="streams.html">pluggable stream interface</a> to directly poll the database for changes (say by some last_modified timestamp). You want this to be done in a way that you can reset the offset or timestamp back to zero to replay the current state of the database as changes if you need to reprocess. If this stream capture is efficient enough you can often avoid having changelogs for your tasks and simply replay them from the source when they fail.</p>
+<ul>
+<li>If you have some data that you want to share between tasks (across partition boundaries), you need to go to some additional effort to repartition and distribute the data. Each task will need its own copy of the data, so this may use more space overall.</li>
+<li>When a container is restarted, it can take some time to restore the data in all of its state partitions. The time depends on the amount of data, the storage engine, your access patterns, and other factors. As a rule of thumb, you can expect the state to be restored at roughly 50&nbsp;MB/sec.</li>
+</ul>
 
-<p>A wonderful contribution would be a generic jdbc-based stream implementation for extracting changes from relational databases by modified date.</p>
+<p>Nothing prevents you from using an external database if you want to, but for many use cases, Samza&#39;s local state is a powerful tool for enabling stateful stream processing.</p>
 
 <h3>Key-value storage</h3>
 
-<p>Though the storage format is pluggable, we provide a key-value store implementation to tasks out-of-the-box that gives the usual put/get/delete/range queries. This is backed by a highly available &quot;changelog&quot; stream that provides fault-tolerance by acting as a kind of <a href="http://en.wikipedia.org/wiki/Redo_log">redo log</a> for the task&#39;s state (we describe this more in the next section).</p>
-
-<p>This key-value storage engine is built on top of <a href="https://code.google.com/p/leveldb">LevelDB</a> using a <a href="https://github.com/fusesource/leveldbjni">LevelDB JNI API</a>. LevelDB has several nice properties. First it maintains data outside the java heap which means it is immediately preferable to any simple approach using a hash table both because of memory-efficiency and to avoid GC. It will use an off-heap memory cache and when that is exhausted go to disk for lookups&mdash;so small data sets can be <a href="https://code.google.com/p/leveldb">very fast</a> and non-memory-resident datasets, though slower, are still possible. It is <a href="http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/">log-structured</a> and writes can be performed at close to disk speeds. It also does built-in block compression which helps to reduce both I/O and memory usage.</p>
-
-<p>The nature of Samza&#39;s usage allows us to optimize this further. We add an optional &quot;L1&quot; LRU cache which is in-heap and holds deserialized rows. This cache is meant to be very small and lets us introduce several optimizations for both reads and writes.</p>
-
-<p>The cache is an &quot;object&quot; or &quot;row&quot; cache&mdash;that is it maintains the java objects stored with no transformation or serialization. This complements LevelDB&#39;s own block level caching well. Reads and writes both populate the cache, and reads on keys in the cache avoid the cost of deserialization for these very common objects.</p>
-
-<p>For writes the cache provides two benefits. Since LevelDB is itself really only a persistent &quot;cache&quot; in our architecture we do not immediately need to apply every write to the filesystem. We can batch together a few hundred writes and apply them all at once. LevelDB heavily optimizes this kind of batch write. This does not impact consistency&mdash;a task always reads what it wrote (since it checks the cache first and is the only writer to its store). Secondly the cache effectively deduplicates updates so that if multiple updates to the same key occur close together we can optimize away all but the final write to leveldb and the changelog. For example, an important use case is maintaining a small number of counters that are incremented on every input. A naive implementation would need to write out each new value to LevelDB as well as perhaps logging the change out to the changelog for the task. In the extreme case where you had only a single variable, x, incremented on each input, an uncached implementation would produce writes in the form &quot;x=1&quot;, &quot;x=2&quot;, &quot;x=3&quot;, etc which is quite inefficient. This is overkill, we only need to flush to the changelog at <a href="checkpointing.html">commit points</a> not on every write. This allows us to &quot;deduplicate&quot; the writes that go to leveldb and the changelog to just the final value before the commit point (&quot;x=3&quot; or whatever it happened to be).</p>
-
-<p>The combination of these features makes it possible to provide highly available processing that performs very close to memory speeds for small datasets yet still scales up to TBs of data (partitioned up across all the tasks).</p>
-
-<h3>Fault-tolerance</h3>
-
-<p>As mentioned the actual local storage (i.e. LevelDB for key-value storage) is really just a cache. How can we ensure that this data is not lost when a machine fails and the tasks running on that machine have to be brought up on another machine (which, of course, doesn&#39;t yet have the local persistent state)?</p>
-
-<p>The answer is that Samza handles state as just another stream. There are two mechanisms for accomplishing this.</p>
-
-<p>The first approach is just to allow the task to replay one or more of its input streams to populate its store when it restarts. This works well if the input stream maintains the complete data (as a stream fed by a database table might) and if the input stream is fast enough to make this practical. This requires no framework support.</p>
-
-<p>However often the state that is stored is much smaller than the input stream (because it is an aggregation or projection of the original input streams). Or the input stream may not maintain a complete, replayable set of inputs (say for event logs). To support these cases we provide the ability to back the state of the store with a changelog stream. A changelog is just a stream to which the task logs each change to its state&mdash;i.e. the sequence of key-value pairs applied to the local store. Changelogs are co-partitioned with their tasks (so each task has its own stream partition for which it is the only writer).</p>
-
-<p>The changelogs are just normal streams&mdash;other downstream tasks can subscribe to this state and use it. And it turns out that very often the most natural way to represent the output of a job is as the changelog of its task (we&#39;ll show some examples in a bit).</p>
+<p>Any storage engine can be plugged into Samza, as described below. Out of the box, Samza ships with a key-value store implementation that is built on <a href="https://code.google.com/p/leveldb">LevelDB</a> using a <a href="https://github.com/fusesource/leveldbjni">JNI API</a>.</p>
 
-<p>Of course a log of changes only grows over time so this would soon become impractical. Kafka has <a href="http://kafka.apache.org/documentation#compaction">log compaction</a> which provides special support for this kind of use case, though. This feature allows Kafka to compact duplicate entries (i.e. multiple updates with the same key) in the log rather than just deleting old log segments. This feature is available since Kafka 0.8.1.</p>
+<p>LevelDB has several nice properties. Its memory allocation is outside of the Java heap, which makes it more memory-efficient and less prone to garbage collection pauses than a Java-based storage engine. It is very fast for small datasets that fit in memory; datasets larger than memory are slower but still possible. It is <a href="http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/">log-structured</a>, allowing very fast writes. It also includes support for block compression, which helps to reduce I/O and memory usage.</p>
 
-<p>The Kafka brokers scale well up to terabytes of data per machine for changelogs as for other topics. Log compaction proceeds at about 50MB/sec/core or whatever the I/O limits of the broker are.</p>
+<p>Samza includes an additional in-memory caching layer in front of LevelDB, which avoids the cost of deserialization for frequently-accessed objects and batches writes. If the same key is updated multiple times in quick succession, the batching coalesces those updates into a single write. The writes are flushed to the changelog when a task <a href="checkpointing.html">commits</a>.</p>
 
-<h3>Other storage engines</h3>
-
-<p>One interesting aspect of this design is that the fault-tolerance mechanism is completely decoupled from the query apis the storage engine provides to the task or the way it stores data on disk or in memory. We have provided a key-value index with Samza, but you can easily implement and plug-in storage engines that are optimized for other types of queries and plug them in to our fault tolerance mechanism to provide different query capabilities to your tasks.</p>
-
-<p>Here are a few examples of storage engine types we think would be interesting to pursue in the future (patches accepted!):</p>
-
-<h5>Persistent heap</h5>
-
-<p>A common operation in stream processing is to maintain a running top-N. There are two primary applications of this. The first is ranking items over some window. The second is performing a &quot;bounded sort&quot; operation to transform a nearly sorted input stream into a totally sorted output stream. This occurs when dealing with a data stream where the order is by arrival and doesn&#39;t exactly match the source timestamp (for example log events collected across many machines).</p>
-
-<h5>Sketches</h5>
-
-<p>Many applications don&#39;t require exact results and for these <a href="http://infolab.stanford.edu/%7Eullman/mmds/ch4.pdf">approximate algorithms</a> such as <a href="http://en.wikipedia.org/wiki/Bloom_filter">bloom filters</a> for set membership, <a href="http://research.google.com/pubs/pub40671.html">hyperloglog</a> for counting distinct keys, and a multitude of algorithms for quantile and histogram approximation.</p>
-
-<p>These algorithms are inherently approximate but good algorithms give a strong bound on the accuracy of the approximation. This obviously doesn&#39;t carry over well to the case where the task can crash and lose all state. By logging out the changes to the structure we can ensure it is restored on fail-over. The nature of sketch algorithms allows significant opportunity for optimization in the form of logging. </p>
-
-<h5>Inverted index</h5>
-
-<p>Inverted indexes such as is provided by <a href="http://lucene.apache.org">Lucene</a> are common for text matching and other applications that do matching and ranking with selective queries and large result sets. </p>
-
-<h5>More</h5>
-
-<p>There are a variety of other storage engines that could be useful:</p>
-
-<ul>
-<li>For small datasets logged, in-memory collections may be ideal.</li>
-<li>Specialized data structures for graph traversal are common.</li>
-<li>Many applications are doing OLAP-like aggregations on their input. It might be possible to optimize these kinds of dimensional summary queries.</li>
-</ul>
-
-<h3>Using the key-value store</h3>
-
-<p>In this section we will give a quick tutorial on configuring and using the key-value store.</p>
-
-<p>To declare a new store for usage you add the following to your job config:</p>
+<p>To use a key-value store in your job, add the following to your job config:</p>
 <div class="highlight"><pre><code class="text language-text" data-lang="text"># Use the key-value store implementation for a store called &quot;my-store&quot;
 stores.my-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
 
-# Log changes to the store to an output stream for restore
-# If no changelog is specified the store will not be logged (but you can still rebuild off your input streams)
-stores.my-store.changelog=kafka.my-stream-name
+# Use the Kafka topic &quot;my-store-changelog&quot; as the changelog stream for this store.
+# This enables automatic recovery of the store after a failure. If you don&#39;t
+# configure this, no changelog stream will be generated.
+stores.my-store.changelog=kafka.my-store-changelog
 
-# The serialization format to use
+# Encode keys and values in the store as UTF-8 strings.
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 stores.my-store.key.serde=string
 stores.my-store.msg.serde=string
 </code></pre></div>
-<p>Here is some simple example code that only writes to the store:</p>
+<p>See the <a href="serialization.html">serialization section</a> for more information on the <em>serde</em> options.</p>
+
+<p>Here is a simple example that writes every incoming message to the store:</p>
 <div class="highlight"><pre><code class="text language-text" data-lang="text">public class MyStatefulTask implements StreamTask, InitableTask {
   private KeyValueStore&lt;String, String&gt; store;
 
   public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore&lt;String, String&gt;) context.getStore(&quot;store&quot;);
+    this.store = (KeyValueStore&lt;String, String&gt;) context.getStore(&quot;my-store&quot;);
   }
 
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-    System.out.println(&quot;Adding &quot; + envelope.getKey() + &quot; =&gt; &quot; + envelope.getMessage() + &quot; to the store.&quot;);
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
     store.put((String) envelope.getKey(), (String) envelope.getMessage());
   }
 }
@@ -282,78 +234,86 @@ stores.my-store.msg.serde=string
   KeyValueIterator&lt;K,V&gt; all();
 }
 </code></pre></div>
-<p>Here is a list of additional configurations accepted by the key-value store along with their default values:</p>
+<p>Here is a list of additional configurations accepted by the key-value store, along with their default values:</p>
 <div class="highlight"><pre><code class="text language-text" data-lang="text"># The number of writes to batch together
 stores.my-store.write.batch.size=500
 
-# The total number of objects to cache in the &quot;L1&quot; object cache. This must be at least as large as the batch.size.
+# The number of objects to keep in Samza&#39;s cache (in front of LevelDB).
+# This must be at least as large as write.batch.size.
 # A cache size of 0 disables all caching and batching.
 stores.my-store.object.cache.size=1000
 
-# The size of the off-heap leveldb block cache in bytes, per container. If you have multiple tasks within
-# one container, each task is given a proportional share of this cache.
+# The size of the off-heap leveldb block cache in bytes, per container.
+# If you have multiple tasks within one container, each task is given a
+# proportional share of this cache.
 stores.my-store.container.cache.size.bytes=104857600
 
-# The amount of memory leveldb uses for buffering writes before they are written to disk, per container.
-# If you have multiple tasks within one container, each task is given a proportional share of this buffer.
+# The amount of memory leveldb uses for buffering writes before they are
+# written to disk, per container. If you have multiple tasks within one
+# container, each task is given a proportional share of this buffer.
 # This setting also determines the size of leveldb&#39;s segment files.
 stores.my-store.container.write.buffer.size.bytes=33554432
 
 # Enable block compression? (set compression=none to disable)
 stores.my-store.leveldb.compression=snappy
 
-# If compression is enabled, leveldb groups approximately this many uncompressed bytes into one compressed block.
-# You probably don&#39;t need to change this unless you are a compulsive fiddler.
+# If compression is enabled, leveldb groups approximately this many
+# uncompressed bytes into one compressed block. You probably don&#39;t need
+# to change this unless you are a compulsive fiddler.
 stores.my-store.leveldb.block.size.bytes=4096
 </code></pre></div>
 <h3>Implementing common use cases with the key-value store</h3>
 
-<p>Let&#39;s look at how you can address some of the common use-cases we discussed before using the key-value storage engine.</p>
+<p>Earlier in this section we discussed some example use cases for stateful stream processing. Let&#39;s look at how each of these could be implemented using a key-value storage engine such as Samza&#39;s LevelDB.</p>
+
+<h4>Windowed aggregation</h4>
+
+<p><em>Example: Counting the number of page views for each user per hour</em></p>
 
-<h5>Windowed aggregation</h5>
+<p>Implementation: You need two processing stages.</p>
 
-<p>Example: Counting the number of page views for each user per hour</p>
+<ol>
+<li>The first one re-partitions the input data by user ID, so that all the events for a particular user are routed to the same stream task. If the input stream is already partitioned by user ID, you can skip this.</li>
+<li>The second stage does the counting, using a key-value store that maps a user ID to the running count. For each new event, the job reads the current count for the appropriate user from the store, increments it, and writes it back. When the window is complete (e.g. at the end of an hour), the job iterates over the contents of the store and emits the aggregates to an output stream.</li>
+</ol>
 
-<p>Implementation: We have two processing stages. The first partitions the input data by user id (if it&#39;s already partitioned by user id, which would be reasonable, you can skip this), and the second stage does the counting. The job has a single store containing the mapping of user_id to the running count. Each new input record would cause the job to retrieve the current running count, increment it and write back the count. When the window is complete (i.e. the hour is over), we iterate over the contents of our store and emit the aggregates.</p>
+<p>Note that this job effectively pauses at the hour mark to output its results. This is totally fine for Samza, as scanning over the contents of the key-value store is quite fast. The input stream is buffered while the job is doing this hourly work.</p>
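+
+<p>As a sketch, the counting stage could combine Samza&#39;s WindowableTask interface (whose window method is called periodically, based on the task.window.ms configuration) with a key-value store. The stream and store names below are illustrative, and suitable serdes for the keys and counts are assumed to be configured:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class PageViewCounterTask implements StreamTask, InitableTask, WindowableTask {
+  // Illustrative output stream for the hourly counts.
+  private static final SystemStream OUTPUT_STREAM =
+      new SystemStream(&quot;kafka&quot;, &quot;hourly-page-view-counts&quot;);
+
+  // Maps user ID to the number of page views in the current window.
+  private KeyValueStore&lt;String, Integer&gt; store;
+
+  public void init(Config config, TaskContext context) {
+    this.store = (KeyValueStore&lt;String, Integer&gt;) context.getStore(&quot;page-view-counts&quot;);
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    // The input is assumed to be partitioned by user ID, with the user ID as the key.
+    String userId = (String) envelope.getKey();
+    Integer count = store.get(userId);
+    store.put(userId, count == null ? 1 : count + 1);
+  }
+
+  public void window(MessageCollector collector, TaskCoordinator coordinator) {
+    // Called once per window (e.g. hourly): emit every count and reset the store.
+    List&lt;String&gt; userIds = new ArrayList&lt;String&gt;();
+    KeyValueIterator&lt;String, Integer&gt; iterator = store.all();
+    try {
+      while (iterator.hasNext()) {
+        Entry&lt;String, Integer&gt; entry = iterator.next();
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, entry.getKey(), entry.getValue()));
+        userIds.add(entry.getKey());
+      }
+    } finally {
+      iterator.close();
+    }
+    for (String userId : userIds) {
+      store.delete(userId);
+    }
+  }
+}
+</code></pre></div>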
 
-<p>One thing to note is that this job effectively pauses at the hour mark to output its results. This is unusual for stream processing, but totally fine for Samza&mdash;and we have specifically designed for this case. Scans over the contents of the key-value store will be quite fast and input data will buffer while the job is doing this scanning and emitting aggregates.</p>
+<h4>Table-table join</h4>
 
-<h5>Table-table join</h5>
+<p><em>Example: Join a table of user profiles to a table of user settings by user_id and emit the joined stream</em></p>
 
-<p>Example: Join a table of user profiles to a table of user settings by user_id and emit the joined stream</p>
+<p>Implementation: The job subscribes to the change streams for the user profiles database and the user settings database, both partitioned by user_id. The job keeps a key-value store keyed by user_id, which contains the latest profile record and the latest settings record for each user_id. When a new event comes in from either stream, the job looks up the current value in its store, updates the appropriate fields (depending on whether it was a profile update or a settings update), and writes back the new joined record to the store. The changelog of the store doubles as the output stream of the task.</p>
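+
+<p>A sketch of such a task is shown below. It assumes that both changelog streams are partitioned by user_id (the message key), that their messages are maps of fields (for example via a JSON serde), and that the store&#39;s changelog is configured as the job&#39;s output; the stream and store names are illustrative:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class ProfileSettingsJoinTask implements StreamTask, InitableTask {
+  // Latest joined record for each user_id. The store&#39;s changelog doubles as the output.
+  private KeyValueStore&lt;String, Map&lt;String, Object&gt;&gt; store;
+
+  public void init(Config config, TaskContext context) {
+    this.store = (KeyValueStore&lt;String, Map&lt;String, Object&gt;&gt;) context.getStore(&quot;user-profile-settings&quot;);
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    String userId = (String) envelope.getKey();
+    Map&lt;String, Object&gt; update = (Map&lt;String, Object&gt;) envelope.getMessage();
+
+    Map&lt;String, Object&gt; joined = store.get(userId);
+    if (joined == null) {
+      joined = new HashMap&lt;String, Object&gt;();
+    }
+
+    // Which table did this change come from? (illustrative stream name)
+    String stream = envelope.getSystemStreamPartition().getStream();
+    if (&quot;user-profile-changes&quot;.equals(stream)) {
+      joined.put(&quot;profile&quot;, update);
+    } else {
+      joined.put(&quot;settings&quot;, update);
+    }
+
+    store.put(userId, joined);
+  }
+}
+</code></pre></div>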
 
-<p>Implementation: The job subscribes to the change stream for user profiles and for user settings databases, both partitioned by user_id. The job keeps a single key-value store keyed by user_id containing the joined contents of profiles and settings. When a new record comes in from either stream it looks up the current value in its store and writes back the record with the appropriate fields updated (i.e. new profile fields if it was a profile update, and new settings fields if it was a settings update). The changelog of the store doubles as the output stream of the task.</p>
+<h4>Table-stream join</h4>
 
-<h5>Table-stream join</h5>
+<p><em>Example: Augment a stream of page view events with the user&#39;s ZIP code (perhaps to allow aggregation by zip code in a later stage)</em></p>
 
-<p>Example: Join user zip code to page view data (perhaps to allow aggregation by zip code in a later stage)</p>
+<p>Implementation: The job subscribes to the stream of user profile updates and the stream of page view events. Both streams must be partitioned by user_id. The job maintains a key-value store where the key is the user_id and the value is the user&#39;s ZIP code. Every time the job receives a profile update, it extracts the user&#39;s new ZIP code and writes it to the store. Every time it receives a page view event, it reads the ZIP code for that user from the store, and emits the page view event with an added ZIP code field.</p>
 
-<p>Implementation: The job subscribes to the user profile stream and page view stream. Each time it gets a profile update it stores the zipcode keyed by user_id. Each time a page view arrives it looks up the zip code for the user and emits the enriched page view + zipcode event.</p>
+<p>If the next stage needs to aggregate by ZIP code, the ZIP code can be used as the partitioning key of the job&#39;s output stream. That ensures that all the events for the same ZIP code are sent to the same stream partition.</p>
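+
+<p>As a sketch (with illustrative stream and store names, assuming both input streams are partitioned by user_id and carry maps of fields, for example via a JSON serde):</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class PageViewZipCodeJoinTask implements StreamTask, InitableTask {
+  // Illustrative output stream for the enriched page views.
+  private static final SystemStream OUTPUT_STREAM =
+      new SystemStream(&quot;kafka&quot;, &quot;page-views-with-zip&quot;);
+
+  // Maps user_id to the user&#39;s latest ZIP code.
+  private KeyValueStore&lt;String, String&gt; zipCodes;
+
+  public void init(Config config, TaskContext context) {
+    this.zipCodes = (KeyValueStore&lt;String, String&gt;) context.getStore(&quot;zip-codes&quot;);
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    String userId = (String) envelope.getKey();
+    Map&lt;String, Object&gt; message = (Map&lt;String, Object&gt;) envelope.getMessage();
+    String stream = envelope.getSystemStreamPartition().getStream();
+
+    if (&quot;user-profile-changes&quot;.equals(stream)) {
+      // Profile update: remember the user&#39;s latest ZIP code.
+      zipCodes.put(userId, (String) message.get(&quot;zip-code&quot;));
+    } else {
+      // Page view: look up the ZIP code and emit the enriched event,
+      // using the ZIP code as the key so the next stage can aggregate by it.
+      String zipCode = zipCodes.get(userId);
+      message.put(&quot;zip-code&quot;, zipCode);
+      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, zipCode, message));
+    }
+  }
+}
+</code></pre></div>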
 
-<h5>Stream-stream join</h5>
+<h4>Stream-stream join</h4>
 
-<p>Example: Join ad clicks to ad impressions by impression id (an impression is advertising terminology for the event that records the display of an ad)</p>
+<p><em>Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)</em></p>
 
-<p>Note: In this example we are assuming that impressions are assigned a unique guid and this is present in both the original impression event and any subsequent click. In the absence of this the business logic for choosing the join could be substituted for the simple lookup.</p>
+<p>In this example we assume that each impression of an ad has a unique identifier, e.g. a UUID, and that the same identifier is included in both the impression and the click events. This identifier is used as the join key.</p>
 
-<p>Implementation: Partition the ad click and ad impression streams by the impression id or user id. The task keeps a store of unmatched clicks and unmatched impressions. When a click comes in we try to find its matching impression in the impression store, and vice versa. If a match is found emit the joined pair and delete the entry. If no match is found store the event to wait for a match. Since this is presumably a left outer join (i.e. every click has a corresponding impression but not vice versa) we will periodically scan the impression table and delete old impressions for which no click arrived.</p>
+<p>Implementation: Partition the ad click and ad impression streams by the impression ID or user ID (assuming that two events with the same impression ID always have the same user ID). The task keeps two stores, one containing click events and one containing impression events, using the impression ID as key for both stores. When the job receives a click event, it looks for the corresponding impression in the impression store, and vice versa. If a match is found, the joined pair is emitted and the entry is deleted. If no match is found, the event is written to the appropriate store. Periodically the job scans over both stores and deletes any old events that were not matched within the time window of the join.</p>
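+
+<p>The core of such a task might look like the sketch below. Stream and store names are illustrative, the events are treated as opaque strings (how the two events are combined is application-specific), and the periodic scan that deletes old unmatched events is omitted:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class AdClickImpressionJoinTask implements StreamTask, InitableTask {
+  // Illustrative output stream for the joined click/impression pairs.
+  private static final SystemStream OUTPUT_STREAM =
+      new SystemStream(&quot;kafka&quot;, &quot;ad-click-impression-joins&quot;);
+
+  // Unmatched events, keyed by impression ID.
+  private KeyValueStore&lt;String, String&gt; impressions;
+  private KeyValueStore&lt;String, String&gt; clicks;
+
+  public void init(Config config, TaskContext context) {
+    this.impressions = (KeyValueStore&lt;String, String&gt;) context.getStore(&quot;ad-impressions&quot;);
+    this.clicks = (KeyValueStore&lt;String, String&gt;) context.getStore(&quot;ad-clicks&quot;);
+  }
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    // Both streams are assumed to be partitioned by impression ID (the message key).
+    String impressionId = (String) envelope.getKey();
+    String event = (String) envelope.getMessage();
+    String stream = envelope.getSystemStreamPartition().getStream();
+
+    if (&quot;ad-clicks&quot;.equals(stream)) {
+      String impression = impressions.get(impressionId);
+      if (impression != null) {
+        // Found the matching impression: emit the joined pair and clean up.
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, impressionId, impression + event));
+        impressions.delete(impressionId);
+      } else {
+        clicks.put(impressionId, event);   // wait for the impression to arrive
+      }
+    } else {
+      String click = clicks.get(impressionId);
+      if (click != null) {
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, impressionId, event + click));
+        clicks.delete(impressionId);
+      } else {
+        impressions.put(impressionId, event);   // wait for a possible click
+      }
+    }
+  }
+}
+</code></pre></div>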
 
-<h3>Implementing storage engines</h3>
+<h3>Other storage engines</h3>
 
-<p>We mentioned that the storage engine interface was pluggable. Of course you can use any data structure you like in your task provided you can repopulate it off your inputs on failure. However to plug into our changelog infrastructure you need to implement a generic StorageEngine interface that handles restoring your state on failure and ensures that data is flushed prior to commiting the task position.</p>
+<p>Samza&#39;s fault-tolerance mechanism (sending a local store&#39;s writes to a replicated changelog) is completely decoupled from the storage engine&#39;s data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the <a href="../api/javadocs/org/apache/samza/storage/StorageEngine.html">StorageEngine</a> interface. Samza&#39;s model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task. </p>
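+
+<p>In outline, a storage engine implements methods to restore its contents by replaying the changelog, to flush any buffered writes before the task&#39;s position is committed, and to shut down cleanly (see the javadocs for the authoritative definition):</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public interface StorageEngine {
+  // Restore the store&#39;s contents by replaying its changelog stream.
+  void restore(Iterator&lt;IncomingMessageEnvelope&gt; envelopes);
+
+  // Flush any buffered writes to durable storage (called before a commit).
+  void flush();
+
+  // Close the store and release its resources.
+  void stop();
+}
+</code></pre></div>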
 
-<p>The above code shows usage of the key-value storage engine, but it is not too hard to implement an alternate storage engine. To do so, you implement methods to restore the contents of the store from a stream, flush any cached content on commit, and close the store:</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">public interface StorageEngine {
-  void restore(Iterator&lt;IncomingMessageEnvelope&gt; envelopes);
-  void flush();
-  void stop();
-}
-</code></pre></div>
-<p>The user specifies the type of storage engine they want by passing in a factory for that store in their configuration.</p>
+<p>Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), <a href="http://infolab.stanford.edu/%7Eullman/mmds/ch4.pdf">approximate algorithms</a> such as <a href="http://en.wikipedia.org/wiki/Bloom_filter">bloom filters</a> and <a href="http://research.google.com/pubs/pub40671.html">hyperloglog</a>, or full-text indexes such as <a href="http://lucene.apache.org">Lucene</a>. (Patches accepted!)</p>
 
 <h3>Fault tolerance semantics with state</h3>
 
-<p>Samza currently only supports at-least-once delivery guarantees in the presence of failure (this is sometimes referred to as &quot;guaranteed delivery&quot;). This means messages are not lost but if a task fails some messages may be redelivered. The guarantee holds from the commit point of the task which records the position from which the task will restart on failure (the user can either force a commit at convenient points or the framework will by default do this at regular intervals). This is true for both input, output, and changelog streams. This is a fairly weak guarantee&mdash;duplicates can give incorrect results in counts, for example. We have a plan to extend this to exact semantics in the presence of failure which we will include in a future release.</p>
+<p>As discussed in the section on <a href="checkpointing.html">checkpointing</a>, Samza currently only supports at-least-once delivery guarantees in the presence of failure (this is sometimes referred to as &quot;guaranteed delivery&quot;). This means that if a task fails, no messages are lost, but some messages may be redelivered.</p>
+
+<p>For many of the stateful processing use cases discussed above, this is not a problem: if the effect of a message on state is idempotent, it is safe for the same message to be processed more than once. For example, if the store contains the ZIP code for each user, then processing the same profile update twice has no effect, because the duplicate update does not change the ZIP code.</p>
+
+<p>However, for non-idempotent operations such as counting, at-least-once delivery guarantees can give incorrect results. If a Samza task fails and is restarted, it may double-count some messages that were processed shortly before the failure. We are planning to address this limitation in a future release of Samza.</p>
 
 <h2><a href="metrics.html">Metrics &raquo;</a></h2>
 

Modified: incubator/samza/site/learn/documentation/0.7.0/container/streams.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/container/streams.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/container/streams.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/container/streams.html Mon Jun  9 20:35:57 2014
@@ -72,16 +72,20 @@
           <div class="content">
             <h2>Streams</h2>
 
-<p>The <a href="task-runner.html">TaskRunner</a> reads and writes messages using the SystemConsumer and SystemProducer interfaces.</p>
+<p>The <a href="samza-container.html">samza container</a> reads and writes messages using the <a href="../api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a> and <a href="../api/javadocs/org/apache/samza/system/SystemProducer.html">SystemProducer</a> interfaces. You can integrate any message broker with Samza by implementing these two interfaces.</p>
 <div class="highlight"><pre><code class="text language-text" data-lang="text">public interface SystemConsumer {
-
   void start();
 
   void stop();
 
-  void register(SystemStreamPartition systemStreamPartition, String lastReadOffset);
-
-  List&lt;IncomingMessageEnvelope&gt; poll(Map&lt;SystemStreamPartition, Integer&gt; systemStreamPartitions, long timeout) throws InterruptedException;
+  void register(
+      SystemStreamPartition systemStreamPartition,
+      String lastReadOffset);
+
+  List&lt;IncomingMessageEnvelope&gt; poll(
+      Map&lt;SystemStreamPartition, Integer&gt; systemStreamPartitions,
+      long timeout)
+    throws InterruptedException;
 }
 
 public class IncomingMessageEnvelope {
@@ -111,89 +115,66 @@ public class OutgoingMessageEnvelope {
   public Object getMessage() { ... }
 }
 </code></pre></div>
-<p>Out of the box, Samza supports reads and writes to Kafka (i.e. it has a KafkaSystemConsumer/KafkaSystemProducer), but the interfaces are pluggable, and most message bus systems can be plugged in, with some degree of support.</p>
+<p>Out of the box, Samza supports Kafka (KafkaSystemConsumer and KafkaSystemProducer). However, any message bus system can be plugged in, as long as it can provide the semantics required by Samza, as described in the <a href="../api/javadocs/org/apache/samza/system/SystemConsumer.html">javadoc</a>.</p>
 
-<p>A number of stream-related properties should be defined in your Samza job&#39;s configuration file. These properties define systems that Samza can read from, the streams on these systems, and how to serialize and deserialize the messages from the streams. For example, you might wish to read PageViewEvent from a specific Kafka cluster. The system properties in the configuration file would define how to connect to the Kafka cluster. The stream section would define PageViewEvent as an input stream. The serializer in the configuration would define the serde to use to decode PageViewEvent messages.</p>
+<p>SystemConsumers and SystemProducers may read and write messages of any data type. It&#39;s ok if they only support byte arrays &mdash; Samza has a separate <a href="serialization.html">serialization layer</a> which converts to and from objects that application code can use. Samza does not prescribe any particular data model or serialization format.</p>
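+
+<p>For example, a job configuration can register a JSON serde and tell Samza to use it for the messages of a particular system (see the <a href="serialization.html">serialization</a> section for details):</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+systems.example-system.samza.msg.serde=json
+</code></pre></div>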
 
-<p>When the TaskRunner starts up, it will use the stream-related properties in your configuration to instantiate consumers for each stream partition. For example, if your input stream is PageViewEvent, which has 12 partitions, then the TaskRunner would create 12 KafkaSystemConsumers. Each consumer will read ByteBuffers from one partition, deserialize the ByteBuffer to an object, and put them into a queue. This queue is what the <a href="event-loop.html">event loop</a> will use to feed messages to your StreamTask instances.</p>
+<p>The job configuration file can include properties that are specific to a particular consumer and producer implementation. For example, the configuration would typically indicate the hostname and port of the message broker to use, and perhaps connection options.</p>
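+
+<p>For example, a Kafka system definition typically includes properties along these lines (the exact property names depend on the system implementation and may differ between versions):</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.metadata.broker.list=localhost:9092
+</code></pre></div>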
 
-<p>In the process method in StreamTask, there is a MessageCollector parameter given to use. When the TaskRunner calls process() on one of your StreamTask instances, it provides the collector. After the process() method completes, the TaskRunner takes any output messages that your StreamTask wrote to the collector, serializes the messages, and calls the send() method on the appropriate SystemProducer.</p>
+<h3>How streams are processed</h3>
 
-<h3>Message Ordering</h3>
+<p>If a job is consuming messages from more than one input stream, and all input streams have messages available, messages are processed in a round robin fashion by default. For example, if a job is consuming AdImpressionEvent and AdClickEvent, the task instance&#39;s process() method is called with a message from AdImpressionEvent, then a message from AdClickEvent, then another message from AdImpressionEvent, ... and continues to alternate between the two.</p>
 
-<p>If a job is consuming messages from more than one system/stream/partition combination, by default, messages will be processed in a round robin fashion. For example, if a job is reading partitions 1 and 2 of page-view-events from a Kafka system, and there are messages available to be processed from both partitions, your StreamTask will get messages in round robin order (partition 1, partition 2, partition 1, partition 2, etc). If a message is not available for a given partition, it will be skipped, until a message becomes available.</p>
+<p>If one of the input streams has no new messages available (the most recent message has already been consumed), that stream is skipped, and the job continues to consume from the other inputs. Samza keeps checking the skipped stream, and resumes consuming from it as soon as new messages become available.</p>
 
 <h4>MessageChooser</h4>
 
-<p>The default round robin behavior can be overridden by implementing a custom MessageChooser. A MessageChooser&#39;s job is to answer the question, &quot;Given a set of incoming messages, which one should a Samza container process next?&quot;.  To write a custom MessageChooser, take a look at the <a href="../api/javadocs/org/apache/samza/system/MessageChooser.html">Javadocs</a>, and then configure your task with the &quot;task.chooser.class&quot; configuration, which should point to your MessageChooserFactory.</p>
+<p>When a Samza container has several incoming messages on different stream partitions, how does it decide which to process first? The behavior is determined by a <a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooser.html">MessageChooser</a>. The default chooser is RoundRobinChooser, but you can override it by implementing a custom chooser.</p>
 
-<p>Out of the box, Samza ships with a RoundRobinChooser, which is the default. You can use the StreamChooser by adding the following configuration to your job.</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">task.chooser.class=org.apache.samza.system.YourStreamChooserFactory
+<p>To plug in your own message chooser, you need to implement the <a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html">MessageChooserFactory</a> interface, and set the &quot;task.chooser.class&quot; configuration to the fully-qualified class name of your implementation:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">task.chooser.class=com.example.samza.YourMessageChooserFactory
 </code></pre></div>
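+<p>As an illustration, the outline below sketches a chooser that always favors one particular stream over all others. Treat it as a sketch rather than a drop-in implementation: the authoritative method signatures are in the MessageChooser and MessageChooserFactory javadocs linked above, and the &quot;preferred.stream&quot; property is a made-up configuration key:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class PreferredStreamChooserFactory implements MessageChooserFactory {
+  public MessageChooser getChooser(Config config, MetricsRegistry registry) {
+    // &quot;preferred.stream&quot; is a hypothetical property naming the favored stream.
+    return new PreferredStreamChooser(config.get(&quot;preferred.stream&quot;));
+  }
+}
+
+public class PreferredStreamChooser implements MessageChooser {
+  private final String preferredStream;
+  private final Deque&lt;IncomingMessageEnvelope&gt; preferred = new ArrayDeque&lt;IncomingMessageEnvelope&gt;();
+  private final Deque&lt;IncomingMessageEnvelope&gt; others = new ArrayDeque&lt;IncomingMessageEnvelope&gt;();
+
+  public PreferredStreamChooser(String preferredStream) {
+    this.preferredStream = preferredStream;
+  }
+
+  public void update(IncomingMessageEnvelope envelope) {
+    // Called when a message becomes available for processing.
+    String stream = envelope.getSystemStreamPartition().getStream();
+    if (stream.equals(preferredStream)) {
+      preferred.addLast(envelope);
+    } else {
+      others.addLast(envelope);
+    }
+  }
+
+  public IncomingMessageEnvelope choose() {
+    // Called when Samza wants the next message; returning null means
+    // &quot;nothing to process right now&quot;.
+    IncomingMessageEnvelope next = preferred.pollFirst();
+    return next != null ? next : others.pollFirst();
+  }
+
+  public void start() {}
+  public void stop() {}
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+}
+</code></pre></div>
+<p>The fully-qualified name of the factory class (here PreferredStreamChooserFactory) is what goes in the task.chooser.class property shown above.</p>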
-<h4>Prioritizing</h4>
+<h4>Prioritizing input streams</h4>
 
-<p>There are certain times when messages from a stream should be favored over messages from any other stream. For example, some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. A typical pattern is to have a Samza processor with a statistical model that is ranking a real-time feed of data. Periodically, this model needs to be retrained and updated. The Samza processor can be re-deployed with the new model, but how do you re-process all of the old data that the processor has already seen? This can be accomplished by having a batch system send messages to the Samza processor for any data that needs to be re-processed. In this example, you&#39;d like to favor the real-time system over the batch system, when messages are available for the real-time system. This prevents latency from being introduced into the real-time feed even when the batch system is sending messages by always processing the real-time messages first
 .</p>
+<p>There are certain times when messages from one stream should be processed with higher priority than messages from another stream. For example, some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. In this case, it&#39;s useful to prioritize the real-time stream over the batch stream, so that the real-time processing doesn&#39;t slow down if there is a sudden burst of data on the batch stream.</p>
 
-<p>Samza provides a mechanism to prioritize one stream over another by setting this value: systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.priority=2. A config snippet illustrates the settings:</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.my-stream.samza.priority=2
-systems.kafka.streams.my-other-stream.samza.priority=1
+<p>Samza provides a mechanism to prioritize one stream over another by setting this configuration parameter: systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.priority=&lt;number&gt;. For example:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.my-real-time-stream.samza.priority=2
+systems.kafka.streams.my-batch-stream.samza.priority=1
 </code></pre></div>
-<p>This declares that my-stream&#39;s messages will be processed before my-other-stream&#39;s. If my-stream has no messages available at the moment (because more are still being read in, for instance), then my-other-stream&#39;s messages will get processed.</p>
-
-<p>Each priority level gets its own MessageChooser. In the example above, one MessageChooser is used for my-stream, and another is used for my-other-stream. The MessageChooser for my-other-stream will only be used when my-stream&#39;s MessageChooser doesn&#39;t return a message to process. </p>
+<p>This declares that my-real-time-stream&#39;s messages should be processed with higher priority than my-batch-stream&#39;s messages. If my-real-time-stream has any messages available, they are processed first. Only when there are no messages currently waiting on my-real-time-stream does the Samza job move on to processing my-batch-stream.</p>
 
-<p>It is also valid to define two streams with the same priority. If messages are available from two streams at the same priority level, it&#39;s up to the MessageChooser for that priority level to decide which message should be processed first.</p>
+<p>Each priority level gets its own MessageChooser. It is valid to define two streams with the same priority. If messages are available from two streams at the same priority level, it&#39;s up to the MessageChooser for that priority level to decide which message should be processed first.</p>
 
-<p>It&#39;s also valid to only define priorities for some streams. All non-prioritized streams will be treated as the lowest priority, and will share a single MessageChooser. If you had my-third-stream, as a third input stream in the example above, it would be prioritized as the lowest stream, and also get its own MessageChooser.</p>
+<p>It&#39;s also valid to only define priorities for some streams. All non-prioritized streams are treated as the lowest priority, and share a MessageChooser.</p>
 
 <h4>Bootstrapping</h4>
 
-<p>Some Samza jobs wish to fully consume a stream from offset 0 all the way through to the last message in the stream before they process messages from any other stream. This is useful for streams that have some key-value data that a Samza job wishes to use when processing messages from another stream. This is </p>
+<p>Sometimes, a Samza job needs to fully consume a stream (from offset 0 up to the most recent message) before it processes messages from any other stream. This is useful in situations where the stream contains some prerequisite data that the job needs, and it doesn&#39;t make sense to process messages from other streams until the job has loaded that prerequisite data. Samza supports this use case with <em>bootstrap streams</em>.</p>
 
-<p>Consider a case where you want to read a currency-code stream, which has mappings of country code (e.g. USD) to symbols (e.g. $), and is partitioned by country code. You might want to join these symbols to a stream called transactions which is also partitioned by currency, and has a schema like {&quot;country&quot;: &quot;USD&quot;, &quot;amount&quot;: 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {&quot;amount&quot;: &quot;$1234&quot;}.</p>
+<p>A bootstrap stream seems similar to a stream with a high priority, but is subtly different. Before allowing any other stream to be processed, a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed. Until then, the bootstrap stream is the exclusive input to the job: even if a network issue or some other factor causes the bootstrap stream consumer to slow down, other inputs can&#39;t sneak their messages in.</p>
 
-<p>To bootstrap the currency-code stream, you need to read it from offset 0 all the way to the last message in the stream (what I&#39;m calling head). It is not desirable to read any message from the transactions stream until the currency-code stream has been fully read, or else you might try to join a transaction message to a country code that hasn&#39;t yet been read.</p>
+<p>Another difference between a bootstrap stream and a high-priority stream is that the bootstrap stream&#39;s special treatment is temporary: when it has been fully consumed (we say it has &quot;caught up&quot;), its priority drops to be the same as all the other input streams.</p>
 
-<p>Samza supports this style of processing with the systems.&lt;system&gt;.streams.&lt;stream&gt;.samza.bootstrap property.</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.currency-code.samza.bootstrap=true
+<p>To configure a stream called &quot;my-bootstrap-stream&quot; to be a fully-consumed bootstrap stream, use the following settings:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.my-bootstrap-stream.samza.bootstrap=true
+systems.kafka.streams.my-bootstrap-stream.samza.reset.offset=true
+systems.kafka.streams.my-bootstrap-stream.samza.offset.default=oldest
 </code></pre></div>
-<p>This configuration tells Samza that currency-code&#39;s messages should be read from the last checkpointed offset all the way until the stream is caught up to &quot;head&quot;, before any other message is processed. If you wish to process all messages in currency-code from offset 0 to head, you can define:</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.currency-code.samza.bootstrap=true
-systems.kafka.streams.currency-code.samza.reset.offset=true
-</code></pre></div>
-<p>This tells Samza to start from beginning of the currency-code stream, and read all the way to head.</p>
-
-<p>The difference between prioritizing a stream and bootstrapping a stream, is a high priority stream will still allow lower priority stream messages to be processed when no messages are available for the high priority stream. In the case of bootstrapping, no streams will be allowed to be processed until all messages in the bootstrap stream have been read up to the last message.</p>
+<p>The bootstrap=true parameter enables the bootstrap behavior (prioritization over other streams). The combination of reset.offset=true and offset.default=oldest tells Samza to always start reading the stream from the oldest offset, every time a container starts up (rather than starting to read from the most recent checkpoint).</p>
 
-<p>Once a bootstrap stream has been fully consumed (&quot;caught up&quot;), it is treated like a normal stream, and no bootstrapping logic happens.</p>
-
-<p>It is valid to define multiple bootstrap streams.</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.currency-code.samza.bootstrap=true
-systems.kafka.streams.other-bootstrap-stream.samza.bootstrap=true
-</code></pre></div>
-<p>In this case, currency-code and other-bootstrap-stream will both be processed before any other stream is processed. The order of message processing (the bootstrap order) between currency-code and other-bootstrap-stream is up to the MessageChooser. If you want to fully process one bootstrap stream before another, you can use priorities:</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.currency-code.samza.bootstrap=true
-systems.kafka.streams.currency-code.samza.priority=2
-systems.kafka.streams.other-bootstrap-stream.samza.bootstrap=true
-systems.kafka.streams.other-bootstrap-stream.samza.priority=1
-</code></pre></div>
-<p>This defines a specific bootstrap ordering: fully bootstrap currency-code before bootstrapping other-bootstrap-stream.</p>
-
-<p>Lastly, bootstrap and non-bootstrap prioritized streams can be mixed:</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.currency-code.samza.bootstrap=true
-systems.kafka.streams.non-bootstrap-stream.samza.priority=2
-systems.kafka.streams.other-non-bootstrap-stream.samza.priority=1
-</code></pre></div>
-<p>Bootstrap streams are assigned a priority of Int.MaxInt by default, so they will always be prioritized over any other prioritized stream. In this case, currency-code will be fully bootstrapped, and then treated as the highest priority stream (Int.IntMax). The next highest priority stream will be non-bootstrap-stream (priority 2), followed by other-non-bootstrap-stream (priority 1), and then any non-bootstrap/non-prioritized streams.</p>
+<p>It is valid to define multiple bootstrap streams. In this case, the order in which they are bootstrapped is determined by the priority.</p>
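+
+<p>For example, to fully bootstrap one stream before the other, assign each bootstrap stream a priority in addition to the settings shown above:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">systems.kafka.streams.first-bootstrap-stream.samza.bootstrap=true
+systems.kafka.streams.first-bootstrap-stream.samza.priority=2
+systems.kafka.streams.second-bootstrap-stream.samza.bootstrap=true
+systems.kafka.streams.second-bootstrap-stream.samza.priority=1
+</code></pre></div>
+<p>With this configuration, first-bootstrap-stream is fully consumed before second-bootstrap-stream is bootstrapped.</p>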
 
 <h4>Batching</h4>
 
-<p>There are cases where consuming from the same SystemStreamPartition repeatedly leads to better performance. Samza allows for consumer batching to satisfy this use case. For example, if you had two SystemStreamPartitions, SSP1 and SSP2, you might wish to read 100 messages from SSP1 and then one from SSP2, regardless of the MessageChooser that&#39;s used. This can be accomplished with:</p>
+<p>In some cases, you can improve performance by consuming several messages from the same stream partition in sequence. Samza supports this mode of operation, called <em>batching</em>.</p>
+
+<p>For example, if you want to read 100 messages in a row from each stream partition (regardless of the MessageChooser), you can use this configuration parameter:</p>
 <div class="highlight"><pre><code class="text language-text" data-lang="text">task.consumer.batch.size=100
 </code></pre></div>
-<p>With this setting, Samza will always try and read a message from the last SystemStreamPartition that was read. This behavior will continue until no message is available for the SystemStreamPartition, or the batch size has been reached. In either of these cases, Samza will defer to the MessageChooser to determine the next message to process. It will then try and stick to the new message&#39;s SystemStreamPartition again.</p>
+<p>With this setting, Samza tries to read a message from the most recently used <a href="../api/javadocs/org/apache/samza/system/SystemStreamPartition.html">SystemStreamPartition</a>. This behavior continues until no more messages are available for that SystemStreamPartition, or until the batch size has been reached. When that happens, Samza defers to the MessageChooser to determine the next message to process. It then keeps consuming from the chosen message&#39;s SystemStreamPartition, again until the batch size is reached.</p>
 
-<h2><a href="checkpointing.html">Checkpointing &raquo;</a></h2>
+<h2><a href="serialization.html">Serialization &raquo;</a></h2>
 
 
           </div>

Modified: incubator/samza/site/learn/documentation/0.7.0/container/windowing.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/container/windowing.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/container/windowing.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/container/windowing.html Mon Jun  9 20:35:57 2014
@@ -72,16 +72,38 @@
           <div class="content">
             <h2>Windowing</h2>
 
-<p>Referring back to the &quot;count PageViewEvent by user ID&quot; example in the <a href="../introduction/architecture.html">Architecture</a> section, one thing that we left out was what we do with the counts. Let&#39;s say that the Samza job wants to update the user ID counts in a database once every minute. Here&#39;s how it would work. The Samza job that does the counting would keep a Map&lt;Integer, Integer&gt; in memory, which maps user IDs to page view counts. Every time a message arrives, the job would take the user ID in the PageViewEvent, and use it to increment the user ID&#39;s count in the in-memory map. Then, once a minute, the StreamTask would update the database (total<em>count += current</em>count) for every user ID in the map, and then reset the count map.</p>
+<p>Sometimes a stream processing job needs to do something in regular time intervals, regardless of how many incoming messages the job is processing. For example, say you want to report the number of page views per minute. To do this, you increment a counter every time you see a page view event. Once per minute, you send the current counter value to an output stream and reset the counter to zero.</p>
 
-<p>Windowing is how we achieve this. If a StreamTask implements the WindowableTask interface, the TaskRunner will call the window() method on the task over a configured interval.</p>
-<div class="highlight"><pre><code class="text language-text" data-lang="text">public interface WindowableTask {
-  void window(MessageCollector collector, TaskCoordinator coordinator);
+<p>Samza&#39;s <em>windowing</em> feature provides a way for tasks to do something in regular time intervals, for example once per minute. To enable windowing, you just need to set one property in your job configuration:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text"># Call the window() method every 60 seconds
+task.window.ms=60000
+</code></pre></div>
+<p>Next, your stream task needs to implement the <a href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a> interface. This interface defines a window() method, which Samza calls at the regular interval that you configured.</p>
+
+<p>For example, this is how you would implement a basic per-minute event counter:</p>
+<div class="highlight"><pre><code class="text language-text" data-lang="text">public class EventCounterTask implements StreamTask, WindowableTask {
+
+  public static final SystemStream OUTPUT_STREAM =
+    new SystemStream(&quot;kafka&quot;, &quot;events-per-minute&quot;);
+
+  private int eventsSeen = 0;
+
+  public void process(IncomingMessageEnvelope envelope,
+                      MessageCollector collector,
+                      TaskCoordinator coordinator) {
+    eventsSeen++;
+  }
+
+  public void window(MessageCollector collector,
+                     TaskCoordinator coordinator) {
+    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
+    eventsSeen = 0;
+  }
 }
 </code></pre></div>
-<p>If you choose to implement the WindowableTask interface, you can use the Samza job&#39;s configuration to define how often the TaskRunner should call your window() method. In the PageViewEvent example (above), you would define it to flush every 60000 milliseconds (60 seconds).</p>
+<p>If you need to send messages to output streams, you can use the <a href="../api/javadocs/org/apache/samza/task/MessageCollector.html">MessageCollector</a> object passed to the window() method. Please only use that MessageCollector object for sending messages, and don&#39;t use it outside of the call to window().</p>
 
-<p>If you need to send messages to output streams, you can use the MessageCollector object passed to the window() method. Please only use that MessageCollector object for sending messages, and don&#39;t use it outside of the call to window().</p>
+<p>Note that Samza uses <a href="event-loop.html">single-threaded execution</a>, so the window() call can never happen concurrently with a process() call. This has the advantage that you don&#39;t need to worry about thread safety in your code (no need to synchronize anything), but the downside that the window() call may be delayed if your process() method takes a long time to return.</p>
 
 <h2><a href="event-loop.html">Event Loop &raquo;</a></h2>
 

Modified: incubator/samza/site/learn/documentation/0.7.0/index.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/index.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/index.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/index.html Mon Jun  9 20:35:57 2014
@@ -103,14 +103,15 @@
 <h4>Container</h4>
 
 <ul class="documentation-list">
-  <li><a href="container/task-runner.html">TaskRunner</a></li>
+  <li><a href="container/samza-container.html">SamzaContainer</a></li>
   <li><a href="container/streams.html">Streams</a></li>
+  <li><a href="container/serialization.html">Serialization</a></li>
   <li><a href="container/checkpointing.html">Checkpointing</a></li>
   <li><a href="container/state-management.html">State Management</a></li>
   <li><a href="container/metrics.html">Metrics</a></li>
   <li><a href="container/windowing.html">Windowing</a></li>
   <li><a href="container/event-loop.html">Event Loop</a></li>
-  <li><a href="container/jmx.html">JMX</a>
+  <li><a href="container/jmx.html">JMX</a></li>
 </ul>
 
 <h4>Jobs</h4>

Modified: incubator/samza/site/learn/documentation/0.7.0/introduction/architecture.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/introduction/architecture.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/introduction/architecture.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/introduction/architecture.html Mon Jun  9 20:35:57 2014
@@ -138,7 +138,7 @@
 
 <p><img src="/img/0.7.0/learn/documentation/introduction/samza-yarn-integration.png" alt="diagram-small"></p>
 
-<p>The Samza client talks to the YARN RM when it wants to start a new Samza job. The YARN RM talks to a YARN NM to allocate space on the cluster for Samza&#39;s ApplicationMaster. Once the NM allocates space, it starts the Samza AM. After the Samza AM starts, it asks the YARN RM for one or more YARN containers to run Samza <a href="../container/task-runner.html">TaskRunners</a>. Again, the RM works with NMs to allocate space for the containers. Once the space has been allocated, the NMs start the Samza containers.</p>
+<p>The Samza client talks to the YARN RM when it wants to start a new Samza job. The YARN RM talks to a YARN NM to allocate space on the cluster for Samza&#39;s ApplicationMaster. Once the NM allocates space, it starts the Samza AM. After the Samza AM starts, it asks the YARN RM for one or more YARN containers to run <a href="../container/samza-container.html">SamzaContainers</a>. Again, the RM works with NMs to allocate space for the containers. Once the space has been allocated, the NMs start the Samza containers.</p>
 
 <h3>Samza</h3>
 
@@ -146,7 +146,7 @@
 
 <p><img src="/img/0.7.0/learn/documentation/introduction/samza-yarn-kafka-integration.png" alt="diagram-small"></p>
 
-<p>The Samza client uses YARN to run a Samza job. The Samza <a href="../container/task-runner.html">TaskRunners</a> run in one or more YARN containers, and execute user-written Samza <a href="../api/overview.html">StreamTasks</a>. The input and output for the Samza StreamTasks come from Kafka brokers that are (usually) co-located on the same machines as the YARN NMs.</p>
+<p>The Samza client uses YARN to run a Samza job: YARN starts and supervises one or more <a href="../container/samza-container.html">SamzaContainers</a>, and your processing code (using the <a href="../api/overview.html">StreamTask</a> API) runs inside those containers. The input and output for the Samza StreamTasks come from Kafka brokers that are (usually) co-located on the same machines as the YARN NMs.</p>
 
 <h3>Example</h3>
 
@@ -157,7 +157,7 @@
 
 <p>In the first job, the grouping is done by sending all messages with the same user ID to the same partition of an intermediate topic. You can do this by using the user ID as key of the messages that are emitted by the first job, and this key is mapped to one of the intermediate topic&#39;s partitions (usually by taking a hash of the key mod the number of partitions). The second job consumes the intermediate topic. Each task in the second job consumes one partition of the intermediate topic, i.e. all the messages for a subset of user IDs. The task has a counter for each user ID in its partition, and the appropriate counter is incremented every time the task receives a message with a particular user ID.</p>
 
-<p><img src="/img/0.7.0/learn/documentation/introduction/group-by-example.png" alt="diagram-large"></p>
+<p><img src="/img/0.7.0/learn/documentation/introduction/group-by-example.png" alt="Repartitioning for a GROUP BY" class="diagram-large"></p>
 
 <p>If you are familiar with Hadoop, you may recognize this as a Map/Reduce operation, where each record is associated with a particular key in the mappers, records with the same key are grouped together by the framework, and then counted in the reduce step. The difference between Hadoop and Samza is that Hadoop operates on a fixed input, whereas Samza works with unbounded streams of data.</p>
 

Modified: incubator/samza/site/learn/documentation/0.7.0/jobs/configuration.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/jobs/configuration.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/jobs/configuration.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/jobs/configuration.html Mon Jun  9 20:35:57 2014
@@ -82,7 +82,7 @@ task.class=samza.task.example.MyJavaStre
 task.inputs=example-system.example-stream
 
 # Serializers
-serializers.registry.json.class=samza.serializers.JsonSerdeFactory
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
 # Systems
@@ -90,7 +90,14 @@ systems.example-system.samza.factory=sam
 systems.example-system.samza.key.serde=string
 systems.example-system.samza.msg.serde=json
 </code></pre></div>
-<p>There are four major sections to a configuration file. The job section defines things like the name of the job, and whether to use the YarnJobFactory or LocalJobFactory. The task section is where you specify the class name for your StreamTask. It&#39;s also where you define what the input streams are for your task. The serializers section defines the classes of the serdes used for serialization and deserialization of specific objects that are received and sent along different streams. The system section defines systems that your StreamTask can read from along with the types of serdes used for sending keys and messages from that system. Usually, you&#39;ll define a Kafka system, if you&#39;re reading from Kafka, although you can also specify your own self-implemented Samza-compatible systems. See the hello-samza example project&#39;s Wikipedia system for a good example of a self-implemented system.</p>
+<p>There are four major sections to a configuration file:</p>
+
+<ol>
+<li>The job section defines things like the name of the job, and whether to use the YarnJobFactory or LocalJobFactory.</li>
+<li>The task section is where you specify the class name for your <a href="../api/overview.html">StreamTask</a>. It&#39;s also where you define what the <a href="../container/streams.html">input streams</a> are for your task.</li>
+<li>The serializers section defines the classes of the <a href="../container/serialization.html">serdes</a> used for serialization and deserialization of specific objects that are received and sent along different streams.</li>
+<li>The system section defines the systems that your StreamTask can read from, along with the serdes used for the keys and messages of each system. If you&#39;re reading from Kafka, you&#39;ll usually define a Kafka system here, although you can also plug in your own self-implemented Samza-compatible systems. See the <a href="/startup/hello-samza/0.7.0">hello-samza example project</a>&#39;s Wikipedia system for a good example of a self-implemented system.</li>
+</ol>
 
 <h3>Required Configuration</h3>
 

Modified: incubator/samza/site/learn/documentation/0.7.0/jobs/job-runner.html
URL: http://svn.apache.org/viewvc/incubator/samza/site/learn/documentation/0.7.0/jobs/job-runner.html?rev=1601502&r1=1601501&r2=1601502&view=diff
==============================================================================
--- incubator/samza/site/learn/documentation/0.7.0/jobs/job-runner.html (original)
+++ incubator/samza/site/learn/documentation/0.7.0/jobs/job-runner.html Mon Jun  9 20:35:57 2014
@@ -97,9 +97,7 @@
   ApplicationStatus getStatus();
 }
 </code></pre></div>
-<p>Once the JobRunner gets a job, it calls submit() on the job. This method is what tells the StreamJob implementation to start the TaskRunner. In the case of LocalJobRunner, it uses a run-container.sh script to execute the TaskRunner in a separate process, which will start one TaskRunner locally on the machine that you ran run-job.sh on.</p>
-
-<p><img src="/img/0.7.0/learn/documentation/container/job-flow.png" alt="diagram"></p>
+<p>Once the JobRunner gets a job, it calls submit() on the job. This method tells the StreamJob implementation to start the SamzaContainer. In the case of LocalJobRunner, the run-container.sh script is used to start one SamzaContainer in a separate process on the machine where you ran run-job.sh.</p>
 
 <p>This flow differs slightly when you use YARN, but we&#39;ll get to that later.</p>