Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/04/15 20:11:14 UTC

[jira] [Updated] (SAMZA-233) BrokerProxies are never started when created during abdication

     [ https://issues.apache.org/jira/browse/SAMZA-233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-233:
----------------------------------

    Attachment: SAMZA-233.0.patch

Attaching patch. RB at https://reviews.apache.org/r/20375/.

Changes are:

1. Make BrokerProxy.start check if its thread is already started, and not start it twice.
2. Change KafkaSystemConsumer to always start broker proxies in refreshBrokers.
3. Remove the BrokerProxy.start calls from KafkaSystemConsumer.start, since they're handled by refreshBrokers now.
4. Add a test to verify that abdication triggers a start call.
5. Make TopicMetadataStore mock-able for KafkaSystemConsumer.

I had to do some back-bending to get (4) done: mostly extracting logic from refreshBrokers so that the test needs no Kafka calls and does not have to use Kafka's Broker class, which is package-private.
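
For readers without the patch at hand, changes (1), (2), and (4) can be sketched roughly as follows. This is a minimal illustration, not the code in SAMZA-233.0.patch: SketchProxy, SketchConsumer, and startCalls are hypothetical names, the proxy thread does no real work, and the real refreshBrokers takes different arguments.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BrokerProxy; it only models the thread lifecycle.
class SketchProxy(val host: String, val port: Int) {
  private val thread = new Thread(new Runnable {
    def run(): Unit = () // a real proxy would fetch messages here
  }, "sketch-proxy-" + host + ":" + port)

  // Counts how many times the underlying thread was actually started.
  var startCalls = 0

  // Change (1): starting is idempotent. Thread.start() throws
  // IllegalThreadStateException on a second call, so check the state first.
  def start(): Unit = synchronized {
    if (thread.getState == Thread.State.NEW) {
      startCalls += 1
      thread.setDaemon(true)
      thread.start()
    }
  }
}

// Hypothetical stand-in for KafkaSystemConsumer.
class SketchConsumer {
  val proxies = mutable.Map[(String, Int), SketchProxy]()

  // Change (2): create the proxy if it is missing, then always call start,
  // so a proxy created during abdication is started like any other.
  def refreshBrokers(brokers: Seq[(String, Int)]): Unit = {
    brokers.foreach { case (host, port) =>
      proxies.getOrElseUpdate((host, port), new SketchProxy(host, port)).start()
    }
  }
}
```

Calling refreshBrokers twice with an overlapping broker list should leave every proxy started exactly once, which is roughly the property the new test in (4) is meant to check.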


> BrokerProxies are never started when created during abdication
> --------------------------------------------------------------
>
>                 Key: SAMZA-233
>                 URL: https://issues.apache.org/jira/browse/SAMZA-233
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-233.0.patch
>
>
> We have seen an issue lately where, after some time, a container will stop receiving messages from some partitions.
> After examining the container, it appears that this is triggered by an abdication for a BrokerProxy that was not previously instantiated during the KafkaSystemConsumer.start() method. Here's what we see:
> {noformat}
> 18:12:01,417  INFO KafkaSystemConsumer:128 - Abdicating for [MY-TOPIC,105]
> 18:12:01,418  INFO VerifiableProperties:68 - Verifying properties
> 18:12:01,419  INFO VerifiableProperties:68 - Property client.id is overridden to samza_consumer-job-thingy-i001-1395955794446-1
> 18:12:01,419  INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to localhost:10251
> 18:12:01,419  INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 6000
> 18:12:01,420  INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:localhost,port:10251 with correlation id 0 for 1 topic(s) Set(MY-TOPIC)
> 18:12:01,421  INFO SyncProducer:68 - Connected to localhost:10251 for producing
> 18:12:01,436  INFO SyncProducer:68 - Disconnecting from localhost:10251
> 18:12:01,437  INFO BrokerProxy:128 - Creating new SimpleConsumer for host localhost:10251 for system kafka
> 18:12:01,442  INFO GetOffset:128 - Validating offset 2872503 for topic and partition [MY-TOPIC,105]
> 18:12:01,456  INFO GetOffset:128 - Able to successfully read from offset 2872503 for topic and partition [MY-TOPIC,105]. Using it to instantiate consumer.
> {noformat}
> Notice that this log line never appears from BrokerProxy:
> {code}
>   def start {
>     info("Starting " + toString)
>     ...
>   }
> {code}
> Digging in a bit, KafkaSystemConsumer.refreshBrokers can create a new BrokerProxy that wasn't created in the KafkaSystemConsumer.start() method, in cases where a partition has moved to a broker for which the consumer hasn't yet created a proxy.
> {code}
>           brokerOption match {
>             case Some(broker) =>
>               def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
>               brokerProxies
>                 .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
>                 .addTopicPartition(head, Option(nextOffset))
>             case None => warn("No such topic-partition: %s, dropping." format head)
> {code}
> But refreshBrokers never starts the new proxy's thread.


