Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2015/10/07 03:02:27 UTC

[jira] [Updated] (SAMZA-789) CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started

     [ https://issues.apache.org/jira/browse/SAMZA-789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yi Pan (Data Infrastructure) updated SAMZA-789:
-----------------------------------------------
    Attachment: SAMZA-789-0.patch

The patch is also up for review on Review Board: https://reviews.apache.org/r/39072/

> CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-789
>                 URL: https://issues.apache.org/jira/browse/SAMZA-789
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.10.0
>            Reporter: Yi Pan (Data Infrastructure)
>            Assignee: Yi Pan (Data Infrastructure)
>             Fix For: 0.10.0
>
>         Attachments: SAMZA-789-0.patch
>
>
> The following bug has been observed in 0.10:
> 1) When the coordinator stream is enabled in 0.10, the OffsetManager internally starts the CheckpointManager, which registers and starts the enclosed SystemConsumer first.
> 2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though it has already been started.
> This causes a problem in KafkaSystemConsumer, which logs the following warning in an endless loop in KafkaSystemConsumer.refreshBrokers():
> {code}
> 2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
> 2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
> 2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
> 	at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
> 	at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
> 	at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
> 	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
> 	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
> 	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
> 	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 	at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
> 	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
> 	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> 	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> The quick fix is to avoid registering the same coordinator stream partition again after the CoordinatorStreamSystemConsumer has already started.
> The long-term fix is to manage the lifecycle states of SystemConsumers more rigorously with a state machine that allows only certain state transitions, e.g. init -> registered -> started -> stopped.
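A minimal Java sketch of the two fixes described above, assuming a simplified consumer in which partitions are plain strings rather than SystemStreamPartition objects. The class LifecycleGuardedConsumer, its State enum, the "0" offset, and the choice to treat a repeated start() as a no-op are all hypothetical illustrations; this is not Samza's actual API and not the contents of SAMZA-789-0.patch. It combines the quick fix (skip re-registering an already-registered partition after start) with the long-term idea of allowing only the transitions init -> registered -> started -> stopped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Samza's real CoordinatorStreamSystemConsumer or SystemConsumer API.
// Partitions are plain strings standing in for SystemStreamPartition.
class LifecycleGuardedConsumer {

    // Allowed transitions: INIT -> REGISTERED -> STARTED -> STOPPED.
    enum State { INIT, REGISTERED, STARTED, STOPPED }

    private State state = State.INIT;
    // Registered partition -> starting offset (offset values are illustrative only).
    private final Map<String, String> registered = new HashMap<>();

    synchronized State state() {
        return state;
    }

    // Returns true if the partition was registered by this call, false if a duplicate
    // registration after start() was skipped (the "quick fix" behaviour).
    synchronized boolean register(String partition, String offset) {
        switch (state) {
            case INIT:
            case REGISTERED:
                registered.put(partition, offset);
                state = State.REGISTERED;
                return true;
            case STARTED:
                if (registered.containsKey(partition)) {
                    // Already being consumed: do not hand it to the underlying consumer again.
                    return false;
                }
                throw new IllegalStateException("Cannot register " + partition + " after start()");
            default:
                throw new IllegalStateException("Cannot register in state " + state);
        }
    }

    synchronized void start() {
        if (state == State.STARTED) {
            return; // assumption: a repeated start() is a harmless no-op
        }
        if (state != State.REGISTERED) {
            throw new IllegalStateException("Cannot start from state " + state);
        }
        state = State.STARTED;
    }

    synchronized void stop() {
        if (state != State.STARTED) {
            throw new IllegalStateException("Cannot stop from state " + state);
        }
        state = State.STOPPED;
    }

    public static void main(String[] args) {
        LifecycleGuardedConsumer consumer = new LifecycleGuardedConsumer();
        String partition = "__samza_coordinator_acg-test_i001,0";

        // Step 1: the CheckpointManager path registers and starts the consumer.
        System.out.println(consumer.register(partition, "0")); // true
        consumer.start();

        // Step 2: the LocalityManager path registers and starts it again.
        System.out.println(consumer.register(partition, "0")); // false (skipped)
        consumer.start();                                       // no-op

        System.out.println(consumer.state()); // STARTED
        consumer.stop();
        System.out.println(consumer.state()); // STOPPED
    }
}
```

In this sketch, registering a previously unknown partition after start() throws instead of being silently ignored. That is one possible design choice: it surfaces ordering bugs immediately rather than letting them show up later as a retry loop like the one in the log above.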



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)