Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2018/09/06 04:15:00 UTC

[jira] [Resolved] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

     [ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-7365.
------------------------------------
    Resolution: Invalid

> max.poll.records setting in Kafka Consumer is not working
> ---------------------------------------------------------
>
>                 Key: KAFKA-7365
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Kashyap Ivaturi
>            Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs additional processing, after which I manually commit the offset.
> Things work well most of the time, but when I get a large batch of records that takes longer to process, I encounter a CommitFailedException for the last set of records even though they were processed. While I am able to reconnect, the consumer then picks up some messages that I had already processed. I don't want this to happen, as it creates duplicates in the target systems I integrate with while processing the messages.
>  
> I decided that, even though there are more messages in the topic, I would like to control how many records I process per poll.
> To replicate the scenario, I started the consumer with 'max.poll.records' set to '1' and then pushed 4 messages into the topic the consumer is listening to.
> I expected the consumer to process only 1 message because of my 'max.poll.records' setting, but it processed all 4 messages in a single poll. Any idea why it did not honor the 'max.poll.records' setting, or is some other setting overriding it? I appreciate your help or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>  
> 2018-08-28 08:29:48.079  INFO 91121 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
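
For reference, below is a minimal sketch of the poll-process-commit loop the reporter describes, written against the plain Apache Kafka Java consumer (kafka-clients 1.0.0, matching the version in the log; the SSL settings from the log are omitted for brevity). The topic name and the processRecord() helper are placeholders, not taken from the report. Note that max.poll.records only caps how many records a single call to poll() returns; a framework or application-level buffer layered on top of the consumer can still hand more records to application code between commits, which is one way the observed behavior can arise.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OneAtATimeConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "messaging-rtp3.cisco.com:9093"); // from the log above
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "empestor");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually after processing
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");       // cap records returned per poll()

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic"));    // placeholder topic name

            try {
                while (true) {
                    // With max.poll.records=1 this returns at most one record per call.
                    ConsumerRecords<String, String> records = consumer.poll(1000); // poll(Duration) in clients >= 2.0
                    for (ConsumerRecord<String, String> record : records) {
                        processRecord(record);      // placeholder for the application-specific processing
                        try {
                            consumer.commitSync();  // commit only after the record is fully processed
                        } catch (CommitFailedException e) {
                            // Thrown when the group has rebalanced, typically because processing took
                            // longer than max.poll.interval.ms; the record may then be redelivered.
                            System.err.println("Commit failed, record may be reprocessed: " + e.getMessage());
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }

        private static void processRecord(ConsumerRecord<String, String> record) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }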



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)