You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Vincent Fumo (JIRA)" <ji...@apache.org> on 2016/07/12 16:38:20 UTC

[jira] [Created] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

Vincent Fumo created KAFKA-3957:
-----------------------------------

             Summary: consumer timeout not being respected when kafka broker is not available
                 Key: KAFKA-3957
                 URL: https://issues.apache.org/jira/browse/KAFKA-3957
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.1
            Reporter: Vincent Fumo
            Priority: Minor


KafkaConsumer v0.9::

I have a consumer set up with session.timeout.ms set to 30s. I make a call like

consumer.poll(10000)

but if the kafka broker is down, that call will hang indefinitely.

Digging into the code it seems that the timeout isn't respected:

KafkaConsumer calls out to pollOnce() as seen below::

   private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
       // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
       coordinator.ensureCoordinatorKnown();

       // ensure we have partitions assigned if we expect to
       if (subscriptions.partitionsAutoAssigned())
           coordinator.ensurePartitionAssignment();

       // fetch positions if we have partitions we're subscribed to that we
       // don't know the offset for
       if (!subscriptions.hasAllFetchPositions())
           updateFetchPositions(this.subscriptions.missingFetchPositions());

       // init any new fetches (won't resend pending fetches)
       Cluster cluster = this.metadata.fetch();
       Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

       // if data is available already, e.g. from a previous network client poll() call to commit,
       // then just return it immediately
       if (!records.isEmpty()) {
           return records;
       }

       fetcher.initFetches(cluster);
       client.poll(timeout);
       return fetcher.fetchedRecords();
   }

and we see that we stick on the call to coordinator.ensureCoordinatorKnown();

AbstractCoordinator ::

   public void ensureCoordinatorKnown() {
       while (coordinatorUnknown()) {
           RequestFuture<Void> future = sendGroupMetadataRequest();
           client.poll(future);

           if (future.failed()) {
               if (future.isRetriable())
                   client.awaitMetadataUpdate();
               else
                   throw future.exception();
           }
       }
   }

in this case the Future fails (since the broker is down) and then a call to client.awaitMetadataUpdate() is made which in the case of the ConsumerNetworkClient will block forever :

   public void awaitMetadataUpdate() {
       int version = this.metadata.requestUpdate();
       do {
           poll(Long.MAX_VALUE);
       } while (this.metadata.version() == version);
   }


I feel that this is a bug. When you set a timeout on a call to a blocking method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)