You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2013/11/05 18:56:17 UTC

[jira] [Commented] (SAMZA-71) Support new partitioning strategies

    [ https://issues.apache.org/jira/browse/SAMZA-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13814073#comment-13814073 ] 

Chris Riccomini commented on SAMZA-71:
--------------------------------------

Several things to think about here:

 1. The existing proposal does not include a setting that mimics the current partitioning behavior.
 2. We are potentially increasing the number of partitions of a Samza job quite dramatically when max is set.
 3. We are creating a configuration that must remain static after the first time a job is run.

Right now, partitioning (1) is done by getting the largest topic (by partition count) and using that topic's partition count as the partition count for the Samza job. For example, if the largest input topic has a partition count of 48, then a Samza job that consumes this topic will have 48 partitions. Adding more input topics does not increase the partition count, provided that the new input topics have a partition count <= the max partition count.

With the new strategy, adding any new topic to the input stream would result in an increase in the partition count (2) on the job. This is a new behavior, and could impact Samza jobs in the following way:

 a. The checkpoint topic would no longer be valid, since it would not have partitions for the newly added partitions. For example, if a Samza job were set to max, and had 48 partitions, and a new 16 partition input topic were added, the checkpoint topic would have only 48 partitions, but the new Samza job would be running with 48 + 16 = 64 partitions.
 b. In cases where there are a large number of input topics, even if they individually only have a few partitions each, we could end up with 1000s of partitions in total. This is problematic for our checkpointing and state management, as we have per-partition topics for each of these stream types. Consider the case where a Samza job has a single state store, and 1000 input streams each with 4 partitions. In the existing model, this would result in 4 partitions for the Samza job (max(input stream partitions) = 4), which in turn results in 4 partitions for the checkpoint topic and 4 partitions for the state topic. If the new `max` configuration is set, instead, then the Samza job will have 1000 * 4 = 4000 partitions. This results in a 4000 partition checkpoint topic, and a 4000 partition state topic. Partitions in Kafka are not cheap (ZK overhead, inodes on disk, etc). This partition count is too large to support.

Lastly, adjusting the input topics or partitioning strategy after a job has already been (3) run can result in unexpected behavior. Adding a topic, as already described above, might accidentally increase the partition count of the Samza job, which Samza assumes will never happen. Changing the partitioning strategy from max to co-group (or vice-versa) will result in state topics and checkpoint topics getting incorrectly assigned to new partitions.

> Support new partitioning strategies
> -----------------------------------
>
>                 Key: SAMZA-71
>                 URL: https://issues.apache.org/jira/browse/SAMZA-71
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Jakob Homan
>              Labels: project
>
> Currently, the number of stream tasks instances that are created for a Samza job are equal to the max number of partitions across all input streams. For example, if your Samza job is using two YARN containers, and has two input streams (IS1 and IS2), and IS1 has 4 partitions, and IS2 has 8 partitions, then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8 StreamTask instances would be created (spread as evenly as possible across two YARN containers).
> This scheme works for co-grouping when both input streams have the same number of partitions, and are partitioned using the same partitioning scheme (e.g. a Kafka partitioner that partitions by member ID). The parallelism of the job is limited by the number of StreamTask instances, which means that the parallelism is limited by the max number of partitions across all input streams (8 in the example above).
> We can actually do better than these guarantees. We should change Samza's partitioning style to behave in the following way:
> 1. Support a task.partition.scheme=max config. Samza will create one stream task instance for each input stream partition. In the example above, IS1 has 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12 StreamTasks. Each input stream's partition would be mapped to a unique StreamTask that would receive messages ONLY from that input stream/partition pair, and from no other. Using this style of partitioning increases a Samza job's possible parallelism to be the absolute maximum (based on Kafka semantics, which limit a single consumer for each input stream/partition pair).
> 2. Support a task.partition.scheme=cogroup config. Samza will create one stream task instance for the greatest common denominator of all stream task partition counts. For example, in the example above, IS1 has 4 partitions, and IS2 has 8. GCD(4,8)=4, so Samza would create four partitions. If IS1 had 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two StreamTask instances. Using this style can decrease a Samza job's parallelism, but provides the guarantee that a StreamTask instance will receive all messages across all input streams for a key that it's in charge of. For example, if a StreamTask is consuming AdViews and AdClicks, and both are partitioned by member ID, but AdViews has 12 partitions, and AdClicks has 8 partitions, then there will be 4 StreamTask instances, and each instance will receive rougly 1/4th of all clicks and views, and all clicks and views for a given member ID will be mapped to just one of the StreamTask, so aggregation/joining will be possible.
> The default task.partition.scheme will be max, when the user hasn't specified a partition scheme. Thus, the default will not allow any aggregation or joining across input streams.
> With both of these styles, we can still use the Partition class (and getPartitionId) to identify each StreamTask instance, but we will need to devise a deterministic way to map from each input stream/partition pair to each StreamTask partition.
> In the case of style #1 (max), consider the case where we have IS1 with 4 partitions and IS2 with 8 partitions. We can use the order of task.inputs to define an ordering across stream names. We can then instantiate all 12 StreamTasks, and simply iterate over all input stream's based on their task.inputs order and sorted partition sets to do the mapping. If we had task.inputs=IS2,IS1, the mapping would look like this:
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:4
> IS2:5 - StreamTask:5
> IS2:6 - StreamTask:6
> IS2:7 - StreamTask:7
> IS1:0 - StreamTask:8
> IS1:1 - StreamTask:9
> IS1:2 - StreamTask:10
> IS1:3 - StreamTask:11
> In the case of style #2 (cogroup), consider the case where IS1 has 8 partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be created. The mapping in this case should then be:
> IS1:0 - StreamTask:0
> IS1:1 - StreamTask:1
> IS1:2 - StreamTask:2
> IS1:3 - StreamTask:3
> IS1:4 - StreamTask:0
> IS1:5 - StreamTask:1
> IS1:6 - StreamTask:2
> IS1:7 - StreamTask:3
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:0
> IS2:5 - StreamTask:1
> IS2:6 - StreamTask:2
> IS2:7 - StreamTask:3
> IS2:8 - StreamTask:0
> IS2:9 - StreamTask:1
> IS2:10 - StreamTask:2
> IS2:11 - StreamTask:3
> As you can see, the assignment is done by modding each input stream's partition number by the GCD value (4, in this case). This assignment strategy has the nice guarantee that keys will map to the same StreamTask across input streams with different partition counts (provided that they're partitioned by the same key). For example, member ID 1213 % 8 = partition 5 in IS1, and 1213 %12 = partition 1 in IS2. If you then mod by the GCD (4), you get 5%4=1 and 1%4=1. The same holds true for other keys, as well.
> 1211%8=3 .. 3%4=3
> 1211%12=11 .. 11%4=3
> Both of these partition assignment schemes work only as long as the guarantee that the task.inputs stream order is static (or new streams are appended to the end), and that each input stream's partition count is static, and will never change.
> You can use the Euclidean algorithm to find the GCD:
> http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html



--
This message was sent by Atlassian JIRA
(v6.1#6144)