Posted to dev@kafka.apache.org by "Vincent Fumo (JIRA)" <ji...@apache.org> on 2016/07/13 20:34:20 UTC

[jira] [Closed] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available

     [ https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vincent Fumo closed KAFKA-3957.
-------------------------------

> consumer timeout not being respected when kafka broker is not available
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-3957
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3957
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Vincent Fumo
>            Priority: Minor
>
> KafkaConsumer v0.9:
> I have a consumer configured with session.timeout.ms set to 30s. I make a call like
> consumer.poll(10000)
> but if the Kafka broker is down, that call hangs indefinitely.
> Digging into the code, it seems that the timeout isn't respected.
> KafkaConsumer calls out to pollOnce(), shown below:
>    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>        coordinator.ensureCoordinatorKnown();
>        // ensure we have partitions assigned if we expect to
>        if (subscriptions.partitionsAutoAssigned())
>            coordinator.ensurePartitionAssignment();
>        // fetch positions if we have partitions we're subscribed to that we
>        // don't know the offset for
>        if (!subscriptions.hasAllFetchPositions())
>            updateFetchPositions(this.subscriptions.missingFetchPositions());
>        // init any new fetches (won't resend pending fetches)
>        Cluster cluster = this.metadata.fetch();
>        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>        // if data is available already, e.g. from a previous network client poll() call to commit,
>        // then just return it immediately
>        if (!records.isEmpty()) {
>            return records;
>        }
>        fetcher.initFetches(cluster);
>        client.poll(timeout);
>        return fetcher.fetchedRecords();
>    }
> and we see that the call gets stuck in coordinator.ensureCoordinatorKnown().
> AbstractCoordinator:
>    public void ensureCoordinatorKnown() {
>        while (coordinatorUnknown()) {
>            RequestFuture<Void> future = sendGroupMetadataRequest();
>            client.poll(future);
>            if (future.failed()) {
>                if (future.isRetriable())
>                    client.awaitMetadataUpdate();
>                else
>                    throw future.exception();
>            }
>        }
>    }
> In this case the future fails (since the broker is down), and a call is then made to client.awaitMetadataUpdate(), which in ConsumerNetworkClient blocks forever because it polls with Long.MAX_VALUE until the metadata version changes:
>    public void awaitMetadataUpdate() {
>        int version = this.metadata.requestUpdate();
>        do {
>            poll(Long.MAX_VALUE);
>        } while (this.metadata.version() == version);
>    }
> I feel that this is a bug. When you set a timeout on a call to a blocking method, that timeout should be respected and an exception should be thrown.
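
For illustration only, here is a minimal, self-contained sketch of the behaviour the report asks for: a wait loop that tracks a deadline instead of polling with Long.MAX_VALUE. It is not Kafka code and not the fix that was applied upstream. The names BoundedAwait, awaitWithDeadline and PollTimeoutException are hypothetical, and the "done" and "pollOnce" arguments stand in for the metadata-version check and the network client's poll call.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Hypothetical helper, not part of the Kafka client.
class BoundedAwait {

    // Hypothetical unchecked exception signalling that the deadline passed.
    static class PollTimeoutException extends RuntimeException {
        PollTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Repeatedly invokes pollOnce until done reports true, never waiting
     * past the overall timeout. Each pollOnce call receives only the time
     * that is still left, so a single slow poll cannot overrun the deadline
     * as long as pollOnce itself honours its argument.
     */
    static void awaitWithDeadline(BooleanSupplier done, LongConsumer pollOnce, long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!done.getAsBoolean()) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
            if (remainingMs <= 0) {
                throw new PollTimeoutException(
                        "condition not met within " + timeoutMs + " ms");
            }
            // Pass the remaining budget instead of Long.MAX_VALUE.
            pollOnce.accept(remainingMs);
        }
    }
}
```

With a helper of this shape, a caller could pass the value given to poll() down into the coordinator lookup and either catch the exception or return an empty result once the budget is spent.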



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)