You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Aleksei Vasilevskii (JIRA)" <ji...@apache.org> on 2019/01/29 09:28:00 UTC

[jira] [Created] (CAMEL-13140) camel-kafka - consumer does not respect auto.commit.interval.ms with AutoCommitEnabled=true

Aleksei Vasilevskii created CAMEL-13140:
-------------------------------------------

             Summary: camel-kafka - consumer does not respect auto.commit.interval.ms with AutoCommitEnabled=true
                 Key: CAMEL-13140
                 URL: https://issues.apache.org/jira/browse/CAMEL-13140
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 2.23.0
            Reporter: Aleksei Vasilevskii
         Attachments: Kafka_Brokers_CPU.png

This is probably a side effect of CAMEL-12454: when auto commit enabled is enabled kafka consumer commits offsets as soon as an exchange is complete with no regard to the auto.commit.interval.ms setting, which may cause additional non-balanced load on the kafka cluster (see attached screenshot depicting kafka brokers cpu load right after upgrade from 2.20.3 to 2.23.0).

Here is an example with debugging turned on for org.apache.camel.component.kafka.KafkaConsumer, as you can see Kafka consumer commits every 10-500 ms instead of once per 5 seconds:
{code:java}
2019-01-28 10:46:55.025  INFO 3324 --- [ontext_Worker-3] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.56.10:9093]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = service_new
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 40000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	send.buffer.bytes = 131072
	session.timeout.ms = 30000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = /usr/files/server.jks
	ssl.keystore.password = [hidden]
	ssl.keystore.type = JCEKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = /usr/files/truststore.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JCEKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: kafka:topic1,topic2,topic3?brokers=192.168.56.10:9093
2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
2019-01-28 10:46:55.313  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Discovered group coordinator 192.168.56.10:9093 (id: 2147483646 rack: null)
2019-01-28 10:46:55.315  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Revoking previously assigned partitions []
2019-01-28 10:46:55.316  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] (Re-)joining group
2019-01-28 10:46:58.469  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Successfully joined group with generation 3
2019-01-28 10:46:58.470  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Setting newly assigned partitions [topic3-1, topic3-0, topic2-1, topic1-0, topic2-0, topic1-1]
2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
{code}

When downgraded to camel-kafka 2.20.3 commits are done correctly (once every auto.commit.interval.ms) and there are no messages from o.a.camel.component.kafka.KafkaConsumer in the debug log.
Running with debugger showed that this code actually never gets executed in 2.20.3: https://github.com/apache/camel/blob/2.20.x/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L382 and we seem to rely on the auto-commit feature of the Kafka client itself, not the camel-kafka wrapper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)