Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2019/03/18 11:26:00 UTC

[jira] [Updated] (CAMEL-13140) camel-kafka - consumer does not respect auto.commit.interval.ms with AutoCommitEnabled=true

     [ https://issues.apache.org/jira/browse/CAMEL-13140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-13140:
--------------------------------
    Fix Version/s: 2.22.4

> camel-kafka - consumer does not respect auto.commit.interval.ms with AutoCommitEnabled=true
> -------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-13140
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13140
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.23.0
>            Reporter: Aleksei Vasilevskii
>            Assignee: Ramu
>            Priority: Major
>             Fix For: 3.0.0, 2.23.2, 2.24.0, 2.22.4, 3.0.0-M2
>
>         Attachments: Kafka_Brokers_CPU.png
>
>
> This is probably a side effect of CAMEL-12454: when auto commit is enabled, the Kafka consumer commits offsets as soon as an exchange completes, with no regard to the auto.commit.interval.ms setting. This may cause additional unbalanced load on the Kafka cluster (see the attached screenshot showing Kafka broker CPU load right after the upgrade from 2.20.3 to 2.23.0).
> Here is an example with debug logging turned on for org.apache.camel.component.kafka.KafkaConsumer. As you can see, the Kafka consumer commits every 10-500 ms instead of once every 5 seconds:
> {code:java}
> 2019-01-28 10:46:55.025  INFO 3324 --- [ontext_Worker-3] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> 	auto.commit.interval.ms = 5000
> 	auto.offset.reset = latest
> 	bootstrap.servers = [192.168.56.10:9093]
> 	check.crcs = true
> 	client.id = 
> 	connections.max.idle.ms = 540000
> 	enable.auto.commit = true
> 	exclude.internal.topics = true
> 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500
> 	fetch.min.bytes = 1
> 	group.id = service_new
> 	heartbeat.interval.ms = 3000
> 	interceptor.classes = null
> 	internal.leave.group.on.close = true
> 	isolation.level = read_uncommitted
> 	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> 	max.partition.fetch.bytes = 1048576
> 	max.poll.interval.ms = 300000
> 	max.poll.records = 1000
> 	metadata.max.age.ms = 300000
> 	metric.reporters = []
> 	metrics.num.samples = 2
> 	metrics.recording.level = INFO
> 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> 	receive.buffer.bytes = 65536
> 	reconnect.backoff.max.ms = 1000
> 	reconnect.backoff.ms = 50
> 	request.timeout.ms = 40000
> 	retry.backoff.ms = 100
> 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	sasl.mechanism = GSSAPI
> 	security.protocol = SSL
> 	send.buffer.bytes = 131072
> 	session.timeout.ms = 30000
> 	ssl.cipher.suites = null
> 	ssl.enabled.protocols = [TLSv1.2]
> 	ssl.endpoint.identification.algorithm = null
> 	ssl.key.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = /usr/files/server.jks
> 	ssl.keystore.password = [hidden]
> 	ssl.keystore.type = JCEKS
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.secure.random.implementation = null
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.truststore.location = /usr/files/truststore.jks
> 	ssl.truststore.password = [hidden]
> 	ssl.truststore.type = JCEKS
> 	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: kafka:topic1,topic2,topic3?brokers=192.168.56.10:9093
> 2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
> 2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
> 2019-01-28 10:46:55.313  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Discovered group coordinator 192.168.56.10:9093 (id: 2147483646 rack: null)
> 2019-01-28 10:46:55.315  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Revoking previously assigned partitions []
> 2019-01-28 10:46:55.316  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] (Re-)joining group
> 2019-01-28 10:46:58.469  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Successfully joined group with generation 3
> 2019-01-28 10:46:58.470  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Setting newly assigned partitions [topic3-1, topic3-0, topic2-1, topic1-0, topic2-0, topic1-1]
> 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
> 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
> 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
> 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
> 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
> 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
> 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
> 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
> 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
> 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
> 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
> 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
> 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
> 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
> 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
> 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
> 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
> 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
> 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
> 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
> 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
> 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
> 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
> 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
> 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
> 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
> 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
> 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
> {code}
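The spacing between commits in the log above can be checked mechanically. The following is a minimal sketch, assuming the log lines start with the timestamp in the format shown (with any mail quoting prefix already stripped). The class name CommitGapReport and the method gapsMillis are hypothetical and not part of camel-kafka; the sample lines in main are abbreviated copies of the first few commit entries.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: measures the time between "Auto commitSync" log entries. */
final class CommitGapReport {
    // Timestamp layout used by the log lines in the report, e.g. "2019-01-28 10:47:24.953".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final int TS_LENGTH = 23;

    /** Returns the gaps in milliseconds between consecutive distinct commit timestamps. */
    static List<Long> gapsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            if (line.length() < TS_LENGTH || !line.contains("Auto commitSync")) {
                continue; // not a commit entry
            }
            LocalDateTime current = LocalDateTime.parse(line.substring(0, TS_LENGTH), TS);
            // Entries logged twice with the same timestamp are counted once.
            if (previous != null && !current.equals(previous)) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Abbreviated sample lines taken from the log in the report.
        List<String> sample = Arrays.asList(
                "2019-01-28 10:47:24.953 DEBUG Auto commitSync with offset: 3221",
                "2019-01-28 10:47:24.957 DEBUG Auto commitSync with offset: 3222",
                "2019-01-28 10:47:24.964 DEBUG Auto commitSync with offset: 3325",
                "2019-01-28 10:47:25.005 DEBUG Auto commitSync with offset: 3223",
                "2019-01-28 10:47:25.515 DEBUG Auto commitSync with offset: 3326");
        System.out.println(gapsMillis(sample)); // prints [4, 7, 41, 510]
    }
}
```

Every gap in this sample is far below the configured auto.commit.interval.ms of 5000.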
> When downgraded to camel-kafka 2.20.3, commits are done correctly (once every auto.commit.interval.ms) and there are no messages from o.a.camel.component.kafka.KafkaConsumer in the debug log.
> Running with a debugger showed that this code never actually gets executed in 2.20.3: https://github.com/apache/camel/blob/2.20.x/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L382 so in that version we seem to rely on the auto-commit feature of the Kafka client itself, not the camel-kafka wrapper.
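
To illustrate the behaviour the report expects, here is a minimal sketch of a time-based gate that lets a commit through at most once per interval. It is not the fix applied in camel-kafka and not how the Kafka client implements auto commit; the class CommitIntervalGate and its method tryAcquire are hypothetical names used only for illustration. As a simplification, the first call is always allowed.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of camel-kafka or the Kafka client):
 * allows a commit only if the configured interval has elapsed since the last one.
 */
final class CommitIntervalGate {
    private final long intervalNanos;
    private long lastCommitNanos;
    private boolean committedOnce;

    CommitIntervalGate(long intervalMs) {
        this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMs);
    }

    /**
     * Returns true and records nowNanos if a commit is due; returns false otherwise.
     * nowNanos is expected to come from a monotonic clock such as System.nanoTime().
     */
    boolean tryAcquire(long nowNanos) {
        if (committedOnce && nowNanos - lastCommitNanos < intervalNanos) {
            return false; // still inside the interval: skip this commit
        }
        lastCommitNanos = nowNanos;
        committedOnce = true;
        return true;
    }

    public static void main(String[] args) {
        // 5000 ms mirrors auto.commit.interval.ms from the reported configuration.
        CommitIntervalGate gate = new CommitIntervalGate(5000);
        long t0 = 0L; // fixed timestamps keep the example deterministic
        System.out.println(gate.tryAcquire(t0));                                  // true
        System.out.println(gate.tryAcquire(t0 + TimeUnit.MILLISECONDS.toNanos(10))); // false
        System.out.println(gate.tryAcquire(t0 + TimeUnit.SECONDS.toNanos(5)));    // true
    }
}
```

With a gate like this, a consumer loop would call its commit method only when tryAcquire returns true, so exchanges completing every 10-500 ms would no longer each trigger a commit.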



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)