Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2015/10/09 09:59:26 UTC
[jira] [Resolved] (SAMZA-789) CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started
[ https://issues.apache.org/jira/browse/SAMZA-789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yi Pan (Data Infrastructure) resolved SAMZA-789.
------------------------------------------------
Resolution: Fixed
> CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: SAMZA-789
> URL: https://issues.apache.org/jira/browse/SAMZA-789
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.10.0
> Reporter: Yi Pan (Data Infrastructure)
> Assignee: Yi Pan (Data Infrastructure)
> Fix For: 0.10.0
>
> Attachments: SAMZA-789-0.patch
>
>
> The following bug has been observed in 0.10:
> 1) When the coordinator stream is enabled in 0.10, the OffsetManager internally starts the CheckpointManager, which registers and starts the enclosed SystemConsumer first.
> 2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though that consumer has already been started.
> This breaks KafkaSystemConsumer: the following warnings are logged in an endless loop from the KafkaSystemConsumer.refreshBrokers() function:
> {code}
> 2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
> 2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
> 2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
> at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
> at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
> at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
> at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
> at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> The quick fix is to avoid registering the same coordinator stream system partition after the CoordinatorStreamSystemConsumer has already started.
> The long-term fix is to manage the life-cycle states of SystemConsumers more rigorously with a state machine that only allows certain state transitions, e.g. init -> registered -> started -> stopped.
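A minimal sketch of the quick-fix guard, assuming a simplified wrapper around the shared coordinator stream consumer. GuardedCoordinatorConsumer and QuickFixDemo are hypothetical names, partitions are plain strings rather than Samza's SystemStreamPartition, and this is not the code in SAMZA-789-0.patch. A repeated register() of an already-registered partition after start() is skipped instead of being forwarded to the underlying consumer, and a repeated start() is a no-op:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for a wrapper around the coordinator
// stream consumer; partitions are plain strings for illustration only.
class GuardedCoordinatorConsumer {
    private final Set<String> registered = new HashSet<>();
    private boolean started = false;
    // Number of registrations actually forwarded to the underlying consumer.
    private int forwardedRegistrations = 0;

    // Returns true if the registration was forwarded, false if it was skipped.
    boolean register(String partition) {
        if (started) {
            if (registered.contains(partition)) {
                // Quick fix: a late caller (e.g. LocalityManager) re-registering the
                // same partition after start is ignored, so the underlying consumer
                // never tries to add a partition it is already consuming.
                return false;
            }
            // A genuinely new partition after start is still a programming error.
            throw new IllegalStateException("Cannot register " + partition + " after start");
        }
        if (registered.add(partition)) {
            forwardedRegistrations++;
            return true;
        }
        return false; // duplicate registration before start: nothing to forward
    }

    // Starting twice is a no-op rather than a second start of the consumer.
    void start() {
        started = true;
    }

    int forwardedRegistrations() {
        return forwardedRegistrations;
    }
}

class QuickFixDemo {
    public static void main(String[] args) {
        GuardedCoordinatorConsumer consumer = new GuardedCoordinatorConsumer();
        consumer.register("__samza_coordinator_job,0"); // CheckpointManager path
        consumer.start();
        consumer.register("__samza_coordinator_job,0"); // LocalityManager path: skipped
        consumer.start();                               // no-op
        System.out.println(consumer.forwardedRegistrations());
    }
}
```

Running QuickFixDemo prints 1: only the first registration reaches the underlying consumer, which is the behavior the quick fix aims for.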
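The long-term idea can be sketched as an explicit transition table. This is a minimal sketch under stated assumptions: ConsumerState and ConsumerLifecycle are hypothetical names, not Samza classes, and repeated REGISTERED -> REGISTERED transitions are allowed on the assumption that several partitions may be registered before start:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical lifecycle states for a SystemConsumer.
enum ConsumerState { INIT, REGISTERED, STARTED, STOPPED }

// Lifecycle guard permitting only init -> registered -> started -> stopped,
// plus repeated registration while still in REGISTERED.
class ConsumerLifecycle {
    private static final Map<ConsumerState, Set<ConsumerState>> ALLOWED =
        new EnumMap<>(ConsumerState.class);

    static {
        ALLOWED.put(ConsumerState.INIT, EnumSet.of(ConsumerState.REGISTERED));
        ALLOWED.put(ConsumerState.REGISTERED,
            EnumSet.of(ConsumerState.REGISTERED, ConsumerState.STARTED));
        ALLOWED.put(ConsumerState.STARTED, EnumSet.of(ConsumerState.STOPPED));
        ALLOWED.put(ConsumerState.STOPPED, EnumSet.noneOf(ConsumerState.class));
    }

    private ConsumerState state = ConsumerState.INIT;

    ConsumerState state() {
        return state;
    }

    // Moves to the target state, or throws if the transition is not allowed.
    void transitionTo(ConsumerState target) {
        if (!ALLOWED.get(state).contains(target)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + target);
        }
        state = target;
    }
}
```

Under this model, the LocalityManager's late registration in step 2 would fail fast with an IllegalStateException at the call site, instead of surfacing later as a retry loop inside KafkaSystemConsumer.refreshBrokers().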
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)