Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/09/02 21:57:22 UTC

[jira] [Commented] (SAMZA-353) Support assigning the same SSP to multiple tasknames

    [ https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118627#comment-14118627 ] 

Chris Riccomini commented on SAMZA-353:
---------------------------------------

bq. With global (unpartitioned) state, I think the container should definitely continue consuming the stream after the initial bootstrapping. Some jobs may have global state that changes continuously, or have periodic (e.g. hourly) state updates from an external process. It would be very disruptive if you had to keep restarting the job in order to pick up those state changes, or if you had to wait for some TTL to expire before the state was refreshed.

I agree. I would actually take this one step further, and consider how we might implement atomic swaps of shared state within a container (note: oops, I see that you mention this down below). We have an existing pattern where a DB index is built offline in Hadoop, pushed to the live site, and then atomically swapped in to serve reads. This is how Voldemort's read-only stores work. Even though the existing proposals come close with shared state, there's still no easy way to do an atomic swap. The nearest thing I can think of is to somehow have two stores, plus some control message that determines which one should be read from at any given point. This seems ugly, though.
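
To make the ugliness concrete, the two-store approach would look roughly like the sketch below in a StreamTask. The store names and the control stream are invented for illustration; this isn't a real proposal:

{code:java}
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class SwappingJoinTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> storeA;
  private KeyValueStore<String, String> storeB;
  private boolean readFromA = true;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    storeA = (KeyValueStore<String, String>) context.getStore("index-a");
    storeB = (KeyValueStore<String, String>) context.getStore("index-b");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    if ("index-swap-control".equals(envelope.getSystemStreamPartition().getStream())) {
      // The offline push loads the inactive store, then announces the swap here.
      readFromA = !readFromA;
      return;
    }
    KeyValueStore<String, String> active = readFromA ? storeA : storeB;
    String indexValue = active.get((String) envelope.getKey());
    // ... join the incoming message against the active copy of the index ...
  }
}
{code}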

Related to this is the problem where you're generating a data set in Hadoop, and pushing it through Kafka to your Samza job's shared state store. If you generate a feed that has a value for key A, and future generations of the feed don't contain key A, you'll still have the value for key A forever, since it's sitting in a log-compacted Kafka topic. The only way around this that I can see is to have the Hadoop job diff the keys from its latest run against the keys from the prior run, and send a message for key A with a null value (a delete) to the Kafka topic. This seems ugly as well.
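
A sketch of that workaround on the Hadoop side, using the Kafka Java producer purely for illustration; the topic name and broker address are placeholders:

{code:java}
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeletedKeyPublisher {
  /** Send a delete marker for every key that was in the previous run but not the current one. */
  public static void publishDeletes(Set<String> previousKeys, Set<String> currentKeys) {
    Set<String> deletedKeys = new HashSet<String>(previousKeys);
    deletedKeys.removeAll(currentKeys);

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");  // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
    try {
      for (String key : deletedKeys) {
        // A null value is a tombstone: log compaction will eventually drop the key.
        producer.send(new ProducerRecord<String, byte[]>("global-state-feed", key, null));
      }
    } finally {
      producer.close();
    }
  }
}
{code}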

bq. Would a "global state" store be read-only, and only be updated by messages on the stream? If so, a job could still write to it by sending a message to the changelog stream, but the write would only take effect after a round-trip through the message broker. I think that's fine.

I had been thinking that it'd be read-write in my latest proposal, but I actually think that proposal is broken. In it, I propose giving each container its own partition of the shared state store, and having all containers except the partition's owner read from it. The problem with this approach is that it totally breaks partitioning by key, which every state store needs. It breaks it because if container 0 sends a write for key A to its own partition, and container 1 does the same, then key A will exist in two partitions. In which order should they be read? There's no ordering across partitions, so reads could be non-deterministic.

To fix this, you have to basically arrive at the approach that you propose, I think: a read-only store, where the writes are handled by sending messages to a stream. The trade-off with this approach is that it makes writes asynchronous, which is a little weird. I haven't fully considered the problems that this might cause.
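
To make the shape of that API concrete, a task would read the store directly but "write" by sending to the store's changelog stream, something like the sketch below. The system, stream, and store names are made up, and this is not an existing Samza API for global stores:

{code:java}
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class GlobalStoreWriteTask implements StreamTask, InitableTask {
  private static final SystemStream GLOBAL_CHANGELOG =
      new SystemStream("kafka", "global-store-changelog");

  private KeyValueStore<String, String> globalStore;  // read-only view

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    globalStore = (KeyValueStore<String, String>) context.getStore("global-store");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    String key = (String) envelope.getKey();
    String current = globalStore.get(key);  // reads are immediate
    String updated = current == null ? "1" : String.valueOf(Long.parseLong(current) + 1);
    // The "write" is asynchronous: it only becomes visible once the message has
    // round-tripped through the broker and been applied to the store.
    collector.send(new OutgoingMessageEnvelope(GLOBAL_CHANGELOG, key, updated));
  }
}
{code}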

bq. If the primary interface to the global state is a key-value interface (rather than a stream that is consumed), we may still want some way for a StreamTask to be notified when something changes in the state. For example, if the StreamTask needs to maintain some internal state that is derived from the global state, it would be good if every state change was also sent to the process() method.

Yeah, this seems like it'd be useful. I think it might also mitigate some of the risk of asynchronous writes to the KV store (see above), since you'd be notified when your write comes through.
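
For instance (purely illustrative; the changelog stream name and the derived structure are invented), a task could keep derived in-memory state up to date from those notifications:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class DerivedStateTask implements StreamTask {
  // Derived, in-memory state rebuilt from global-state change notifications.
  private final Map<String, Long> countsByPrefix = new HashMap<String, Long>();

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    if ("global-store-changelog".equals(envelope.getSystemStreamPartition().getStream())) {
      // Every global-state change shows up here, including this task's own
      // asynchronous writes once they have landed.
      String key = (String) envelope.getKey();
      String prefix = key.split(":", 2)[0];
      Long current = countsByPrefix.get(prefix);
      countsByPrefix.put(prefix, current == null ? 1L : current + 1L);
      return;
    }
    // ... normal input handling ...
  }
}
{code}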

bq. An example of a use case that would benefit from a "global stream" model: Take the use case of a daily Hadoop job generating some global state, pushing it out through a stream, and the stream task performing a map-side join against it. While the global state is in the middle of being updated, the stream task will see some values from the previous day's state, and some values from the current day. In applications where such inconsistency is unacceptable, it would be better if each day had a separate copy of the state store, and the stream task did an atomic switchover to the new day's state when the push is complete. In a "global stream" model, a stream task could implement this atomic switchover itself, but in a "global state" model, the switchover would have to be part of the framework. And since it's a somewhat application-specific use case, we probably wouldn't want to make it part of the framework.

Yes, this use case is problematic. Even if we have "global streams", I think the actual StreamTask implementation would be pretty ugly. You'd have to have either a control-message topic or a timeout that defines the edge of a store push.

bq. I think that global state should use the same message broker communication mechanism as we use for all other streams. I think it would be short-sighted to introduce a new communication mechanism (e.g. RPC to AM, or querying a remote database) for the purpose of global state. There's nothing preventing jobs from making RPC/database queries in application code if they want to, but I believe that framework-managed state should be a stream (see also comment above on continuing to consume a stream after initial bootstrapping).

I agree.

bq. Multi-subscriber partitions: I think the use case you give in the design doc (hot standby) is a good and valid one. I see great potential for Samza as a framework for maintaining materialised views of data in streams, and making it queryable by external processes (cf. SAMZA-316). However, I have been worried about the lack of high availability in such an architecture (after container failure, its partitions become unavailable for a while as they are re-bootstrapped). Thus it would be valuable to have the option of implementing hot standby (even with the caveat that you need to ensure that replicas of the same partition treat the input messages deterministically).

I agree hot standby seems desirable in the long run. I do question this implementation style, though. It seems more elegant to simply implement a hot standby by having it consume from the changelog and checkpoint topics for all StreamTasks in a container. In such a case, you don't need to have multi-subscriber streams. I really struggled to come up with any use case where multi-subscriber streams would be desirable.

bq. Regarding ordered offsets: I would prefer a solution which doesn't require offsets to be totally ordered. For example, I was thinking about implementing a MySQL system consumer (SAMZA-200) in a way that didn't have totally ordered offsets. I thought about using the MySQL GTID as offset, which consists of two parts: a UUID and a transaction number. The transaction number is sequential, but every time the master changes (failover), the transaction number sequence is reset and restarted with a new UUID. This means that in order to establish a total ordering of GTIDs, you need a lookup table on the side, telling you which UUID corresponds to which period of mastership; although there is a total ordering of transactions, it is not apparent from the GTIDs themselves.

That's really good to know. I don't think we've had any non-ordered offset examples yet, so this is really helpful. I'd prefer the more general approach over the more restrictive one, all things being equal.
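
To spell out the problem: a GTID looks like "<server-uuid>:<transaction-number>", so any comparator over them needs an external table of mastership periods. The sketch below is purely illustrative (not Samza or MySQL code); the epoch table is the hypothetical side lookup you describe:

{code:java}
import java.util.Comparator;
import java.util.Map;

public class GtidOffsetComparator implements Comparator<String> {
  // Side table telling which "epoch" of mastership each server UUID belongs to.
  // This information is not derivable from the GTIDs themselves.
  private final Map<String, Integer> mastershipEpochByUuid;

  public GtidOffsetComparator(Map<String, Integer> mastershipEpochByUuid) {
    this.mastershipEpochByUuid = mastershipEpochByUuid;
  }

  @Override
  public int compare(String gtidA, String gtidB) {
    // A GTID looks like "3E11FA47-71CA-11E1-9E33-C80AA9429562:23" (uuid:txn-number).
    String[] a = gtidA.split(":");
    String[] b = gtidB.split(":");
    int epochCmp = Integer.compare(mastershipEpochByUuid.get(a[0]),
                                   mastershipEpochByUuid.get(b[0]));
    if (epochCmp != 0) {
      return epochCmp;
    }
    // Within a single period of mastership the transaction numbers are sequential.
    return Long.compare(Long.parseLong(a[1]), Long.parseLong(b[1]));
  }
}
{code}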

bq. Regarding the problem of SamzaContainer consuming the same stream at different offsets: as a suggestion, we could specify that if several tasks within the same container consume the same SSP, they will always consume at the same position. That may still require changing MessageChooser, but would probably be less intrusive than consuming at multiple offsets or requiring totally ordered offsets. For example, multi-subscriber partitions may be special-cased in MessageChooser (if one task instance chooses to process a message from a multi-subscriber partition, then all tasks which are assigned that SSP within that container must also process the message).

I thought this initially, too. The problem that I see is that it's actually impossible to enforce this. There are two cases where your StreamTasks can diverge within a single container:

# Checkpoints are per-StreamTask, so a failure could occur half-way through checkpointing all StreamTasks. Some would be left with the old SSP offset, while others would have the new one.
# The container count could change, which would involve moving StreamTasks from one container to another. You could end up with some StreamTasks on one offset while others are on a different one, since we're only talking about container-level lock-step offsets.

This line of thinking is what led me to the ordered offset approach. With ordering, we can have a single offset (the oldest one), and just skip process() calls for StreamTasks that were ahead.
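
Roughly what I have in mind, assuming we had some per-system offset comparator (neither this class nor the comparator exists in Samza today; it's only a sketch of the "single oldest offset + skip" idea):

{code:java}
import java.util.Collection;
import java.util.Comparator;

public class SharedSspOffsets {
  private final Comparator<String> offsetOrder;  // hypothetical per-system offset comparator

  public SharedSspOffsets(Comparator<String> offsetOrder) {
    this.offsetOrder = offsetOrder;
  }

  /** The container would consume the SSP from the oldest of the tasks' checkpointed offsets. */
  public String startingOffset(Collection<String> checkpointedOffsets) {
    String oldest = null;
    for (String offset : checkpointedOffsets) {
      if (oldest == null || offsetOrder.compare(offset, oldest) < 0) {
        oldest = offset;
      }
    }
    return oldest;
  }

  /** Skip process() for any task instance whose checkpoint is already past this message. */
  public boolean shouldDeliver(String messageOffset, String taskCheckpointedOffset) {
    // Treating the checkpoint as the last processed offset, only strictly newer
    // messages are delivered to that task instance.
    return offsetOrder.compare(messageOffset, taskCheckpointedOffset) > 0;
  }
}
{code}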

bq. I like the fact that global state can be shared amongst all tasks in the same container. That seems like a worthwhile optimisation. This leads me to think that there are two kinds of state stores: (a) task-local stores, whose changelog stream is only read during container startup, and which thereafter are read-writable by one particular task instance; and (b) global (container-local) stores, which continually consume the changelog stream, which are readable by all tasks in the container, but which are not writable through the key-value interface (only asynchronously writable by sending a message to the changelog stream).

Agreed. This seems pretty reasonable.

bq. Regarding "shared state store": I'm not keen on using a partition per container, because containers are supposed to be purely a unit of computational resource, and not have any semantic importance for a job. Partition-per-container would violate that principle, and would make it harder to change the number of containers used by a job. (Also, restoring from all partitions would require some kind of cross-partition ordering.)

You're correct. My most recently proposed approach seems broken in this regard. I think your proposed read-only store with async writes is the best we can do.

bq. Allow assigning the same SSP to multiple tasks by removing the check.

I'd rather leave this feature out for now, as it seems almost totally orthogonal to global state at this point (as you say at the end of your comment).

bq. Add configuration for global state stores, which are read-only, continually consume the changelog, and are shared between all tasks in a container.

Yes, this seems to be what I'm arriving at as well.
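
Something like the following in the job config, where the global flag is the only new (hypothetical) key and the rest follows the existing store configuration:

{code}
# Sketch only: "stores.dictionary.global" does not exist yet.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.dictionary.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.dictionary.key.serde=string
stores.dictionary.msg.serde=string
stores.dictionary.changelog=kafka.dictionary-changelog
# Hypothetical: read-only store that continually consumes its changelog and is
# shared by all tasks in the container.
stores.dictionary.global=true
{code}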

bq. Looking at it this way, the global state and the multi-subscriber partitions seem somewhat orthogonal. We could, for example, do only global state stores on this ticket, and multi-subscriber partitions separately on another ticket.

I agree. I was actually thinking that I'd open up a separate global state ticket, and leave this as the multi-subscriber ticket. I'm mostly interested in tackling global state right now, not multi-subscriber streams.

The sticking points to think through seem to be:

* How to implement atomic swap.
* Whether asynchronous writes are a problem in shared state stores.
* Whether to implement callbacks for state changes on the shared stores. This seems hacky.


> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
>                 Key: SAMZA-353
>                 URL: https://issues.apache.org/jira/browse/SAMZA-353
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Jakob Homan
>         Attachments: DESIGN-SAMZA-353-0.md, DESIGN-SAMZA-353-0.pdf
>
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames, although currently we check for this and error out if this is done.  We should think through the implications of having the same SSP appear in multiple tasknames and support this if it makes sense.  
> This could be used as a broadcast stream that's either added by Samza itself to each taskname, or individual groupers could do this as makes sense.  Right now the container maintains a map of SSP to TaskInstance and delivers the SSP's messages to that task instance.  With this change, we'd need to change the map to SSP to Set[TaskInstance] and deliver the message to each TI in the set.


