You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jk...@apache.org on 2013/09/12 02:28:21 UTC

git commit: Misc. improvements to the state management docs.

Updated Branches:
  refs/heads/master 0ac53dc5d -> ecbebc6e5


Misc. improvements to the state management docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ecbebc6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ecbebc6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ecbebc6e

Branch: refs/heads/master
Commit: ecbebc6e55eab4841414add77a9a7816fd13a82a
Parents: 0ac53dc
Author: Jay Kreps <ja...@gmail.com>
Authored: Wed Sep 11 17:27:55 2013 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Wed Sep 11 17:27:55 2013 -0700

----------------------------------------------------------------------
 .reviewboardrc                                  |  1 +
 .../0.7.0/container/state-management.md         | 62 ++++++++++++--------
 2 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ecbebc6e/.reviewboardrc
----------------------------------------------------------------------
diff --git a/.reviewboardrc b/.reviewboardrc
new file mode 100644
index 0000000..0ee6a71
--- /dev/null
+++ b/.reviewboardrc
@@ -0,0 +1 @@
+REPOSITORY = 'git://git.apache.org/incubator-samza.git'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ecbebc6e/docs/learn/documentation/0.7.0/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
index a96655c..cafb0f8 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -3,9 +3,9 @@ layout: page
 title: State Management
 ---
 
-One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries against it.
+One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries on this data.
 
-Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
+Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require accumulating state between rows. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
 
 Let's dive into how this works and why it is useful.
 
@@ -17,17 +17,23 @@ First, let's look at some simplistic examples of stateful stream processing that
 
 Example: Counting the number of page views for each user per hour
 
-This kind of windowed processing is common for ranking and relevance, "trending topics", as well as simple real-time reporting and monitoring. For small windows one can just maintain the aggregate in memory and manually commit the task position only at window boundaries. However this means we have to recover up to a full window on fail-over. But this will not be very slow for large windows because of the amount of reprocessing. For larger windows or for effectively infinite windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it.
+This kind of windowed processing is common for ranking and relevance, detecting "trending topics", as well as simple real-time reporting and monitoring. For small windows one can just maintain the aggregate in memory and manually commit the task position only at window boundaries. However this means we have to recover up to a full window on fail-over, which will be very slow for large windows due to the amount of reprocessing. For large (or infinite!) windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it.
 
 ##### Table-table join
 
-Example: Join a table of user profiles to a table of user\_settings by user\_id and emit the joined stream
+Example: Join a table of user profiles to a table of user\_settings by user\_id and emit a "materialized view" of the joined stream
 
-This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows are immensely complex and contain many complex processing stages that effectively join together and normalize many data sources into a single clean feed.
+This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However real-life examples are often far more complex then what would normally be considered the domain of materialized views over tables. Consider a few examples of real-time data normalization:
 
