Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2015/10/07 02:49:27 UTC

[jira] [Created] (SAMZA-789) CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started

Yi Pan (Data Infrastructure) created SAMZA-789:
--------------------------------------------------

             Summary: CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started
                 Key: SAMZA-789
                 URL: https://issues.apache.org/jira/browse/SAMZA-789
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.10.0
            Reporter: Yi Pan (Data Infrastructure)
            Assignee: Yi Pan (Data Infrastructure)
             Fix For: 0.10.0


The following bug has been observed in 0.10:
1) When the coordinator stream is enabled in 0.10, the OffsetManager starts the CheckpointManager internally and registers and starts the enclosed SystemConsumer first.
2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though it has already been started.

This causes a problem in KafkaSystemConsumer, which emits the following warning logs in an endless retry loop from the KafkaSystemConsumer.refreshBrokers() function:

{code}
2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
	at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
	at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
	at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
	at java.lang.Thread.run(Thread.java:745)
{code}

The quick fix is to avoid registering the same coordinator stream system partition after the CoordinatorStreamSystemConsumer has already started.
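
As a rough illustration of that guard (not the actual CoordinatorStreamSystemConsumer code), the sketch below tracks a started flag and ignores any registration that arrives after start. The class name GuardedCoordinatorConsumer, its methods, and the use of plain strings for partitions are all hypothetical and chosen only for this example.

```java
// Hypothetical sketch only; names and types are assumptions, not Samza's API.
class GuardedCoordinatorConsumer {
    // Partitions registered before start(); strings stand in for real partition objects.
    private final java.util.Set<String> registered = new java.util.HashSet<>();
    private boolean started = false;

    /**
     * Registers a partition if the consumer has not started yet.
     * Returns true only if the partition was newly registered.
     */
    boolean register(String partition) {
        if (started) {
            // Late registration is skipped instead of being passed to the
            // underlying consumer, which would reject the duplicate partition.
            return false;
        }
        return registered.add(partition);
    }

    void start() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }

    int registeredCount() {
        return registered.size();
    }
}
```

With a guard of this shape, a second component that shares the consumer can call register() unconditionally; the call becomes a no-op once the first component has started the consumer.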

The long-term fix is to manage the life-cycle states of the SystemConsumers more rigorously with a state machine model that allows only certain state transitions, e.g. init -> registered -> started -> stopped.
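
One possible shape for such a state machine is sketched below. The names ConsumerState and ConsumerLifecycle are hypothetical and do not exist in Samza. The sketch also assumes that register may be called repeatedly before start (to register several partitions), which the description above does not specify.

```java
// Hypothetical sketch of a life-cycle state machine; not part of Samza.
enum ConsumerState { INIT, REGISTERED, STARTED, STOPPED }

class ConsumerLifecycle {
    private ConsumerState state = ConsumerState.INIT;

    ConsumerState state() {
        return state;
    }

    // Assumption: registering is legal from INIT, and again from REGISTERED.
    void register() {
        if (state != ConsumerState.INIT && state != ConsumerState.REGISTERED) {
            throw new IllegalStateException("Cannot register in state " + state);
        }
        state = ConsumerState.REGISTERED;
    }

    // Only registered -> started is allowed.
    void start() {
        if (state != ConsumerState.REGISTERED) {
            throw new IllegalStateException("Cannot start in state " + state);
        }
        state = ConsumerState.STARTED;
    }

    // Only started -> stopped is allowed.
    void stop() {
        if (state != ConsumerState.STARTED) {
            throw new IllegalStateException("Cannot stop in state " + state);
        }
        state = ConsumerState.STOPPED;
    }
}
```

Under this model, the late register call described in this issue would fail fast with an IllegalStateException at the call site rather than surfacing later as repeated warnings from the broker refresh logic.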


