You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Martin Kleppmann (JIRA)" <ji...@apache.org> on 2014/09/05 01:16:23 UTC

[jira] [Commented] (SAMZA-402) Provide a "shared state" store among StreamTasks

    [ https://issues.apache.org/jira/browse/SAMZA-402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14122155#comment-14122155 ] 

Martin Kleppmann commented on SAMZA-402:
----------------------------------------

I think this is a useful feature. Some more example use cases for a "map-side join" (shared state that is only read by the job):

* A control channel for a job, allowing an administrator to adjust aspects of the job at runtime without job restart. For example, a job could be deployed with two different algorithms and an A/B-testing facility. An administrator could adjust whether algorithm A or algorithm B should be used by updating a particular key in the shared store.
* A repository for Avro schemas (see SAMZA-317), where the store contains a mapping from schema ID to schema.

At the moment, if you want to implement those things, you have to do one of the following:

* Create a stream for the shared state (with at least as many partitions as any other input stream), co-partition that stream with the input streams, and write each state update to *all* partitions of that stream (so that it is delivered to all StreamTasks in the job). That seems wasteful and ugly.
* Connect to some external system (not managed by Samza), e.g. an external database. This increases the number of infrastructure components that need to be deployed, makes the job more complicated, and potentially performs worse. For example, the job may cache lookups to an external database, but without consuming a changelog from that database it won't know when cache entries need to be invalidated (you can only use a TTL and cross your fingers). A cold cache after container restart will perform badly as every lookup incurs a network round-trip.

Both of those suck. Even a simple shared state abstraction (read-only, no support for atomic swaps, no special handling of deletions) would make the implementation of this kind of use cases significantly nicer.

If we want to support use cases where a batch job pushes a new version of the state that completely replaces the old version, then we would probably need atomic swaps and handling of deletions. For that reason, I'm inclined to not support such batch updates of shared state. Batch-updated state can continue to use Voldemort.

Regarding mutable state shared between StreamTasks: I think this would be a dangerous abstraction for the reasons you describe. A true implementation of mutable shared state would require a consensus algorithm and would be a nightmare.

I think single-writer state would probably be safe (as you describe, using the task name as key). However, I would prefer to think of this as a kind of asynchronous message passing: one task is sending a message to the other tasks, saying "my counter value is now x". Put that way, the key is the "sender" of the message.

In my opinion, the key-value interface for a shared store should not permit writing (calling put() should raise an exception), to avoid setting false expectations of synchronous updates and magical distributed consistency. Job authors who know what they're doing can still write to the store asynchronously by sending a message to the output stream that is the changelog for the store. That way the write looks conceptually more like sending a message to the other tasks, and less like a state update. But that should be considered advanced usage, because it's up to the job author to enforce things like the single-writer constraint.

(If this kind of cross-task coordination use case turns out to be common, we could consider adding an abstraction on top of shared stores which enforces things like "one writer per key". But that can be a separate, future issue.)

On SAMZA-353 we discussed whether the StreamTask should be notified about changes in the store. I now think that probably isn't necessary, at least for a first version.

In summary: just because certain use cases can't easily be satisfied, we shouldn't throw the baby out with the bathwater. I think we should implement a simple version of shared state which is read-only and which only supports single-key updates (no batch updates, no atomic switching), like you describe in the implementation section of the design doc. That would already be very useful, and leave open our options to support more use cases in future.

> Provide a "shared state" store among StreamTasks
> ------------------------------------------------
>
>                 Key: SAMZA-402
>                 URL: https://issues.apache.org/jira/browse/SAMZA-402
>             Project: Samza
>          Issue Type: Bug
>          Components: container, kv
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>         Attachments: DESIGN-SAMZA-402-0.md, DESIGN-SAMZA-402-0.pdf
>
>
> There has been a lot of discussion about shared state stores in SAMZA-353. Initially, it seemed as though we might implement them through SAMZA-353, but now it seems more preferable to implement them separately. As such, this ticket is to discuss global state/shared state (terms that are being used interchangeably) between StreamTasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)