Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/12/12 00:19:13 UTC

[jira] [Commented] (SAMZA-489) Support Amazon Kinesis

    [ https://issues.apache.org/jira/browse/SAMZA-489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14243334#comment-14243334 ] 

Chris Riccomini commented on SAMZA-489:
---------------------------------------

Thanks for looking into this. The sharding strategy that Kinesis uses seems quite interesting and unique.

bq. However, there are still some tricky edge cases to handle.

I was actually thinking the reverse of your edge case: what happens when a shard overlaps two StreamTasks, or do we even allow this? If a shard's range is 0-100, and we have two StreamTasks responsible for ranges 0-75 and 76-150, respectively, who processes the shard? We could say that we always bind to the lowest of the StreamTasks by range (0-75), but what happens if 0-100 splits into 0-20, 21-40, 41-60, 61-80, and 81-100 (a 5x split)? Should 81-100 continue being assigned to the 0-75 StreamTask? How can we know, next time around, that this is the assignment we should make? Perhaps the assignments could be done via the CoordinatorStream, the same way that we handle changelog-partition assignments now.
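
For concreteness, here's a minimal sketch of the "bind to the lowest StreamTask by range" rule, keyed on a shard's starting hash key. All class and method names below are hypothetical, not existing Samza APIs:

{code:java}
import java.math.BigInteger;
import java.util.TreeMap;

// Hypothetical assigner: maps a shard to whichever StreamTask owns the
// hash range containing the shard's starting hash key.
public class ShardToTaskAssigner {
  // Inclusive lower bound of each task's hash range -> task name,
  // e.g. {0 -> "task-0", 76 -> "task-1"} for ranges 0-75 and 76-150.
  // Assumes the task ranges cover the whole hash space starting at 0.
  private final TreeMap<BigInteger, String> taskRangeLowerBounds = new TreeMap<>();

  public void addTaskRange(long rangeLowerBound, String taskName) {
    taskRangeLowerBounds.put(BigInteger.valueOf(rangeLowerBound), taskName);
  }

  public String assign(BigInteger shardStartingHashKey) {
    // Greatest lower bound <= the shard's starting hash key, i.e. "bind to
    // the lowest task". Shard 0-100 starts at 0 and goes to task-0, but
    // after the 5x split, sub-shard 81-100 starts at 81 and would move to
    // task-1 (76-150) -- exactly the ambiguity described above.
    return taskRangeLowerBounds.floorEntry(shardStartingHashKey).getValue();
  }
}
{code}

Persisting the output of assign() to the CoordinatorStream on first assignment would make the mapping sticky across restarts, mirroring the changelog-partition bookkeeping.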

We could also force the StreamTask count to equal the shard count at the time the job first starts. This would fix shard splitting, since a single StreamTask could continue processing all sub-shards that fall out of the initial shard that existed at startup, but it doesn't fix the problem you mention about shard merging.
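
One way to keep sub-shards pinned to their original StreamTask would be to walk the split lineage back to a shard that existed at startup. A rough sketch, assuming we recorded the initial shard IDs and can look up each shard's parent shard ID (which the Kinesis shard description exposes for split shards); the helper itself is hypothetical:

{code:java}
import java.util.Map;
import java.util.Set;

// Hypothetical helper: resolve a sub-shard to the ancestor shard that
// existed when the job first started, so it inherits that shard's task.
public class ShardAncestry {
  public static String findRootShard(String shardId,
                                     Map<String, String> parentOf,   // shardId -> parent shard ID
                                     Set<String> initialShards) {    // shards present at job start
    String current = shardId;
    while (!initialShards.contains(current) && parentOf.containsKey(current)) {
      current = parentOf.get(current);
    }
    // Note: this only follows a single parent, so it covers splits but not
    // merges, where a shard has two parents -- the unsolved case above.
    return current;
  }
}
{code}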

Also, what happens to a Kinesis consumer when it's consuming a shard and the shard suddenly splits? In Samza's model, we'd want the JobCoordinator to shut down all containers, re-assign partitions (shards), and then start the containers back up again.
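
For reference, a consumer can at least detect the split: once a shard is closed and drained, GetRecords returns a null next-shard-iterator. A sketch of that detection loop (the process and signal methods are placeholders, not Samza code):

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;

public class ShardDrainDetector {
  // Reads a shard until Kinesis reports it closed (split or merged away).
  public void consumeUntilClosed(AmazonKinesis kinesis, String shardIterator) {
    String iterator = shardIterator;
    while (iterator != null) {
      GetRecordsResult result =
          kinesis.getRecords(new GetRecordsRequest().withShardIterator(iterator));
      for (Record record : result.getRecords()) {
        process(record); // placeholder: hand off to the owning StreamTask
      }
      // A null iterator means the shard is closed and fully consumed.
      iterator = result.getNextShardIterator();
    }
    signalCoordinator(); // placeholder: trigger shutdown/reassign/restart
  }

  private void process(Record record) { /* ... */ }
  private void signalCoordinator() { /* ... */ }
}
{code}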

> Support Amazon Kinesis
> ----------------------
>
>                 Key: SAMZA-489
>                 URL: https://issues.apache.org/jira/browse/SAMZA-489
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Martin Kleppmann
>              Labels: project
>
> [AWS Kinesis|http://aws.amazon.com/kinesis/] is a publish-subscribe message broker service quite similar to Kafka, provided as a hosted service by Amazon. I have spoken to a few people who are interested in using Kinesis with Samza, since the options for stateful stream processing with Kinesis are currently quite limited. Samza's local state support would be great for Kinesis users.
> I've looked a little into what it would take to support Kinesis in Samza. Useful resources:
> * [Kinesis Client Library for Java|https://github.com/awslabs/amazon-kinesis-client]
> * [Kinesis developer guide|http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html]
> * [Description of resharding|http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-api-java.html#kinesis-using-api-java-resharding]
> Kinesis is similar to Kafka in that it has total ordering of messages within a partition (which Kinesis calls a "shard"). The most notable differences I noticed are:
> * Kinesis does not support compaction by key, and only keeps messages for 24 hours (the "trim horizon"). Thus it cannot be used for checkpointing and state store changelogging. Another service must be used for durable storage (Amazon recommends DynamoDB).
> * It is common for the number of shards in a stream to change ("resharding"), because a Kinesis shard is a unit of resourcing, not a logical grouping. A Kinesis shard is more like a Kafka broker node, not like a Kafka partition.
> The second point suggests that Kinesis shards should not be mapped 1:1 to Samza StreamTasks like we do for Kafka, because whenever the number of shards changes, any state associated with a StreamTask would no longer be in the right place.
> Kinesis assigns a message to a shard based on the MD5 hash of the message's partition key (so all messages with the same partition key are guaranteed to be in the same shard). Each shard owns a contiguous range of the MD5 hash space. When the number of shards is increased by one, a shard's hash range is subdivided into two sub-ranges. When the number of shards is decreased by one, two adjacent shards' hash ranges are merged into a single range.
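> As a sketch of that hashing scheme (illustrative code, not an existing Kinesis or Samza API): MD5 of the partition key, read as an unsigned 128-bit integer, decides which shard's hash range the message falls into.
> {code:java}
> import java.math.BigInteger;
> import java.nio.charset.StandardCharsets;
> import java.security.MessageDigest;
> import java.security.NoSuchAlgorithmException;
>
> // Illustrative only: MD5 of the partition key as an unsigned 128-bit integer.
> public class PartitionKeyHashing {
>   public static BigInteger hashKey(String partitionKey) {
>     try {
>       byte[] digest = MessageDigest.getInstance("MD5")
>           .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
>       return new BigInteger(1, digest); // signum 1: interpret as unsigned
>     } catch (NoSuchAlgorithmException e) {
>       throw new RuntimeException(e); // MD5 is always available on the JVM
>     }
>   }
>
>   // A shard owns the inclusive range [start, end] of this hash space.
>   public static boolean inRange(BigInteger hash, BigInteger start, BigInteger end) {
>     return hash.compareTo(start) >= 0 && hash.compareTo(end) <= 0;
>   }
> }
> {code}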
> I think the nicest way of modelling this in Samza would be to create a fixed number of StreamTasks (e.g. 256, but make it configurable), and to assign each task a fixed slice of this MD5 hash space. Each Kinesis shard then corresponds to a subset of these StreamTasks, and the SystemConsumer implementation routes messages from a shard to the appropriate StreamTask based on the hash of the message's partition key. This implies that all the StreamTasks for a particular Kinesis shard should be processed within the same container. This is not unlike the Kafka consumer in Samza, which fetches messages for all of a container's Kafka partitions in one go.
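> A sketch of that routing step (names are illustrative): with the 128-bit hash space divided evenly among a fixed task count, the task index is just the hash scaled down by the slice size.
> {code:java}
> import java.math.BigInteger;
>
> // Illustrative router: slices the 128-bit MD5 hash space evenly across a
> // fixed number of StreamTasks chosen at job creation time (e.g. 256).
> public class HashSpaceRouter {
>   private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128
>   private final int numTasks;
>
>   public HashSpaceRouter(int numTasks) { this.numTasks = numTasks; }
>
>   // Stable across shard splits and merges, because it depends only on the
>   // hash of the message's partition key, never on shard boundaries.
>   public int taskIndexFor(BigInteger hashedPartitionKey) {
>     return hashedPartitionKey.multiply(BigInteger.valueOf(numTasks))
>         .divide(HASH_SPACE).intValue();
>   }
> }
> {code}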
> This removes the semantic problem of resharding: we can ensure that messages with the same partition key are always routed to the same StreamTask, even across shard splits and merges.
> However, there are still some tricky edge cases to handle. For example, if Kinesis decides to merge two shards that are currently processed by two different Samza containers, what should Samza do? A simple (but perhaps a bit wasteful) solution would be for both containers to continue consuming the merged shard. Alternatively, Samza could reassign some StreamTasks from one container to another, but that would require any state to be moved or rebuilt. Probably double-consuming would make most sense for a first implementation.
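> The double-consuming option reduces to a per-container filter, roughly like this (illustrative, building on the router sketched above):
> {code:java}
> import java.util.Set;
> import com.amazonaws.services.kinesis.model.Record;
>
> // Illustrative filter: both containers read the merged shard, and each
> // delivers only the messages whose partition key routes to a task it hosts.
> public class MergedShardFilter {
>   private final HashSpaceRouter router;
>   private final Set<Integer> localTaskIndexes; // tasks hosted in this container
>
>   public MergedShardFilter(HashSpaceRouter router, Set<Integer> localTaskIndexes) {
>     this.router = router;
>     this.localTaskIndexes = localTaskIndexes;
>   }
>
>   public boolean shouldDeliverLocally(Record record) {
>     int task = router.taskIndexFor(PartitionKeyHashing.hashKey(record.getPartitionKey()));
>     return localTaskIndexes.contains(task); // the other container handles the rest
>   }
> }
> {code}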
> In summary, it looks like Kinesis support is feasible, and would be a fun challenge for someone to take on. Contributions welcome :)


