Posted to commits@samza.apache.org by "Jakob Homan (JIRA)" <ji...@apache.org> on 2014/06/13 18:25:02 UTC

[jira] [Commented] (SAMZA-289) Job fails with invalid checkpoint topic partition count

    [ https://issues.apache.org/jira/browse/SAMZA-289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030803#comment-14030803 ] 

Jakob Homan commented on SAMZA-289:
-----------------------------------

+1

> Job fails with invalid checkpoint topic partition count
> -------------------------------------------------------
>
>                 Key: SAMZA-289
>                 URL: https://issues.apache.org/jira/browse/SAMZA-289
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>            Priority: Blocker
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-289.0.patch
>
>
> We've been seeing failures off and on lately with messages like this:
> {noformat}
> org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic validation failed for topic __samza_checkpoint_samza-perf-store-all-calls_i001 because partition count 8 did not match expected partition count 64.
> {noformat}
> This causes the entire job to fail. It is triggered the first time a Samza job is run in a cluster, and only on jobs with an input stream that has more than the default partition count (num.partitions in the Kafka broker).
> I believe there is a race condition in KafkaCheckpointManager. Right now we only run KafkaCheckpointManager.createTopic in the container that owns the first partition:
> {code}
>   def start {
>     if (partitions.contains(new Partition(0))) {
>       createTopic
>     }
>     validateTopic
>   }
> {code}
> If a container starts before the container with partition 0, then the container without partition 0 will just run validateTopic. This triggers a call to get TopicMetadata from Kafka. If the checkpoint topic doesn't exist, I believe the broker will say so, but it will also create the topic in the background. When it does this, num.partitions (the default partition count) will be used to define how many partitions the new checkpoint topic has.
> If a Samza job's task.inputs stream list contains a stream with a non-default number of partitions (e.g. num.partitions=8, but task.inputs has a stream with 16 partitions), then this race condition can produce a checkpoint topic with 8 partitions, and the validation will fail.
> I think the simplest fix is just to strip the if statement from KafkaCheckpointManager.start, and have all containers try to create the checkpoint topic. This will eliminate the race condition, since all containers will try to create the checkpoint topic with the correct number of partitions.
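The race and the proposed fix can be illustrated with a small in-memory simulation. This is a sketch only. FakeBroker, CheckpointManager, startOld, startFixed, runScenario and CheckpointTopicException are hypothetical names, not Samza or Kafka APIs. The broker is assumed to auto-create a missing topic with its default partition count (num.partitions) whenever metadata is requested, as the report describes. Real Kafka does this asynchronously and may reject a duplicate create, which the sketch simplifies to a synchronous no-op.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka broker with topic auto-creation enabled.
class FakeBroker(defaultPartitions: Int) {
  private val topics = mutable.Map.empty[String, Int]

  // Explicit creation; a no-op if the topic already exists (assumed idempotent).
  def createTopic(name: String, partitions: Int): Unit = synchronized {
    if (!topics.contains(name)) topics(name) = partitions
  }

  // Metadata lookup; auto-creates a missing topic with the DEFAULT partition count.
  def partitionCount(name: String): Int = synchronized {
    topics.getOrElseUpdate(name, defaultPartitions)
  }
}

class CheckpointTopicException(msg: String) extends RuntimeException(msg)

// Hypothetical model of a container's checkpoint manager.
class CheckpointManager(broker: FakeBroker, topic: String, expectedPartitions: Int, owned: Set[Int]) {
  private def createTopic(): Unit = broker.createTopic(topic, expectedPartitions)

  private def validateTopic(): Unit = {
    val actual = broker.partitionCount(topic)
    if (actual != expectedPartitions)
      throw new CheckpointTopicException(
        s"partition count $actual did not match expected partition count $expectedPartitions")
  }

  // Behaviour described in the report: only the owner of partition 0 creates the topic.
  def startOld(): Unit = {
    if (owned.contains(0)) createTopic()
    validateTopic()
  }

  // Proposed fix: every container tries to create the topic before validating it.
  def startFixed(): Unit = {
    createTopic()
    validateTopic()
  }
}

// Starts two containers in the problematic order: the one WITHOUT partition 0 goes first.
// Returns true if both containers start without a validation failure.
def runScenario(fixed: Boolean): Boolean = {
  val broker = new FakeBroker(defaultPartitions = 8)
  val topic = "__samza_checkpoint_example"
  val expected = 16
  val first = new CheckpointManager(broker, topic, expected, owned = Set(1))
  val second = new CheckpointManager(broker, topic, expected, owned = Set(0))
  try {
    Seq(first, second).foreach(m => if (fixed) m.startFixed() else m.startOld())
    true
  } catch {
    case _: CheckpointTopicException => false
  }
}

println(s"old start succeeded:   ${runScenario(fixed = false)}") // false
println(s"fixed start succeeded: ${runScenario(fixed = true)}")  // true
```

In the old ordering, the first container's validation triggers auto-creation with 8 partitions, so the check against 16 fails. With the fix, whichever container starts first creates the topic with the expected count, and later creates are harmless no-ops.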



--
This message was sent by Atlassian JIRA
(v6.2#6252)