You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Ray Chiang (JIRA)" <ji...@apache.org> on 2018/07/19 20:06:00 UTC

[jira] [Updated] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms

     [ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ray Chiang updated KAFKA-6449:
------------------------------
    Component/s: consumer

> KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6449
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6449
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.0.1
>            Reporter: zhaoshijie
>            Priority: Major
>
> I use the following code to consume one partition of a Kafka topic, and I get a 40s latency on every poll:
> {code:java}
> @Test
>     public void testTimeOut() throws Exception {
>         String test_topic = "timeOut_test";
>         int test_partition = 1;
>         Map<String, Object> kafkaParams = new HashMap<String, Object>();
>         kafkaParams.put("auto.offset.reset", "earliest");
>         kafkaParams.put("enable.auto.commit", false);
>         kafkaParams.put("bootstrap.servers", "*");
>         kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         kafkaParams.put("group.id", "test-consumer-" + System.currentTimeMillis());
> //        kafkaParams.put("reconnect.backoff.ms", "0");
> //        kafkaParams.put("max.poll.records", "500");
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
>         consumer.assign(Arrays.asList(new TopicPartition(test_topic, test_partition)));
>         Long offset = 0L;
>         while (true) {
>             Long startPollTime = System.currentTimeMillis();
>             consumer.seek(new TopicPartition(test_topic, test_partition), offset);
>             ConsumerRecords<String, String> records = consumer.poll(120000);
>             logger.info("poll take " + (System.currentTimeMillis() - startPollTime) + "ms, MSGCount is " + records.count());
>             Thread.sleep(41000);
>             Iterator<ConsumerRecord<String, String>> consumerRecordIterable = records.records(test_topic).iterator();
>             while (consumerRecordIterable.hasNext()) {
>                 offset = consumerRecordIterable.next().offset();
>             }
>         }
>     }
> {code}
> The log is as follows:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = earliest
> | org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-1
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = earliest
> | org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | Kafka version : 0.10.0.1 | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)
> 2018-01-16 18:53:34,034 |INFO | main | Kafka commitId : a7a17cdec9eaa6c5 | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)
> 2018-01-16 18:53:34,034 |INFO | main | Discovered coordinator polaris-1:9092 (id: 2147483647 rack: null) for group test-consumer-1516100013868. | org.apache.kafka.clients.consumer.internals.AbstractCoordinator.handleGroupMetadataResponse(AbstractCoordinator.java:505)
> 2018-01-16 18:53:34,034 |INFO | main | Fetch offset 0 is out of range for partition timeOut_test-1, resetting offset | org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:585)
> 2018-01-16 18:53:35,035 |INFO | main | poll take 780ms, MSGCount is 15943 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43)
> 2018-01-16 18:54:56,056 |INFO | main | poll take 40186ms, MSGCount is 15887 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43)
> 2018-01-16 18:56:17,017 |INFO | main | poll take 40121ms, MSGCount is 15672 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43)
> 2018-01-16 18:57:38,038 |INFO | main | poll take 40110ms, MSGCount is 15650 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43)
> 2018-01-16 18:58:59,059 |INFO | main | poll take 40269ms, MSGCount is 15650 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43)
> 2018-01-16 19:00:20,020 |INFO | main | poll take 40105ms, MSGCount is 15650 | com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43){code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)