Posted to commits@samza.apache.org by "Navina Ramesh (JIRA)" <ji...@apache.org> on 2015/04/28 23:39:07 UTC

[jira] [Commented] (SAMZA-662) Samza auto-creates changelog stream without sufficient partitions when container number > 1

    [ https://issues.apache.org/jira/browse/SAMZA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518131#comment-14518131 ] 

Navina Ramesh commented on SAMZA-662:
-------------------------------------

bq. The maxChangeLogStreamPartition is 1.
Based on your example, isn't maxChangeLogStreamPartition = 3? 
{code}
containerModel.getTasks.values.max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId }) 
{code}

That expression should return the TaskModel (containerId=2, inputStreamPartition=2, changelogPartition=2), so maxChangeLogStreamPartitions should be 3.

The order in which the containers are brought up shouldn't matter as long as the ContainerModel remains the same. Please correct me if I have misunderstood something here.
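Whether the expression yields 1 or 3 depends on which tasks are in the ContainerModel it is evaluated against. Below is a minimal sketch under stated assumptions: `Partition`, `TaskModel` and `ContainerModel` here are hypothetical, simplified stand-ins (not Samza's real classes), and the layout is the one-task-per-container table from the issue description. The sketch evaluates the same max-plus-one expression once per container and once over every task in the job.

```scala
object ChangelogPartitionsDemo {
  // Hypothetical, simplified stand-ins for Samza's Partition, TaskModel and
  // ContainerModel; only the accessors used by the expression are modeled.
  final case class Partition(id: Int) { def getPartitionId: Int = id }
  final case class TaskModel(name: String, changelog: Partition) {
    def getChangelogPartition: Partition = changelog
  }
  final case class ContainerModel(id: Int, tasks: Map[String, TaskModel]) {
    def getTasks: Map[String, TaskModel] = tasks
  }

  // Same shape as the SamzaContainer expression: pick the task with the highest
  // changelog partition id, then add 1 to turn that index into a partition count.
  def maxChangeLogStreamPartitions(tasks: Iterable[TaskModel]): Int =
    tasks
      .max(Ordering.by((task: TaskModel) => task.getChangelogPartition.getPartitionId))
      .getChangelogPartition.getPartitionId + 1

  // One task per container, mirroring the table in the issue description.
  val containers: Seq[ContainerModel] = (0 to 2).map { i =>
    val task = TaskModel(s"Partition $i", Partition(i))
    ContainerModel(i, Map(task.name -> task))
  }

  def main(args: Array[String]): Unit = {
    // Evaluated against each container's own model.
    containers.foreach { c =>
      println(s"ContainerModel ${c.id}: ${maxChangeLogStreamPartitions(c.getTasks.values)}")
    }
    // Evaluated against every task in the job.
    val allTasks = containers.flatMap(_.getTasks.values)
    println(s"All tasks: ${maxChangeLogStreamPartitions(allTasks)}")
  }
}
```

Run as-is, it prints 1, 2 and 3 for ContainerModels 0, 1 and 2, and 3 when all tasks are considered together.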


> Samza auto-creates changelog stream without sufficient partitions when container number > 1
> -------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-662
>                 URL: https://issues.apache.org/jira/browse/SAMZA-662
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Yan Fang
>
> We currently auto-create changelog streams. However, when there is more than one container, Samza may create a changelog stream with too few partitions.
> Reason:
> Assume we have an input stream with 3 partitions and assign 3 containers to this job. According to the [JobCoordinator|https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala], we will get:
> ||Container(Model) || InputStream Partition || Changelog Partition ||
> |0 | 0 | 0|
> |1 | 1 | 1|
> |2 | 2 | 2|
> If Container 0 is brought up first, it calls 
> {code}
>     val maxChangeLogStreamPartitions = containerModel.getTasks.values
>             .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId })
>             .getChangelogPartition.getPartitionId + 1
> {code}
> in [SamzaContainer|https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala].
> The maxChangeLogStreamPartition is 1. So we will auto-create a changelog stream with only 1 partition.
> Similarly, if Container 1 is brought up first, we will get a stream with 2 partitions.
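The start-order effect in the description can be simulated in a few lines. This is a minimal sketch, assuming hypothetical simplified models and a hypothetical create-if-absent step in which only the first container to start decides the changelog's partition count. It compares the per-container count used in the SamzaContainer snippet with a hypothetical job-wide count taken over every container; neither `createdPartitions` nor `jobWideCount` is a Samza API.

```scala
object AutoCreateOrderDemo {
  // Hypothetical, simplified stand-ins; not Samza's real classes.
  final case class TaskModel(name: String, changelogPartitionId: Int)
  final case class ContainerModel(id: Int, tasks: Seq[TaskModel])

  // Per-container count, as in the SamzaContainer snippet: highest changelog
  // partition id among this container's own tasks, plus 1.
  def perContainerCount(c: ContainerModel): Int =
    c.tasks.map(_.changelogPartitionId).max + 1

  // Hypothetical alternative: the same computation over every container in the job.
  def jobWideCount(all: Seq[ContainerModel]): Int =
    all.flatMap(_.tasks).map(_.changelogPartitionId).max + 1

  // Hypothetical create-if-absent: the first container to start creates the
  // changelog with its own count; later containers find it already exists.
  def createdPartitions(startOrder: Seq[ContainerModel], count: ContainerModel => Int): Int = {
    var existing: Option[Int] = None // partition count of the changelog, once created
    startOrder.foreach { c =>
      if (existing.isEmpty) existing = Some(count(c))
    }
    existing.get
  }

  // One task per container, mirroring the table above.
  val containers: Seq[ContainerModel] =
    (0 to 2).map(i => ContainerModel(i, Seq(TaskModel(s"Partition $i", i))))

  def main(args: Array[String]): Unit = {
    for (first <- containers) {
      val order = first +: containers.filterNot(_ == first)
      val perContainer = createdPartitions(order, c => perContainerCount(c))
      val jobWide = createdPartitions(order, _ => jobWideCount(containers))
      println(s"container ${first.id} first: per-container=$perContainer, job-wide=$jobWide")
    }
  }
}
```

With the per-container count, the created stream has 1, 2 or 3 partitions depending on whether container 0, 1 or 2 starts first; with the job-wide count it has 3 in every case.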



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)