Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/04/10 19:08:15 UTC

[jira] [Created] (SAMZA-233) BrokerProxies are never started when created during abdication

Chris Riccomini created SAMZA-233:
-------------------------------------

             Summary: BrokerProxies are never started when created during abdication
                 Key: SAMZA-233
                 URL: https://issues.apache.org/jira/browse/SAMZA-233
             Project: Samza
          Issue Type: Bug
          Components: kafka
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini
             Fix For: 0.7.0


We have seen an issue lately where, after some time, a container will stop receiving messages from some partitions.

After examining the container, it appears that this is triggered when an abdication hands a partition to a BrokerProxy that was not instantiated during the KafkaSystemConsumer.start() method. Here's what we see:

{noformat}
18:12:01,417  INFO KafkaSystemConsumer:128 - Abdicating for [MY-TOPIC,105]
18:12:01,418  INFO VerifiableProperties:68 - Verifying properties
18:12:01,419  INFO VerifiableProperties:68 - Property client.id is overridden to samza_consumer-job-thingy-i001-1395955794446-1
18:12:01,419  INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to localhost:10251
18:12:01,419  INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 6000
18:12:01,420  INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:localhost,port:10251 with correlation id 0 for 1 topic(s) Set(MY-TOPIC)
18:12:01,421  INFO SyncProducer:68 - Connected to localhost:10251 for producing
18:12:01,436  INFO SyncProducer:68 - Disconnecting from localhost:10251
18:12:01,437  INFO BrokerProxy:128 - Creating new SimpleConsumer for host localhost:10251 for system kafka
18:12:01,442  INFO GetOffset:128 - Validating offset 2872503 for topic and partition [MY-TOPIC,105]
18:12:01,456  INFO GetOffset:128 - Able to successfully read from offset 2872503 for topic and partition [MY-TOPIC,105]. Using it to instantiate consumer.
{noformat}

Notice that the log line emitted by BrokerProxy.start never appears:

{code}
  def start {
    info("Starting " + toString)
    ...
  }
{code}

Digging in a bit, KafkaSystemConsumer.refreshBrokers can create a new BrokerProxy that wasn't created in the KafkaSystemConsumer.start() method. This happens when a partition has moved to a broker for which the consumer has not yet created a proxy.

{code}
          brokerOption match {
            case Some(broker) =>
              def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)

              brokerProxies
                .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
                .addTopicPartition(head, Option(nextOffset))
            case None => warn("No such topic-partition: %s, dropping." format head)
{code}

But refreshBrokers never calls start on the newly created BrokerProxy, so its thread never runs.
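
One possible way to address this is to start the proxy at the point where it is created. The sketch below is a simplified, self-contained model of that idea, not Samza code: FakeBrokerProxy, RefreshBrokersSketch, assignWithoutStart and assignAndStart are hypothetical names used only for illustration, and the real fix may look different. It relies on getOrElseUpdate taking its second argument by name, so the creation block only runs when no proxy exists yet for that host and port.

{code}
import scala.collection.mutable

// Hypothetical stand-in for BrokerProxy: it only records whether start()
// was called and which topic-partitions were assigned to it.
class FakeBrokerProxy(val host: String, val port: Int) {
  @volatile var started = false
  val topicPartitions = mutable.Set[(String, Int)]()

  def start(): Unit = { started = true }

  def addTopicPartition(tp: (String, Int)): Unit = { topicPartitions += tp }
}

object RefreshBrokersSketch {
  val brokerProxies = mutable.Map[(String, Int), FakeBrokerProxy]()

  // Mirrors the pattern quoted above: a proxy created here is never started.
  def assignWithoutStart(host: String, port: Int, tp: (String, Int)): FakeBrokerProxy = {
    val proxy = brokerProxies.getOrElseUpdate((host, port), new FakeBrokerProxy(host, port))
    proxy.addTopicPartition(tp)
    proxy
  }

  // Possible fix (an assumption, not the committed patch): start the proxy
  // inside the by-name creation block, which is evaluated only when the key
  // is absent from the map.
  def assignAndStart(host: String, port: Int, tp: (String, Int)): FakeBrokerProxy = {
    def createAndStart = {
      val proxy = new FakeBrokerProxy(host, port)
      proxy.start()
      proxy
    }
    val proxy = brokerProxies.getOrElseUpdate((host, port), createAndStart)
    proxy.addTopicPartition(tp)
    proxy
  }
}
{code}

With this shape, proxies that already exist in the map are left alone and only newly created ones are started. The sketch does not address thread safety around brokerProxies.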



--
This message was sent by Atlassian JIRA
(v6.2#6252)