Posted to dev@kafka.apache.org by "Vahid Hashemian (JIRA)" <ji...@apache.org> on 2016/07/12 22:28:20 UTC
[jira] [Commented] (KAFKA-3957) consumer timeout not being respected when kafka broker is not available
[ https://issues.apache.org/jira/browse/KAFKA-3957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15373823#comment-15373823 ]
Vahid Hashemian commented on KAFKA-3957:
----------------------------------------
This is already filed under an earlier JIRA ([KAFKA-1894|https://issues.apache.org/jira/browse/KAFKA-1894]).
> consumer timeout not being respected when kafka broker is not available
> -----------------------------------------------------------------------
>
> Key: KAFKA-3957
> URL: https://issues.apache.org/jira/browse/KAFKA-3957
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.1
> Reporter: Vincent Fumo
> Priority: Minor
>
> KafkaConsumer v0.9:
> I have a consumer set up with session.timeout.ms set to 30s. I make a call like
> consumer.poll(10000)
> but if the Kafka broker is down, that call will hang indefinitely.
> Digging into the code, it seems that the timeout isn't respected. KafkaConsumer calls out to pollOnce() as seen below:
>
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
> and we see that execution gets stuck in the call to coordinator.ensureCoordinatorKnown().
> AbstractCoordinator:
>
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
> In this case the future fails (since the broker is down), and a call to client.awaitMetadataUpdate() is then made, which in the case of ConsumerNetworkClient will block forever:
>
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
> I feel that this is a bug. When you set a timeout on a call to a blocking method, that timeout should be respected, and an exception should be thrown when it expires.
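Until the client is fixed, one caller-side workaround is to bound the blocking call with an external timeout by running it on a separate thread and waiting on the Future with a limit. This is only a sketch: KafkaConsumer is not thread-safe, so in practice the consumer would have to live entirely on that worker thread, and the example below uses a stand-in blocking task rather than a real consumer.

```java
import java.util.concurrent.*;

// Sketch of a caller-side timeout wrapper around any blocking call.
// Note: with a real KafkaConsumer, all consumer access must stay on the
// worker thread, since the consumer is not safe for multi-threaded use.
public class BoundedPoll {
    static <T> T callWithTimeout(Callable<T> task, long timeoutMs) throws TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            // Throws TimeoutException if the task is still blocked after timeoutMs.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow(); // best effort; the blocked call may ignore interrupts
        }
    }
}
```

The trade-off is that the underlying call keeps blocking in the background after the timeout fires; shutdownNow() only interrupts it, which the hung client may or may not honor.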
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)