Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/08/13 14:18:00 UTC

[jira] [Commented] (KAFKA-8789) kafka-console-consumer timeout-ms setting behaves incorrectly with older client

    [ https://issues.apache.org/jira/browse/KAFKA-8789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906247#comment-16906247 ] 

ASF GitHub Bot commented on KAFKA-8789:
---------------------------------------

dongjinleekr commented on pull request #7206: KAFKA-8789: kafka-console-consumer timeout-ms setting behaves incorrectly with older client
URL: https://github.com/apache/kafka/pull/7206
 
 
   This is a draft fix for [KAFKA-8789](https://issues.apache.org/jira/browse/KAFKA-8789). I investigated the problem and found the following.
   
   Before 2.1.0 (Confluent Platform 5.1.x, 8a78d764), `ConsoleConsumer.ConsumerWrapper#recordIter` was initialized with `consumer.poll(0)`, which in turn calls `KafkaConsumer#poll(Timer, includeMetadataInTimeout = false)` and fetches metadata regardless of the timeout.
   
   However, this call was removed along with the deprecated `KafkaConsumer#poll(long)`. As a result, `ConsoleConsumer.ConsumerWrapper#receive` now fetches metadata just before it starts consuming, and that fetch is subject to the timeout. If the timeout is too short, the call goes through the following methods and ends in a `TimeoutException`.
   
   `KafkaConsumer#poll(Duration, boolean)`
   -> `KafkaConsumer#updateAssignmentMetadataIfNeeded(Timer)`
   -> `KafkaConsumer#updateFetchPositions`
   -> `ConsumerCoordinator#refreshCommittedOffsetsIfNeeded`
   -> `ConsumerCoordinator#fetchCommittedOffsets`
   -> `ConsumerNetworkClient#poll(RequestFuture, Timer)`
   -> `ConsumerNetworkClient#poll(Timer, PollCondition)`
   -> `ConsumerNetworkClient#poll(Timer, PollCondition, boolean)`
   -> `ConsumerNetworkClient#failExpiredRequests`
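
As a rough illustration of why the chain above can end in a timeout, the following toy model charges every blocking step against one shared budget. It is not Kafka code: `Budget`, `SimulatedTimeout`, `pollWithSharedBudget` and all millisecond costs are made up for the sketch, and the clock is virtual, so nothing actually sleeps.

```java
public class SharedTimerSketch {

    /** Hypothetical unchecked stand-in for a timeout error; not the Kafka exception class. */
    static final class SimulatedTimeout extends RuntimeException {
        SimulatedTimeout(String message) { super(message); }
    }

    /** Hypothetical countdown budget driven by a virtual clock. */
    static final class Budget {
        private long remainingMs;

        Budget(long timeoutMs) { this.remainingMs = timeoutMs; }

        /** Charges one simulated blocking step; fails if the step does not fit. */
        void spend(String step, long costMs) {
            if (costMs > remainingMs) {
                remainingMs = 0;
                throw new SimulatedTimeout(step + " did not finish within the remaining budget");
            }
            remainingMs -= costMs;
        }
    }

    /**
     * Models a poll in which the metadata/offset lookup and the record fetch
     * share a single budget. Returns the number of records fetched.
     */
    static int pollWithSharedBudget(long timeoutMs, long metadataMs, long fetchMs) {
        Budget budget = new Budget(timeoutMs);
        budget.spend("metadata and committed-offset lookup", metadataMs); // assumed cost
        budget.spend("record fetch", fetchMs);                            // assumed cost
        return 1; // a single record, as with --max-messages 1
    }

    public static void main(String[] args) {
        // Generous budget: both steps fit.
        System.out.println(pollWithSharedBudget(15_000, 2_000, 500));
        // Tight budget: the lookup alone exceeds it, so no record is ever fetched.
        try {
            pollWithSharedBudget(1_000, 2_000, 500);
        } catch (SimulatedTimeout e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

In this model the record fetch never runs when the lookup alone exhausts the budget, which matches the "0 messages processed" symptom only in spirit; the real code path is the one listed above.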
   
   To make this tool work as it did before 2.1.0, we need a way to update `KafkaConsumer`'s metadata before polling starts. I am not certain which approach would be best, so I added `Consumer#poll(Duration, boolean)` as a temporary workaround. (That is, this PR will be updated following the related KIP discussion.)
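
The idea behind the workaround can be sketched with a toy consumer. This is only a model under stated assumptions: `ToyConsumer` and `consumeOne` are hypothetical, the boolean is assumed to mean "charge the metadata update against the timeout" (mirroring `includeMetadataInTimeout` above), and the costs are virtual milliseconds rather than real waits.

```java
import java.time.Duration;

public class MetadataWarmupSketch {

    /** Hypothetical consumer model; not the Kafka client. */
    static final class ToyConsumer {
        private final long metadataMs;  // assumed cost of the first metadata update
        private boolean metadataReady;

        ToyConsumer(long metadataMs) { this.metadataMs = metadataMs; }

        /**
         * Returns the number of records fetched (0 or 1). When
         * includeMetadataInTimeout is false, the metadata update is completed
         * without being charged against the timeout.
         */
        int poll(Duration timeout, boolean includeMetadataInTimeout) {
            long remainingMs = timeout.toMillis();
            if (!metadataReady) {
                if (includeMetadataInTimeout) {
                    if (metadataMs > remainingMs) {
                        return 0; // budget spent before metadata arrived
                    }
                    remainingMs -= metadataMs;
                }
                metadataReady = true;
            }
            return remainingMs > 0 ? 1 : 0;
        }
    }

    /** Optionally warms up metadata first, then polls under the timeout. */
    static int consumeOne(ToyConsumer consumer, Duration timeout, boolean warmUp) {
        if (warmUp) {
            consumer.poll(Duration.ZERO, false); // metadata only, no records expected
        }
        return consumer.poll(timeout, true);
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMillis(1_000);
        System.out.println(consumeOne(new ToyConsumer(2_000), timeout, false)); // prints 0
        System.out.println(consumeOne(new ToyConsumer(2_000), timeout, true));  // prints 1
    }
}
```

With the warm-up call, the full timeout is left for fetching records, which is the pre-2.1.0 behaviour the paragraph above aims to restore.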
   
   cc/ @hachikuji @viktorsomogyi @ijuma @HeartSaVioR
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> kafka-console-consumer timeout-ms setting behaves incorrectly with older client
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8789
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8789
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 2.3.0
>            Reporter: Raman Gupta
>            Assignee: Lee Dongjin
>            Priority: Major
>
> I have a topic with about 20,000 events in it, running on a Kafka 2.3.0 broker. When I run the following tools command using the older Kafka client included in Confluent 5.0.3:
> bin/kafka-console-consumer \
>   --bootstrap-server $KAFKA \
>   --topic x \
>   --from-beginning --max-messages 1 \
>   --timeout-ms 15000
> I get 1 message as expected.
> However, when running the exact same command using the console consumer included with Confluent 5.3.0, I get org.apache.kafka.common.errors.TimeoutException, and 0 messages processed.
> NOTE: I am using the Confluent distribution of Kafka for the client side tools, specifically Confluent 5.0.3 and Confluent 5.3.0. I can certainly try to replicate with a vanilla Kafka if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)