-##### Table-stream join
+* E-commerce companies like Amazon and EBay need to import feeds of merchandise from merchants, normalize them by product, and present products with all the associated merchants and pricing information.
+* Web search requires building a crawler which creates essentially a [table of web page contents](http://labs.yahoo.com/files/YahooWebmap.pdf) and joins on all the relevance attributes such as page CTR or pagerank.
+* Social networks take feeds of user-entered text and need to normalize out entities such as companies, schools, and skills.
+
+Each of these use cases is a massively complex data normalization problem that can be thought of as constructing a very complex materialized view over many input tables.
 
-Example: Join user region information to page view data
+##### Stream-table join
+
+Example: Join user region information on to a stream of page views to create an augmented stream of page view with region.
 
 Joining side-information to a real-time feed is a classic use for stream processing. It's particularly common in advertising, relevance ranking, fraud detection and other domains. Activity data such as page views are generally captured with only a few primary keys, the additional attributes about the viewer and viewed items that are needed for processing need to joined on after-the-fact.
 
@@ -47,7 +53,7 @@ So how do systems support this kind of stateful processing? We'll lead in by des
 
 #### In-memory state with checkpointing
 
-A simple approach common in academic stream processing systems is to simply to periodically save out the state of the task's in-memory data. S4's [state management](http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance) implements this approach&mdash;tasks implement Java's serializable interface and are periodically serialized using java serialization to save out copies of the processor state.
+A simple approach, common in academic stream processing systems, is to periodically save out the state of the task's in-memory data. S4's [state management](http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance) implements this approach&mdash;tasks implement Java's serializable interface and are periodically serialized using java serialization to save out copies of the processor state.
 
 This approach works well enough if the in-memory state consists of only a few values. However since you have to save out the complete task state on each save this will become increasingly expensive as task state grows. Unfortunately most use cases we have seen revolve around joins and aggregation and so have large amounts of state&mdash;often many gigabytes. This makes periodic full dumps extremely impractical. Some academic systems handle this case by having the tasks produce "diffs" in addition to full checkpoints. However this requires a great deal of complexity in the task to track what has changed and efficiently produce a compact diff of changes.
 
@@ -64,7 +70,7 @@ Samza allows this style of processing (nothing will stop you from querying a rem
 To understand why this is useful let's first understand some of the drawbacks of making remote queries in a stream processing job:
 
 1. **Performance**: The first major drawback of making remote queries is that they are slow and expensive. For example, a Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn't match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.
-1. **Isolation**: If your database or service is also running live processing, mixing in asynchronous processing can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing, when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render mobile ui or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.
+1. **Isolation**: If your database or service is also running live processing, mixing in asynchronous processing can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing, when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render mobile UI or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.
 1. **Query Capabilities**: Many scalable databases expose very limited query interfaces--only supporting simple key-value lookups. Doing the equivalent of a "full table scan" or rich traversal may not be practical in this model.
 1. **Correctness**: If your task keeps counts or otherwise modifies state in a remote store how is this rolled back if the task fails? 
 
@@ -78,28 +84,40 @@ You can think of this as taking the remote table out of the remote database and
 
 ![state-local](/img/0.7.0/learn/documentation/container/stateful_job.png)
 
-Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: in general you just repartition on the key by which you want to split your processing and then you have full local access to the data within storage in that partition.
-
-In cases where we were querying the external database on each input message to join on additional data for our output stream we would now instead create an input stream coming from the remote database that captures the changes to the database.
+Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: you just repartition on the key by which you want to split your processing and then you have full local access to the data within storage in that partition.
 
 Let's look at how this addresses the problems of the remote store:
 
 1. This fixes the performance issues of remote queries because the data is now local, what would otherwise be a remote query may now just be a lookup against local memory or disk (we ship a [LevelDB](https://code.google.com/p/leveldb)-based store which is described in detail below).
-1. The isolation issues goes away as well as the queries are executed against the same servers the job runs against and this computation is not intermixed with live service calls.
+1. The isolation issue goes away as well as the queries are executed against the same servers the job runs against and this computation is not intermixed with live service calls.
 1. Data is now local so any kind of data-intensive processing, scans, and filtering is now possible.
-1. The store can abide by the same delivery and fault-tolerance guarantees that the Samza task itself does.
+1. Since the state changes are themselves modeled as a stream the store can abide by the same delivery and fault-tolerance guarantees that Samza gives tasks.
+
+This isn't always the right pattern to follow, it has a few drawbacks too.
+1. If the data is very large then storing it with each task that uses the data may use more space.
+1. As the per-container data size grows so too will the restore time for a failed task (50Mb/sec is a reasonable restore time to expect).
+
+However we find that the local state approach is the best more often than not, and, of course, nothing prevents the use of external storage when needed.
+
+### Databases as input streams
+
+In cases where we were querying the external database on each input message we can transform this to local processing by instead transforming the database into a stream of row changes. These changes can be taken as input by a task, stored, and queried against just as the remote database would be.
+
+But how can you get such a stream? Many databases such Oracle, HBase, MySQL, and MongoDB offer built-in support for directly capturing changes. If not this can be done by publishing a stream of changes to Kafka or by implementing our [pluggable stream interface](streams.html) to directly poll the database for changes (say by some last_modified timestamp). You want this to be done in a way that you can reset the offset or timestamp back to zero to replay the current state of the database as changes if you need to reprocess. If this stream capture is efficient enough you can often avoid having changelogs for your tasks and simply replay them from the source when they fail.
+
+A wonderful contribution would be a generic jdbc-based stream implementation for extracting changes from relational databases by modified date.
 
 ### Key-value storage
 
-Though the storage format is pluggable, we provide a key-value store implementation to tasks out-of-the-box and gives the usual put/get/delete/range queries. This is backed by a highly available "changelog" stream that provides fault-tolerance by acting as a kind of [redo log](http://en.wikipedia.org/wiki/Redo_log) for the task's state (we describe this more in the next section).
+Though the storage format is pluggable, we provide a key-value store implementation to tasks out-of-the-box that gives the usual put/get/delete/range queries. This is backed by a highly available "changelog" stream that provides fault-tolerance by acting as a kind of [redo log](http://en.wikipedia.org/wiki/Redo_log) for the task's state (we describe this more in the next section).
 
-This key-value storage engine is built on top of [LevelDB](https://code.google.com/p/leveldb). LevelDB has several nice properties. First it maintains data outside the java heap which means it is immediately preferable to any simple approach using a hash table both because of memory-efficiency and to avoid GC. It will use an off-heap memory cache and when that is exhausted go to disk for lookups&mdash;so small data sets can be [very fast](https://code.google.com/p/leveldb) and non-memory-resident datasets, though slower, are still possible. It is [log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/) and writes can be performed at close to disk speeds. It also does built-in block compression which helps to reduce both I/O and memory usage.
+This key-value storage engine is built on top of [LevelDB](https://code.google.com/p/leveldb) using a [LevelDB JNI API](https://github.com/fusesource/leveldbjni). LevelDB has several nice properties. First it maintains data outside the java heap which means it is immediately preferable to any simple approach using a hash table both because of memory-efficiency and to avoid GC. It will use an off-heap memory cache and when that is exhausted go to disk for lookups&mdash;so small data sets can be [very fast](https://code.google.com/p/leveldb) and non-memory-resident datasets, though slower, are still possible. It is [log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/) and writes can be performed at close to disk speeds. It also does built-in block compression which helps to reduce both I/O and memory usage.
 
-The nature of Samza's usage allows us to optimize this further. We add an optional "L1" LRU cache which is in-heap and holds deserialized rows. This cache is meant to be very small and let's us introduce several optimizations for both reads and writes.
+The nature of Samza's usage allows us to optimize this further. We add an optional "L1" LRU cache which is in-heap and holds deserialized rows. This cache is meant to be very small and lets us introduce several optimizations for both reads and writes.
 
 The cache is an "object" or "row" cache&mdash;that is it maintains the java objects stored with no transformation or serialization. This complements LevelDB's own block level caching well. Reads and writes both populate the cache, and reads on keys in the cache avoid the cost of deserialization for these very common objects.
 
-For writes the cache provides two benefits. Since LevelDB is itself really only a persistent "cache" in our architecture we do not immediately need to apply every write to the filesystem. We can batch together a few hundred writes and apply them all at once. LevelDB heavily optimizes this kind of batch write. This does not impact consistency&mdash;a task always reads what it wrote (since it checks the cache first). Secondly the cache effectively deduplicates updates so that if multiple updates to the same key occur close together we can optimize away all but the final write to leveldb and the changelog. For example, an important use case is maintaining a small number of counters that are incremented on every input. A naive implementation would need to write out each new value to LevelDB as well as perhaps logging the change out to the changelog for the task. In the extreme case where you had only a single variable, x, incremented on each input, an uncached implementation would produ
 ce writes in the form "x=1", "x=2", "x=3", etc which is quite inefficient. This is overkill, we only need to flush to the changelog at [commit points](checkpointing.html) not on every write. This allows us to "deduplicate" the writes that go to leveldb and the changelog to just the final value before the commit point ("x=3" or whatever it happened to be).
+For writes the cache provides two benefits. Since LevelDB is itself really only a persistent "cache" in our architecture we do not immediately need to apply every write to the filesystem. We can batch together a few hundred writes and apply them all at once. LevelDB heavily optimizes this kind of batch write. This does not impact consistency&mdash;a task always reads what it wrote (since it checks the cache first and is the only writer to its store). Secondly the cache effectively deduplicates updates so that if multiple updates to the same key occur close together we can optimize away all but the final write to leveldb and the changelog. For example, an important use case is maintaining a small number of counters that are incremented on every input. A naive implementation would need to write out each new value to LevelDB as well as perhaps logging the change out to the changelog for the task. In the extreme case where you had only a single variable, x, incremented on each input, an
  uncached implementation would produce writes in the form "x=1", "x=2", "x=3", etc which is quite inefficient. This is overkill, we only need to flush to the changelog at [commit points](checkpointing.html) not on every write. This allows us to "deduplicate" the writes that go to leveldb and the changelog to just the final value before the commit point ("x=3" or whatever it happened to be).
 
 The combination of these features makes it possible to provide highly available processing that performs very close to memory speeds for small datasets yet still scales up to TBs of data (partitioned up across all the tasks).
 
@@ -111,7 +129,7 @@ The answer is that Samza handles state as just another stream. There are two mec
 
 The first approach is just to allow the task to replay one or more of its input streams to populate its store when it restarts. This works well if the input stream maintains the complete data (as a stream fed by a database table might) and if the input stream is fast enough to make this practical. This requires no framework support.
 
-However often the state that is stored is much smaller than the input stream (because is it an aggregation or projection of the original input streams). Or the input stream may not maintain a complete, replayable set of inputs (say for event logs). To support these cases we provide the ability to back the state of the store with a changelog stream. A changelog is just a stream to which the task logs each change to its state&mdash;i.e. the sequence of key-value pairs applied to the local store. Changelogs are co-partitioned with their tasks (so each task has its own stream partition for which it is the only writer).
+However often the state that is stored is much smaller than the input stream (because it is an aggregation or projection of the original input streams). Or the input stream may not maintain a complete, replayable set of inputs (say for event logs). To support these cases we provide the ability to back the state of the store with a changelog stream. A changelog is just a stream to which the task logs each change to its state&mdash;i.e. the sequence of key-value pairs applied to the local store. Changelogs are co-partitioned with their tasks (so each task has its own stream partition for which it is the only writer).
 
 The changelogs are just normal streams&mdash;other downstream tasks can subscribe to this state and use it. And it turns out that very often the most natural way to represent the output of a job is as the changelog of its task (we'll show some examples in a bit).
 
@@ -247,12 +265,6 @@ Note: In this example we are assuming that impressions are assigned a unique gui
 
 Implementation: Partition the ad click and ad impression streams by the impression id or user id. The task keeps a store of unmatched clicks and unmatched impressions. When a click comes in we try to find its matching impression in the impression store, and vice versa. If a match is found emit the joined pair and delete the entry. If no match is found store the event to wait for a match. Since this is presumably a left outer join (i.e. every click has a corresponding impression but not vice versa) we will periodically scan the impression table and delete old impressions for which no click arrived.
 
-### Databases as streams
-
-One assumption we are making in the above is that you can extract a stream of changes from your databases. This could be done by publishing a stream of changes to Kafka or by implementing our [pluggable stream interface](streams.html) to directly poll the database for changes. You want this to be done in a way that you can reset the offset or timestamp back to zero to replay the current state of the database as changes if you need to reprocess. If this stream capture is efficient enough you can often avoid having changelogs for your tasks and simply replay them from the source when they fail.
-
-A wonderful contribution would be a generic jdbc-based stream implementation for extracting changes from relational databases.
-
 ### Implementing storage engines
 
 We mentioned that the storage engine interface was pluggable. Of course you can use any data structure you like in your task provided you can repopulate it off your inputs on failure. However to plug into our changelog infrastructure you need to implement a generic StorageEngine interface that handles restoring your state on failure and ensures that data is flushed prior to commiting the task position.