Posted to issues@camel.apache.org by "Eugen (JIRA)" <ji...@apache.org> on 2019/03/05 10:01:00 UTC

[jira] [Commented] (CAMEL-12031) KafkaConsumer stops consuming messages when exception occurs during offset commit

    [ https://issues.apache.org/jira/browse/CAMEL-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784275#comment-16784275 ] 

Eugen commented on CAMEL-12031:
-------------------------------

It seems I can reproduce this issue with the camel-kafka 3.0.0.M1 release.

On some Kafka topics I run into a reconnect issue, triggered either by changing the offset manually (starting from the beginning) or when a new message comes in.
{code:java}
2019-03-05 10:51:36.168 INFO 5400 --- [carbpandastest]] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.0
2019-03-05 10:51:36.168 INFO 5400 --- [carbpandastest]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 3402a8361b734732
2019-03-05 10:51:36.168 INFO 5400 --- [carbpandastest]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting carbpandastest-Thread 0 to topic carbpandastest after 5000 ms
2019-03-05 10:51:41.182 INFO 5400 --- [carbpandastest]] o.a.camel.component.kafka.KafkaConsumer : Subscribing carbpandastest-Thread 0 to topic carbpandastest
2019-03-05 10:51:41.229 INFO 5400 --- [carbpandastest]] org.apache.kafka.clients.Metadata : Cluster ID: SV04zC4aSJuPFIuouM0CZA
2019-03-05 10:51:41.229 INFO 5400 --- [carbpandastest]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-47, groupId=1a981fbb-60c4-41b9-9051-8696a2b84cfc] Discovered group coordinator atgrzsl2929.avl01.avlcorp.lan:9092 (id: 2147483646 rack: null)
2019-03-05 10:51:41.229 INFO 5400 --- [carbpandastest]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-47, groupId=1a981fbb-60c4-41b9-9051-8696a2b84cfc] Revoking previously assigned partitions []
2019-03-05 10:51:41.229 INFO 5400 --- [carbpandastest]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-47, groupId=1a981fbb-60c4-41b9-9051-8696a2b84cfc] (Re-)joining group
2019-03-05 10:51:41.307 INFO 5400 --- [carbpandastest]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-47, groupId=1a981fbb-60c4-41b9-9051-8696a2b84cfc] Successfully joined group with generation 93
2019-03-05 10:51:41.307 INFO 5400 --- [carbpandastest]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-47, groupId=1a981fbb-60c4-41b9-9051-8696a2b84cfc] Setting newly assigned partitions [carbpandastest-0]
2019-03-05 10:51:41.354 WARN 5400 --- [carbpandastest]] o.a.camel.component.kafka.KafkaConsumer : KafkaException consuming carbpandastest-Thread 0 from topic carbpandastest. Will attempt to re-connect on next run
2019-03-05 10:51:41.385 INFO 5400 --- [carbpandastest]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [XXXXX:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 1a981fbb-60c4-41b9-9051-8696a2b84cfc
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

2019-03-05 10:51:41.385 INFO 5400 --- [carbpandastest]] i.c.k.s.KafkaAvroDeserializerConfig : KafkaAvroDeserializerConfig values:
schema.registry.url = [http://XXXX:30002]
basic.auth.user.info = [hidden]
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
specific.avro.reader = true
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
{code}
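For reference, the ConsumerConfig dump above shows the defaults max.poll.records = 500 and max.poll.interval.ms = 300000. A quick back-of-envelope check (plain Java; the per-record processing time is a hypothetical value, not something taken from the logs) shows how easily a slow route can exceed the poll interval and get the consumer kicked from the group:

```java
public class PollIntervalCheck {
    public static void main(String[] args) {
        int maxPollRecords = 500;          // max.poll.records from the config dump above
        long maxPollIntervalMs = 300_000;  // max.poll.interval.ms (5 minutes)

        // Hypothetical per-record processing time in the Camel route
        long perRecordMs = 700;

        // Worst case: a full batch must be processed before the next poll()
        long batchMs = (long) maxPollRecords * perRecordMs;
        System.out.println("Worst-case batch time: " + batchMs + " ms");

        if (batchMs > maxPollIntervalMs) {
            // The broker assumes the consumer is dead, rebalances the group,
            // and the next commit fails with CommitFailedException
            System.out.println("Exceeds max.poll.interval.ms -> CommitFailedException on commit");
        }
    }
}
```

So with these defaults, anything over roughly 600 ms per record is enough to trigger the failure mode described in this issue.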

> KafkaConsumer stops consuming messages when exception occurs during offset commit
> ---------------------------------------------------------------------------------
>
>                 Key: CAMEL-12031
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12031
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.20.0
>            Reporter: Rafał Gała
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 2.19.5, 2.20.2, 2.21.0
>
>
> When processing of messages takes longer than max session timeout, the consumer thread will end after receiving the *org.apache.kafka.clients.consumer.CommitFailedException*.
> {code:java}
>         @Override
>         public void run() {
>             boolean first = true;
>             boolean reConnect = true;
>             while (reConnect) {
>                 // create consumer
>                 ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
>                 try {
>                     // Kafka uses reflection for loading authentication settings, use its classloader
>                     Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
>                     this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
>                 } finally {
>                     Thread.currentThread().setContextClassLoader(threadClassLoader);
>                 }
>                 if (!first) {
>                     // skip one poll timeout before trying again
>                     long delay = endpoint.getConfiguration().getPollTimeoutMs();
>                     log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
>                     try {
>                         Thread.sleep(delay);
>                     } catch (InterruptedException e) {
>                         Thread.currentThread().interrupt();
>                     }
>                 }
>                 first = false;
>                 // doRun keeps running until we either shutdown or is told to re-connect
>                 reConnect = doRun();
>             }
>         }
> {code}
> The *doRun()* method returns false and the loop ends. It should be possible to let the process continue after a failed offset commit.
> I think the catch block inside the *doRun* method should look like this:
> {code:java}
>            ...
>             } catch (InterruptException e) {
>                 getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
>                 log.info("Unsubscribing {} from topic {}", threadId, topicName);
>                 consumer.unsubscribe();
>                 Thread.currentThread().interrupt();
>             } catch (org.apache.kafka.clients.consumer.CommitFailedException e) { //or even org.apache.kafka.common.KafkaException
>                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
>                 reConnect = true;
>             } catch (Exception e) {
>                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
>             } finally {
>                 log.debug("Closing {} ", threadId);
>                 IOHelper.close(consumer);
>             }
>             ...
> {code}
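A minimal, dependency-free sketch of the loop semantics the proposed catch block aims for (the class and exception here are stand-ins for Camel's KafkaConsumer thread and Kafka's CommitFailedException, not the actual code):

```java
public class ReconnectLoopSketch {
    // Stand-in for org.apache.kafka.clients.consumer.CommitFailedException
    static class CommitFailedException extends RuntimeException { }

    static int runs = 0;

    // Simulates doRun(): the commit fails twice, then the consumer shuts down cleanly.
    static boolean doRun() {
        runs++;
        try {
            if (runs <= 2) {
                throw new CommitFailedException();
            }
            return false; // clean shutdown: do not reconnect
        } catch (CommitFailedException e) {
            // With the proposed fix, a failed commit requests a reconnect
            // instead of silently ending the consumer thread
            return true;
        }
    }

    public static void main(String[] args) {
        boolean reConnect = true;
        while (reConnect) {
            reConnect = doRun();
        }
        System.out.println("doRun() invocations: " + runs); // prints 3: two retries, then a clean exit
    }
}
```

Without the CommitFailedException branch, the first failure would fall through to the generic catch, doRun() would return false, and the consumer thread would stop for good — which is exactly the reported behaviour.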



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)