Posted to dev@samza.apache.org by Jay Kreps <ja...@gmail.com> on 2013/09/06 02:14:35 UTC

state management docs

I took a pass at improving the state management documentation (talking to
people, I don't think anyone understood what we were saying):
http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html

I would love to get some feedback on this, especially from anyone who
doesn't already know Samza. Does this make any sense? Does it tell you what
you need to know in the right order?

-Jay

RE: state management docs

Posted by Guozhang Wang <gu...@linkedin.com>.
1. How about "sequential logging stream"?

Guozhang

Re: state management docs

Posted by Jay Kreps <ja...@gmail.com>.
Thanks for the feedback Guozhang! Those are excellent points!

1. Well, technically none of the output Kafka topics are necessarily
collocated with the task producing to that topic. The little databases in
the boxes are meant to be the leveldbs; perhaps I should label them? I do
think the terminology (stream vs. log) is quite confusing. Unfortunately the
term "log" tends to be very confusing to non-database people, who think
logs are text files! What do you think would be the least confusing set of
terms?

2. This is a good point, I will add that.

3. Yes, to give exact consistency in the presence of faults a few things are
required, one of which is restoring to the same point between changelog,
inputs, and output.

4. Yes, but when the task fails it will restart on another machine that
doesn't have the leveldb image on disk. The one exception to this is when
the user kills their own job. In this case it would be nice to have an
optimization that lets us preferentially reuse the same machine to avoid
rebuilding the leveldb.
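
To make points 3 and 4 concrete, here is a rough sketch (plain Python, not
Samza code) of what restoring on a fresh machine amounts to: replay the
changelog into an empty store and note how far into the input the restored
state reaches. The entry layout (an input offset carried on every change)
and the function name are assumptions made up for illustration; this is not
how the changelog is actually encoded.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical changelog entry: (key, value, input_offset).
# A value of None marks a deletion; input_offset is the offset of the input
# message whose processing produced this change.
ChangelogEntry = Tuple[str, Optional[str], int]


def restore_from_changelog(
    changelog: Iterable[ChangelogEntry],
) -> Tuple[Dict[str, str], int]:
    """Rebuild the local store by replaying the changelog from the start.

    Returns the restored store and the next input offset to consume.
    """
    store: Dict[str, str] = {}
    last_input_offset = -1
    for key, value, input_offset in changelog:
        if value is None:
            store.pop(key, None)  # deletion marker removes the key
        else:
            store[key] = value  # later entries overwrite earlier ones
        last_input_offset = max(last_input_offset, input_offset)
    return store, last_input_offset + 1


changelog = [
    ("alice", "1", 0),
    ("bob", "1", 1),
    ("alice", "2", 2),
    ("bob", None, 3),
]
store, resume_at = restore_from_changelog(changelog)
print(store, resume_at)
```

The cost of the replay grows with the size of the changelog, which is why
reusing a machine that still has the leveldb image on disk would be a
worthwhile optimization.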

-Jay



RE: state management docs

Posted by Guozhang Wang <gu...@linkedin.com>.
Well written doc. I think I can understand every point even with limited knowledge of Samza :)

A few comments:

1) stateful_job.png

Could you move the changelog streams into the task boxes to show that they are collocated? And could we also add the KV stores to the boxes? At first glance I thought the changelog was actually the store. Also, the name "changelog stream" is a little confusing. At first I wondered why it is called a stream rather than just a "log"; I only understood once I read the fault tolerance section.

2) "This does not impact consistency—a task always reads what it wrote (since it checks the cache first)"

I think another main reason is that a task only reads/writes its own partition.

3) Do we need to guarantee consistency between the incoming stream and the changelog stream? For example, the changelog could indicate that a change entry is for the state processed up to offset X of the incoming stream, so that on failover we know where to set the offset of the incoming stream.

4) Would reading directly from LevelDB be a better approach than replaying the whole (although compacted) changelog? Writes to LevelDB should be persistent, so we would only need to make sure we have checkpoints recording, for example, that the current state (excluding writes that are still in the cache) represents all the changes up to a given offset of the changelog.
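
To illustrate the behavior quoted in 2), here is a toy sketch in plain Python of a store with a write cache in front of it. It is only my reading of the doc, not Samza's implementation: the class name, the `flush_threshold` parameter, and the use of a dict and a list as stand-ins for the on-disk store and the changelog are all assumptions.

```python
from typing import Dict, List, Optional, Tuple


class CachedStore:
    """Toy store with a write cache in front of a slower backing store."""

    def __init__(self, flush_threshold: int = 3) -> None:
        self.backing: Dict[str, str] = {}  # stands in for the on-disk store
        self.dirty: Dict[str, str] = {}  # writes not yet flushed
        self.changelog: List[Tuple[str, str]] = []  # stands in for the changelog
        self.flush_threshold = flush_threshold

    def put(self, key: str, value: str) -> None:
        self.dirty[key] = value  # repeated writes to a key collapse here
        if len(self.dirty) >= self.flush_threshold:
            self.flush()

    def get(self, key: str) -> Optional[str]:
        # Check the cache first, so the task always reads what it wrote.
        if key in self.dirty:
            return self.dirty[key]
        return self.backing.get(key)

    def flush(self) -> None:
        # Push cached writes to the backing store and the changelog together.
        for key, value in self.dirty.items():
            self.backing[key] = value
            self.changelog.append((key, value))
        self.dirty.clear()


s = CachedStore(flush_threshold=3)
s.put("a", "1")
s.put("a", "2")
read_before_flush = s.get("a")  # served from the cache
backing_before_flush = dict(s.backing)  # still empty at this point
s.put("b", "x")
s.put("c", "y")  # third distinct key triggers a flush
```

Since only this task touches its partition, nobody else can observe the gap between the cache and the backing store.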

Guozhang




Re: state management docs

Posted by Jay Kreps <ja...@gmail.com>.
Thanks for the feedback, comments inline!


On Mon, Sep 9, 2013 at 6:46 PM, Chris Riccomini <cr...@linkedin.com> wrote:

> Hey Jay,
>
> Here are my notes.
>
> 1. I don't follow this:
>
> "However this means we have to recover up to a full window on fail-over.
> But this will not be very slow for large windows because of the amount of
> reprocessing. For larger windows or for effectively infinite windows it is
> better to make the in-process aggregation fault-tolerant rather than try
> to recompute it."
>
> Was this supposed to be, "But this will not be very slow for small
> windows..."?
>

Yeah, this had one too many negations. Fixed.


> 2. I'm not sure I understand what the example (user settings - user
> profiles) would be used for in the table-table join section.
>

Tried to improve, take a look and let me know what you think.

> 3. Recommend reversing the order of the phrasing in the table-stream join
> section to, "Example: Join page view data to user region information." For
> some reason my brain grasps it better when it's posed as stream->table
> join in the example.
>

Good point, clarified what is being "joined on".


> 4. "systems is to simply to periodically" .. one to too many? (ho ho :)
>

Fixed.

> 5. "mobile ui" -> "mobile UI"
>

Fixed.

> 6. "general feature: in general" .. Eliminate the second "in general".
>

Fixed.

> 7. I think this statement needs to be expanded upon: "In cases where we
> were querying the external database on each input message to join on
> additional data for our output stream we would now instead create an input
> stream coming from the remote database that captures the changes to the
> database."
>
> This is kind of key for table-table and table-stream joins, and it's not
> something that most people think about, or have. An example would suffice.
>

Agreed, I moved up the "Databases as input streams" section to right after
this example.

> 8. "isolation issues goes away" -> "isolation issue goes away"
>

Fixed.


> 9. Should have a draw-backs section for the Samza approach. Namely lack of
> isolation within the machine (disk usage, iops, etc), and potentially slow
> restart time when recovering large state.
>

Added


> 10. "The store can abide by the same delivery and fault-tolerance
> guarantees that the Samza task itself does." .. Since its change log is
> modeled as a stream.
>

Added that clarification.

> 11. "out-of-the-box and gives" -> "out-of-the-box that gives"
>

fixed


> 12. "let's us" -> "lets us"
>

fixed


> 13. "because is it an aggregation" -> "because it is an aggregation"
>

fixed


> 14. "stores.my-store.changelog=my-stream-name" Was there anything special
> about the change log stream? I recall there being some strange-ness with
> serdes, but I don't remember exactly what it was. Something like, if no
> serde is defined and the msg is a byte array, we don't double serialize?
>

Nothing that is relevant to the user, right?


> 15. Call out that the LevelDB-Java implementation that we're using is
> running with JNI. Just good to be aware, in case people see weird off-heap
> issues, or segfaults in their tasks.
>

Will do.


>
> Cheers,
> Chris
>
> On 9/9/13 12:58 PM, "Jay Kreps" <ja...@gmail.com> wrote:
>
> >1. Yeah I think maybe the confusion is that we give the examples of
> >stateful processing without saying why. I tried to make it a little more
> >clear.
> >2. Tried to make the transition a little more clear. Samza kind of assumes
> >at least passing familiarity with Kafka so to some extent this is
> >unavoidable, but I think the problem is that it isn't clear why we are
> >talking about kafka (a stream implementation).
> >3. Not sure how to clarify the diagram (any suggestions?), but hopefully
> >improved the text.
> >
> >Let me know if you feel this helps:
> >
> >diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
> >index c23c7c3..2f9b740 100644
> >--- a/docs/learn/documentation/0.7.0/container/state-management.md
> >+++ b/docs/learn/documentation/0.7.0/container/state-management.md
> >@@ -5,25 +5,25 @@ title: State Management
> >
> > One of the more interesting aspects of Samza is the ability for tasks to
> >store data locally and execute rich queries against it.
> >
> >-Of course simple filtering or single-row transformations can be done
> >without any need for collecting state. A simple analogy to SQL may make
> >make this more obvious. The select- and where-clauses of a SQL query don't
> >usually require state: these can be executed a row at a time on input data
> >and maintain state between rows. The rest of SQL, multi-row aggregations
> >and joins, require more support to execute correctly in a streaming
> >fashion. Samza doesn't provide a high-level language like SQL but it does
> >provide lower-level primitives that make streaming aggregation and joins
> >and other stateful processing easy to implement.
> >+Of course simple filtering or single-row transformations can be done
> >without any need for collecting state. A simple analogy to SQL may make
> >this more obvious. The select- and where-clauses of a SQL query don't
> >usually require state: these can be executed a row at a time on input data
> >and maintain state between rows. The rest of SQL, multi-row aggregations
> >and joins, require more support to execute correctly in a streaming
> >fashion. Samza doesn't provide a high-level language like SQL but it does
> >provide lower-level primitives that make streaming aggregation and joins
> >and other stateful processing easy to implement.
> >
> > Let's dive into how this works and why it is useful.
> >
> > ### Common use cases for stateful processing
> >
> >-First, let's look at some simplistic examples of stateful stream
> >processing that might be seen on a consumer website.
> >+First, let's look at some simplistic examples of stateful stream
> >processing that might be seen on a consumer website. Later in this
> >document
> >we'll go through specific details of using Samza's built-in key-value
> >storage capabilities to implement each of these applications.
> >
> > ##### Windowed aggregation
> >
> > Example: Counting the number of page views for each user per hour
> >
> >-This kind of windowed processing is common for ranking and relevance,
> >"trending topics", as well as simple real-time reporting and monitoring.
> >+This kind of windowed processing is common for ranking and relevance,
> >"trending topics", as well as simple real-time reporting and monitoring.
> >For small windows one can just maintain the aggregate in memory and
> >manually commit the task position only at window boundaries. However this
> >means we have to recover up to a full window on fail-over. But this will
> >not be very slow for large windows because of the amount of reprocessing.
> >For larger windows or for effectively infinite windows it is better to
> >make
> >the in-process aggregation fault-tolerant rather than try to recompute it.
> >
> > ##### Table-table join
> >
> > Example: Join a table of user profiles to a table of user\_settings by
> >user\_id and emit the joined stream
> >
> >-This example is somewhat simplistic: one might wonder why you would want
> >to join two tables in a stream processing system. However consider a more
> >realistic example: real-time data normalization. E-commerce companies need
> >to handle product imports, web-crawlers need to update their [database of
> >the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
> >to normalize and index social data for search. Each of these processing
> >flows are emensely complex and contain many complex processing stages that
> >effectively join together and normalize many data sources into a single
> >clean feed.
> >+This example is somewhat simplistic: one might wonder why you would want
> >to join two tables in a stream processing system. However consider a more
> >realistic example: real-time data normalization. E-commerce companies need
> >to handle product imports, web-crawlers need to update their [database of
> >the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks
> >need to normalize and index social data for search. Each of these
> >processing flows are immensely complex and contain many complex processing
> >stages that effectively join together and normalize many data sources into
> >a single clean feed.
> >
> > ##### Table-stream join
> >
> >@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
> >consists of only a few va
> >
> > #### Using an external store
> >
> >-In the absence of built-in support a common pattern for stateful
> >processing is to push any state that would be accumulated between rows
> >into
> >an external database or key-value store. You get something that looks like
> >this:
> >+In the absence of built-in support a common pattern for stateful
> >processing is to push any state that would be accumulated between rows
> >into
> >an external database or key-value store. The database holds aggregates or
> >the dataset being queried to enrich the incoming stream. You get something
> >that looks like this:
> >
> >
> >![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_
> >db.png)
> >
> >@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
> >you from querying a rem
> >
> > To understand why this is useful let's first understand some of the
> >drawbacks of making remote queries in a stream processing job:
> >
> >-1. **Performance**: The first major drawback of making remote queries is
> >that they are slow and expensive. A Kafka stream can deliver hundreds of
> >thousands or even millions of messages per second per CPU core because it
> >transfers large chunks of data at a time. But a remote database query is a
> >more expensive proposition. Though the database may be partitioned and
> >scalable this partitioning doesn't match the partitioning of the job into
> >tasks so batching becomes much less effective. As a result you would
> >expect
> >to get a few thousand queries per second per core for remote requests.
> >This
> >means that adding a processing stage that uses an external database will
> >often reduce the throughput by several orders of magnitude.
> >+1. **Performance**: The first major drawback of making remote queries is
> >that they are slow and expensive. For example, a Kafka stream can deliver
> >hundreds of thousands or even millions of messages per second per CPU core
> >because it transfers large chunks of data at a time. But a remote database
> >query is a more expensive proposition. Though the database may be
> >partitioned and scalable this partitioning doesn't match the partitioning
> >of the job into tasks so batching becomes much less effective. As a result
> >you would expect to get a few thousand queries per second per core for
> >remote requests. This means that adding a processing stage that uses an
> >external database will often reduce the throughput by several orders of
> >magnitude.
> > 1. **Isolation**: If your database or service is also running live
> >processing, mixing in asynchronous processing can be quite dangerous. A
> >scalable stream processing system can run with very high parallelism. If
> >such a job comes down (say for a code push) it queues up data for
> >processing, when it restarts it will potentially have a large backlog of
> >data to process. Since the job may actually have very high parallelism
> >this
> >can result in huge load spikes, many orders of magnitude higher than
> >steady
> >state load. If this load is mixed with live queries (i.e. the queries used
> >to build web pages or render mobile ui or anything else that has a user
> >waiting on the other end) then you may end up causing a denial-of-service
> >attack on your live service.
> > 1. **Query Capabilities**: Many scalable databases expose very limited
> >query interfaces--only supporting simple key-value lookups. Doing the
> >equivalent of a "full table scan" or rich traversal may not be practical
> >in
> >this model.
> > 1. **Correctness**: If your task keeps counts or otherwise modifies state
> >in a remote store how is this rolled back if the task fails?
> >@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
> >the remote database and
> >
> > Note that now the state is physically on the same machine as the tasks,
> >and each task has access only to its local partition. However the
> >combination of stateful tasks with the normal partitioning capabilities
> >Samza offers makes this a very general feature: in general you just
> >repartition on the key by which you want to split your processing and then
> >you have full local access to the data within storage in that partition.
> >
> >+In cases where we were querying the external database on each input
> >message to join on additional data for our output stream we would now
> >instead create an input stream coming from the remote database that
> >captures the changes to the database.
> >+
> > Let's look at how this addresses the problems of the remote store:
> >
> > 1. This fixes the performance issues of remote queries because the data
> >is
> >now local, what would otherwise be a remote query may now just be a lookup
> >against local memory or disk (we ship a [LevelDB](
> >https://code.google.com/p/leveldb)-based store which is described in
> >detail
> >below).
> >
> >
> >
> >
> >
> >On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil
> ><te...@gmail.com> wrote:
> >
> >> I am probably the dumbest person on this list in terms of technical
> >> know-how, but hey,.. if I can understand the doc, then most people would
> >> understand easily :)
> >>
> >> Some comments:
> >>
> >> (1) What does a typical task state consist of ? An explicit example of
> >> "task state" would be helpful. There are couple of good examples in the
> >>doc
> >> but none of them say "hey, for this use case the task state is .."
> >>
> >> (2) "The problems of remote stores" -> "Performance":
> >> Before this point, there was no reference of Kafka at all in the doc and
> >> you suddenly start comparing things with Kafka stream. People w/o any
> >>Kafka
> >> background would not get that part.
> >>
> >> (3) "Approaches to managing task state" -> "Using an external store"
> >> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
> >> stream and external store. However reading further made me realize that
> >> we just write the task state to the external DB,
> >> which is different from the o/p stream... right?
> >>
> >> Trivial things:
> >> - "A simple analogy to SQL may make make this more obvious." : Word
> >>"make"
> >> occurs twice
> >> - The hyperlink for "database of the web" is not working
> >>
> >> Thanks,
> >> Tejas
> >>
> >>
> >> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >> > I took a pass at improving the state management documentation
> >>(talking to
> >> > people, I don't think anyone understood what we were saying):
> >> >
> >> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >> >
> >> > I would love to get some feedback on this, especially from anyone who
> >> > doesn't already know Samza. Does this make any sense? Does it tell you
> >> what
> >> > you need to know in the right order?
> >> >
> >> > -Jay
> >> >
> >>
>
>

Re: state management docs

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Jay,

Here are my notes.

1. I don't follow this:

"However this means we have to recover up to a full window on fail-over.
But this will not be very slow for large windows because of the amount of
reprocessing. For larger windows or for effectively infinite windows it is
better to make the in-process aggregation fault-tolerant rather than try
to recompute it."

Was this supposed to be, "But this will not be very slow for small
windows..."?

2. I'm not sure I understand what the example (user settings - user
profiles) would be used for in the table-table join section.

3. Recommend reversing the order of the phrasing in the table-stream join
section to, "Example: Join page view data to user region information." For
some reason my brain grasps it better when it's posed as stream->table
join in the example.

4. "systems is to simply to periodically" .. one to too many? (ho ho :)

5. "mobile ui" -> "mobile UI"

6. "general feature: in general" .. Eliminate the second "in general".

7. I think this statement needs to be expanded upon: "In cases where we
were querying the external database on each input message to join on
additional data for our output stream we would now instead create an input
stream coming from the remote database that captures the changes to the
database."

This is kind of key for table-table and table-stream joins, and it's not
something that most people think about, or have. An example would suffice.

8. "isolation issues goes away" -> "isolation issue goes away"

9. Should have a draw-backs section for the Samza approach. Namely lack of
isolation within the machine (disk usage, iops, etc), and potentially slow
restart time when recovering large state.

10. "The store can abide by the same delivery and fault-tolerance
guarantees that the Samza task itself does." .. Since its change log is
modeled as a stream.

11. "out-of-the-box and gives" -> "out-of-the-box that gives"

12. "let's us" -> "lets us"

13. "because is it an aggregation" -> "because it is an aggregation"

14. "stores.my-store.changelog=my-stream-name" Was there anything special
about the change log stream? I recall there being some strange-ness with
serdes, but I don't remember exactly what it was. Something like, if no
serde is defined and the msg is a byte array, we don't double serialize?

15. Call out that the LevelDB-Java implementation that we're using is
running with JNI. Just good to be aware, in case people see weird off-heap
issues, or segfaults in their tasks.
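
For note 7, something along the lines of this toy sketch might serve as the
example. It is plain Python rather than Samza code, and the class and method
names are invented for illustration. The idea is that the task consumes a
stream of changes from the profile database and keeps them in a local table,
then joins each page view against that table instead of querying the remote
database.

```python
from typing import Dict, Optional, Tuple


class PageViewRegionJoin:
    """Toy table-stream join: page views enriched with the user's region."""

    def __init__(self) -> None:
        # Local table, kept up to date from the database change stream.
        self.region_by_user: Dict[str, str] = {}

    def on_profile_change(self, user_id: str, region: Optional[str]) -> None:
        # One message from the stream of database changes.
        if region is None:
            self.region_by_user.pop(user_id, None)  # profile deleted
        else:
            self.region_by_user[user_id] = region  # insert or update

    def on_page_view(self, user_id: str, url: str) -> Tuple[str, str, Optional[str]]:
        # One message from the page view stream; the lookup is purely local.
        return (user_id, url, self.region_by_user.get(user_id))


join = PageViewRegionJoin()
join.on_profile_change("u1", "EU")
first = join.on_page_view("u1", "/home")
join.on_profile_change("u1", "US")  # the user moved; the change arrives as a message
second = join.on_page_view("u1", "/cart")
unknown = join.on_page_view("u2", "/home")  # no profile seen yet
print(first, second, unknown)
```

For this to work, both inputs have to be partitioned by user id so that a
task sees the profile changes for exactly the users whose page views it
processes.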

Cheers,
Chris

On 9/9/13 12:58 PM, "Jay Kreps" <ja...@gmail.com> wrote:

>1. Yeah I think maybe the confusion is that we give the examples of
>stateful processing without saying why. I tried to make it a little more
>clear.
>2. Tried to make the transition a little more clear. Samza kind of assumes
>at least passing familiarity with Kafka so to some extent this is
>unavoidable, but I think the problem is that it isn't clear why we are
>talking about kafka (a stream implementation).
>3. Not sure how to clarify the diagram (any suggestions?), but hopefully
>improved the text.
>
>Let me know if you feel this helps:
>
>diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
>index c23c7c3..2f9b740 100644
>--- a/docs/learn/documentation/0.7.0/container/state-management.md
>+++ b/docs/learn/documentation/0.7.0/container/state-management.md
>@@ -5,25 +5,25 @@ title: State Management
>
> One of the more interesting aspects of Samza is the ability for tasks to
>store data locally and execute rich queries against it.
>
>-Of course simple filtering or single-row transformations can be done
>without any need for collecting state. A simple analogy to SQL may make
>make this more obvious. The select- and where-clauses of a SQL query don't
>usually require state: these can be executed a row at a time on input data
>and maintain state between rows. The rest of SQL, multi-row aggregations
>and joins, require more support to execute correctly in a streaming
>fashion. Samza doesn't provide a high-level language like SQL but it does
>provide lower-level primitives that make streaming aggregation and joins
>and other stateful processing easy to implement.
>+Of course simple filtering or single-row transformations can be done
>without any need for collecting state. A simple analogy to SQL may make
>this more obvious. The select- and where-clauses of a SQL query don't
>usually require state: these can be executed a row at a time on input data
>and maintain state between rows. The rest of SQL, multi-row aggregations
>and joins, require more support to execute correctly in a streaming
>fashion. Samza doesn't provide a high-level language like SQL but it does
>provide lower-level primitives that make streaming aggregation and joins
>and other stateful processing easy to implement.
>
> Let's dive into how this works and why it is useful.
>
> ### Common use cases for stateful processing
>
>-First, let's look at some simplistic examples of stateful stream
>processing that might be seen on a consumer website.
>+First, let's look at some simplistic examples of stateful stream
>processing that might be seen on a consumer website. Later in this
>document
>we'll go through specific details of using Samza's built-in key-value
>storage capabilities to implement each of these applications.
>
> ##### Windowed aggregation
>
> Example: Counting the number of page views for each user per hour
>
>-This kind of windowed processing is common for ranking and relevance,
>"trending topics", as well as simple real-time reporting and monitoring.
>+This kind of windowed processing is common for ranking and relevance,
>"trending topics", as well as simple real-time reporting and monitoring.
>For small windows one can just maintain the aggregate in memory and
>manually commit the task position only at window boundaries. However this
>means we have to recover up to a full window on fail-over. But this will
>not be very slow for large windows because of the amount of reprocessing.
>For larger windows or for effectively infinite windows it is better to
>make
>the in-process aggregation fault-tolerant rather than try to recompute it.
>
> ##### Table-table join
>
> Example: Join a table of user profiles to a table of user\_settings by
>user\_id and emit the joined stream
>
>-This example is somewhat simplistic: one might wonder why you would want
>to join two tables in a stream processing system. However consider a more
>realistic example: real-time data normalization. E-commerce companies need
>to handle product imports, web-crawlers need to update their [database of
>the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
>to normalize and index social data for search. Each of these processing
>flows are emensely complex and contain many complex processing stages that
>effectively join together and normalize many data sources into a single
>clean feed.
>+This example is somewhat simplistic: one might wonder why you would want
>to join two tables in a stream processing system. However consider a more
>realistic example: real-time data normalization. E-commerce companies need
>to handle product imports, web-crawlers need to update their [database of
>the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks
>need to normalize and index social data for search. Each of these
>processing flows are immensely complex and contain many complex processing
>stages that effectively join together and normalize many data sources into
>a single clean feed.
>
> ##### Table-stream join
>
>@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
>consists of only a few va
>
> #### Using an external store
>
>-In the absence of built-in support a common pattern for stateful
>processing is to push any state that would be accumulated between rows
>into
>an external database or key-value store. You get something that looks like
>this:
>+In the absence of built-in support a common pattern for stateful
>processing is to push any state that would be accumulated between rows
>into
>an external database or key-value store. The database holds aggregates or
>the dataset being queried to enrich the incoming stream. You get something
>that looks like this:
>
> 
>![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_
>db.png)
>
>@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
>you from querying a rem
>
> To understand why this is useful let's first understand some of the
>drawbacks of making remote queries in a stream processing job:
>
>-1. **Performance**: The first major drawback of making remote queries is
>that they are slow and expensive. A Kafka stream can deliver hundreds of
>thousands or even millions of messages per second per CPU core because it
>transfers large chunks of data at a time. But a remote database query is a
>more expensive proposition. Though the database may be partitioned and
>scalable this partitioning doesn't match the partitioning of the job into
>tasks so batching becomes much less effective. As a result you would
>expect
>to get a few thousand queries per second per core for remote requests.
>This
>means that adding a processing stage that uses an external database will
>often reduce the throughput by several orders of magnitude.
>+1. **Performance**: The first major drawback of making remote queries is
>that they are slow and expensive. For example, a Kafka stream can deliver
>hundreds of thousands or even millions of messages per second per CPU core
>because it transfers large chunks of data at a time. But a remote database
>query is a more expensive proposition. Though the database may be
>partitioned and scalable this partitioning doesn't match the partitioning
>of the job into tasks so batching becomes much less effective. As a result
>you would expect to get a few thousand queries per second per core for
>remote requests. This means that adding a processing stage that uses an
>external database will often reduce the throughput by several orders of
>magnitude.
> 1. **Isolation**: If your database or service is also running live
>processing, mixing in asynchronous processing can be quite dangerous. A
>scalable stream processing system can run with very high parallelism. If
>such a job comes down (say for a code push) it queues up data for
>processing, when it restarts it will potentially have a large backlog of
>data to process. Since the job may actually have very high parallelism
>this
>can result in huge load spikes, many orders of magnitude higher than
>steady
>state load. If this load is mixed with live queries (i.e. the queries used
>to build web pages or render mobile ui or anything else that has a user
>waiting on the other end) then you may end up causing a denial-of-service
>attack on your live service.
> 1. **Query Capabilities**: Many scalable databases expose very limited
>query interfaces--only supporting simple key-value lookups. Doing the
>equivalent of a "full table scan" or rich traversal may not be practical
>in
>this model.
> 1. **Correctness**: If your task keeps counts or otherwise modifies state
>in a remote store how is this rolled back if the task fails?
>@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
>the remote database and
>
> Note that now the state is physically on the same machine as the tasks,
>and each task has access only to its local partition. However the
>combination of stateful tasks with the normal partitioning capabilities
>Samza offers makes this a very general feature: in general you just
>repartition on the key by which you want to split your processing and then
>you have full local access to the data within storage in that partition.
>
>+In cases where we were querying the external database on each input
>message to join on additional data for our output stream we would now
>instead create an input stream coming from the remote database that
>captures the changes to the database.
>+
> Let's look at how this addresses the problems of the remote store:
>
> 1. This fixes the performance issues of remote queries because the data
>is
>now local, what would otherwise be a remote query may now just be a lookup
>against local memory or disk (we ship a [LevelDB](
>https://code.google.com/p/leveldb)-based store which is described in
>detail
>below).
>
>
>
>
>
>On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil
><te...@gmail.com> wrote:
>
>> I am probably the dumbest person on this list in terms of technical
>> know-how, but hey,.. if I can understand the doc, then most people would
>> understand easily :)
>>
>> Some comments:
>>
>> (1) What does a typical task state consist of ? An explicit example of
>> "task state" would be helpful. There are couple of good examples in the
>>doc
>> but none of them say "hey, for this use case the task state is .."
>>
>> (2) "The problems of remote stores" -> "Performance":
>> Before this point, there was no reference of Kafka at all in the doc and
>> you suddenly start comparing things with Kafka stream. People w/o any
>>Kafka
>> background would not get that part.
>>
>> (3) "Approaches to managing task state" -> "Using an external store"
>> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
>> stream and external store. However reading further made me realize that
>>we
>> just the task state to the external DB
>>  which is different from o/p stream...right ?
>>
>> Trivial things:
>> - "A simple analogy to SQL may make make this more obvious." : Word
>>"make"
>> occurs twice
>> - The hyperlink for "database of the web" is not working
>>
>> Thanks,
>> Tejas
>>
>>
>> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <ja...@gmail.com> wrote:
>>
>> > I took a pass at improving the state management documentation
>>(talking to
>> > people, I don't think anyone understood what we were saying):
>> >
>> >
>> 
>>http://samza.incubator.apache.org/learn/documentation/0.7.0/container/sta
>>te-management.html
>> >
>> > I would love to get some feedback on this, especially from anyone who
>> > doesn't already know Samza. Does this make any sense? Does it tell you
>> what
>> > you need to know in the right order?
>> >
>> > -Jay
>> >
>>


Re: state management docs

Posted by Jay Kreps <ja...@gmail.com>.
1. Yeah I think maybe the confusion is that we give the examples of
stateful processing without saying why. I tried to make it a little more
clear.
2. Tried to make the transition a little more clear. Samza kind of assumes
at least passing familiarity with Kafka so to some extent this is
unavoidable, but I think the problem is that it isn't clear why we are
talking about Kafka (a stream implementation).
3. Not sure how to clarify the diagram (any suggestions?), but hopefully
improved the text.

Let me know if you feel this helps:

diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
index c23c7c3..2f9b740 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -5,25 +5,25 @@ title: State Management

 One of the more interesting aspects of Samza is the ability for tasks to
store data locally and execute rich queries against it.

-Of course simple filtering or single-row transformations can be done
without any need for collecting state. A simple analogy to SQL may make
make this more obvious. The select- and where-clauses of a SQL query don't
usually require state: these can be executed a row at a time on input data
and maintain state between rows. The rest of SQL, multi-row aggregations
and joins, require more support to execute correctly in a streaming
fashion. Samza doesn't provide a high-level language like SQL but it does
provide lower-level primitives that make streaming aggregation and joins
and other stateful processing easy to implement.
+Of course simple filtering or single-row transformations can be done
without any need for collecting state. A simple analogy to SQL may make
this more obvious. The select- and where-clauses of a SQL query don't
usually require state: these can be executed a row at a time on input data
without maintaining state between rows. The rest of SQL, multi-row aggregations
and joins, require more support to execute correctly in a streaming
fashion. Samza doesn't provide a high-level language like SQL but it does
provide lower-level primitives that make streaming aggregation and joins
and other stateful processing easy to implement.

 Let's dive into how this works and why it is useful.

 ### Common use cases for stateful processing

-First, let's look at some simplistic examples of stateful stream
processing that might be seen on a consumer website.
+First, let's look at some simplistic examples of stateful stream
processing that might be seen on a consumer website. Later in this document
we'll go through specific details of using Samza's built-in key-value
storage capabilities to implement each of these applications.

 ##### Windowed aggregation

 Example: Counting the number of page views for each user per hour

-This kind of windowed processing is common for ranking and relevance,
"trending topics", as well as simple real-time reporting and monitoring.
+This kind of windowed processing is common for ranking and relevance,
"trending topics", as well as simple real-time reporting and monitoring.
For small windows one can just maintain the aggregate in memory and
manually commit the task position only at window boundaries. However this
means we have to recover up to a full window on fail-over. This will
be very slow for large windows because of the amount of reprocessing.
For larger windows or for effectively infinite windows it is better to make
the in-process aggregation fault-tolerant rather than try to recompute it.

 ##### Table-table join

 Example: Join a table of user profiles to a table of user\_settings by
user\_id and emit the joined stream

-This example is somewhat simplistic: one might wonder why you would want
to join two tables in a stream processing system. However consider a more
realistic example: real-time data normalization. E-commerce companies need
to handle product imports, web-crawlers need to update their [database of
the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
to normalize and index social data for search. Each of these processing
flows are emensely complex and contain many complex processing stages that
effectively join together and normalize many data sources into a single
clean feed.
+This example is somewhat simplistic: one might wonder why you would want
to join two tables in a stream processing system. However consider a more
realistic example: real-time data normalization. E-commerce companies need
to handle product imports, web-crawlers need to update their [database of
the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks
need to normalize and index social data for search. Each of these
processing flows is immensely complex and contains many complex processing
stages that effectively join together and normalize many data sources into
a single clean feed.

 ##### Table-stream join

@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
consists of only a few va

 #### Using an external store

-In the absence of built-in support a common pattern for stateful
processing is to push any state that would be accumulated between rows into
an external database or key-value store. You get something that looks like
this:
+In the absence of built-in support a common pattern for stateful
processing is to push any state that would be accumulated between rows into
an external database or key-value store. The database holds aggregates or
the dataset being queried to enrich the incoming stream. You get something
that looks like this:

 ![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_db.png)

@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
you from querying a rem

 To understand why this is useful let's first understand some of the
drawbacks of making remote queries in a stream processing job:

-1. **Performance**: The first major drawback of making remote queries is
that they are slow and expensive. A Kafka stream can deliver hundreds of
thousands or even millions of messages per second per CPU core because it
transfers large chunks of data at a time. But a remote database query is a
more expensive proposition. Though the database may be partitioned and
scalable this partitioning doesn't match the partitioning of the job into
tasks so batching becomes much less effective. As a result you would expect
to get a few thousand queries per second per core for remote requests. This
means that adding a processing stage that uses an external database will
often reduce the throughput by several orders of magnitude.
+1. **Performance**: The first major drawback of making remote queries is
that they are slow and expensive. For example, a Kafka stream can deliver
hundreds of thousands or even millions of messages per second per CPU core
because it transfers large chunks of data at a time. But a remote database
query is a more expensive proposition. Though the database may be
partitioned and scalable this partitioning doesn't match the partitioning
of the job into tasks so batching becomes much less effective. As a result
you would expect to get a few thousand queries per second per core for
remote requests. This means that adding a processing stage that uses an
external database will often reduce the throughput by several orders of
magnitude.
 1. **Isolation**: If your database or service is also running live
processing, mixing in asynchronous processing can be quite dangerous. A
scalable stream processing system can run with very high parallelism. If
such a job comes down (say for a code push) it queues up data for
processing, when it restarts it will potentially have a large backlog of
data to process. Since the job may actually have very high parallelism this
can result in huge load spikes, many orders of magnitude higher than steady
state load. If this load is mixed with live queries (i.e. the queries used
to build web pages or render mobile ui or anything else that has a user
waiting on the other end) then you may end up causing a denial-of-service
attack on your live service.
 1. **Query Capabilities**: Many scalable databases expose very limited
query interfaces--only supporting simple key-value lookups. Doing the
equivalent of a "full table scan" or rich traversal may not be practical in
this model.
 1. **Correctness**: If your task keeps counts or otherwise modifies state
in a remote store how is this rolled back if the task fails?
@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
the remote database and

 Note that now the state is physically on the same machine as the tasks,
and each task has access only to its local partition. However the
combination of stateful tasks with the normal partitioning capabilities
Samza offers makes this a very general feature: in general you just
repartition on the key by which you want to split your processing and then
you have full local access to the data within storage in that partition.

+In cases where we were querying the external database on each input
message to join on additional data for our output stream we would now
instead create an input stream coming from the remote database that
captures the changes to the database.
+
 Let's look at how this addresses the problems of the remote store:

 1. This fixes the performance issues of remote queries because the data is
now local, what would otherwise be a remote query may now just be a lookup
against local memory or disk (we ship a [LevelDB](
https://code.google.com/p/leveldb)-based store which is described in detail
below).
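
To make the local-state idea concrete, here is a minimal Python sketch of a table-stream join that reads from a local copy of the table instead of querying the remote database. It is only an illustration under assumptions: `LocalJoinTask` and its method names are hypothetical and are not Samza APIs, a plain dict stands in for the LevelDB-based store, and both inputs are assumed to be partitioned by `user_id` so that a task only sees its own partition.

```python
class LocalJoinTask:
    """Hypothetical task that joins page views to locally held user profiles."""

    def __init__(self):
        # Stand-in for the task's local store (a plain dict, not LevelDB).
        self.profiles = {}

    def process_profile_change(self, user_id, profile):
        """Apply one change captured from the remote database's change stream."""
        if profile is None:
            # In this sketch, None marks a deleted row.
            self.profiles.pop(user_id, None)
        else:
            self.profiles[user_id] = profile

    def process_page_view(self, user_id, url):
        """Enrich a page view with a local lookup instead of a remote query."""
        profile = self.profiles.get(user_id)
        return {"user_id": user_id, "url": url, "profile": profile}
```

The point of the design is that the only per-message work is a local lookup; the remote database is touched once per change to a row rather than once per input message.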





On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil <te...@gmail.com> wrote:

> I am probably the dumbest person on this list in terms of technical
> know-how, but hey,.. if I can understand the doc, then most people would
> understand easily :)
>
> Some comments:
>
> (1) What does a typical task state consist of ? An explicit example of
> "task state" would be helpful. There are couple of good examples in the doc
> but none of them say "hey, for this use case the task state is .."
>
> (2) "The problems of remote stores" -> "Performance":
> Before this point, there was no reference of Kafka at all in the doc and
> you suddenly start comparing things with Kafka stream. People w/o any Kafka
> background would not get that part.
>
> (3) "Approaches to managing task state" -> "Using an external store"
> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
> stream and external store. However reading further made me realize that we
> just the task state to the external DB
>  which is different from o/p stream...right ?
>
> Trivial things:
> - "A simple analogy to SQL may make make this more obvious." : Word "make"
> occurs twice
> - The hyperlink for "database of the web" is not working
>
> Thanks,
> Tejas
>
>
> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > I took a pass at improving the state management documentation (talking to
> > people, I don't think anyone understood what we were saying):
> >
> >
> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >
> > I would love to get some feedback on this, especially from anyone who
> > doesn't already know Samza. Does this make any sense? Does it tell you
> what
> > you need to know in the right order?
> >
> > -Jay
> >
>

Re: state management docs

Posted by Tejas Patil <te...@gmail.com>.
I am probably the dumbest person on this list in terms of technical
know-how, but hey,.. if I can understand the doc, then most people would
understand easily :)

Some comments:

(1) What does a typical task state consist of? An explicit example of
"task state" would be helpful. There are a couple of good examples in the doc
but none of them say "hey, for this use case the task state is ..."
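
For instance, for the windowed aggregation use case (page views per user per hour), the kind of explicit example meant here might look like the following Python sketch. The class and method names are hypothetical and are not Samza APIs; the only point is that the task state is the map of running counts.

```python
from collections import defaultdict

HOUR_SECONDS = 3600


class PageViewCounter:
    """Hypothetical task that counts page views per user per hour."""

    def __init__(self):
        # The task state: (user_id, window_start) -> count so far.
        self.counts = defaultdict(int)

    def process(self, user_id, timestamp):
        """Add one page view to the hourly window containing `timestamp`."""
        window_start = timestamp - (timestamp % HOUR_SECONDS)
        self.counts[(user_id, window_start)] += 1

    def close_window(self, window_start):
        """Return the per-user counts for a finished window and drop them."""
        finished = {k: v for k, v in self.counts.items() if k[1] == window_start}
        for key in finished:
            del self.counts[key]
        return {user: count for (user, _), count in finished.items()}
```

If the task dies before `close_window` is called, `counts` is exactly what has to be either recomputed from the input or restored from somewhere fault-tolerant.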

(2) "The problems of remote stores" -> "Performance":
Before this point, there was no reference to Kafka at all in the doc and
you suddenly start comparing things with Kafka stream. People w/o any Kafka
background would not get that part.

(3) "Approaches to managing task state" -> "Using an external store"
The figure gave me an impression that tasks' o/p goes to 2 places: o/p
stream and external store. However reading further made me realize that we
just push the task state to the external DB, which is different from the o/p
stream... right?

Trivial things:
- "A simple analogy to SQL may make make this more obvious." : Word "make"
occurs twice
- The hyperlink for "database of the web" is not working

Thanks,
Tejas


On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <ja...@gmail.com> wrote:

> I took a pass at improving the state management documentation (talking to
> people, I don't think anyone understood what we were saying):
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>
> I would love to get some feedback on this, especially from anyone who
> doesn't already know Samza. Does this make any sense? Does it tell you what
> you need to know in the right order?
>
> -Jay
>