Posted to issues@camel.apache.org by "joseph m'bimbi-bene (Jira)" <ji...@apache.org> on 2020/05/04 15:15:00 UTC
[jira] [Comment Edited] (CAMEL-14980) camel-kafka - SerializationException - consumer keeps leaving and rejoining the group
[ https://issues.apache.org/jira/browse/CAMEL-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099026#comment-17099026 ]
joseph m'bimbi-bene edited comment on CAMEL-14980 at 5/4/20, 3:14 PM:
----------------------------------------------------------------------
Hi.
Indeed, the serialization error is caused by an Avro message that the consumer cannot deserialize, and it is the value (not the key) that fails.
Here are the logs from Camel:
{code:java}
2020-05-04 16:55:18.768 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:18.768 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:18.866 INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = http://localhost:9092 [...]
2020-05-04 16:55:18.925 WARN 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : The configuration 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known config.
2020-05-04 16:55:18.925 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-05-04 16:55:18.925 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-05-04 16:55:18.925 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588604118925
2020-05-04 16:55:18.927 INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
2020-05-04 16:55:23.928 INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 to topic cont_hist
2020-05-04 16:55:23.928 INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed to topic(s): cont_hist
2020-05-04 16:55:23.934 INFO 13972 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID: RN9Osn1cTTONG3zaT0LtOg
2020-05-04 16:55:23.936 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:23.937 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking previously assigned partitions []
2020-05-04 16:55:23.937 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:23.943 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:23.949 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with generation 3
2020-05-04 16:55:23.949 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned partitions: cont_hist-0
2020-05-04 16:55:23.954 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2020-05-04 16:55:23.978 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-2-54cba12d-1a38-4a5d-8b50-26f2a6b3868a sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:23.998 INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = http://localhost:9092 [...]
2020-05-04 16:55:24.014 WARN 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : The configuration 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known config.
2020-05-04 16:55:24.014 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-05-04 16:55:24.014 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-05-04 16:55:24.015 INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588604124014
2020-05-04 16:55:24.015 INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
2020-05-04 16:55:29.015 INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 to topic cont_hist
2020-05-04 16:55:29.016 INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed to topic(s): cont_hist
2020-05-04 16:55:29.032 INFO 13972 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID: RN9Osn1cTTONG3zaT0LtOg
2020-05-04 16:55:29.032 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:29.033 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking previously assigned partitions []
2020-05-04 16:55:29.034 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:29.039 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:29.048 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with generation 5
2020-05-04 16:55:29.048 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned partitions: cont_hist-0
2020-05-04 16:55:29.050 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2020-05-04 16:55:29.066 INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-3-3da59636-0a83-4abc-9293-a7e456d65574 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:29.090 INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true
{code}
There is no stack trace; it just keeps leaving and rejoining the group.
But here is the message I get when I place a breakpoint at line 409 of KafkaConsumer and evaluate `e.printStackTrace()` in the debugger:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition cont_hist-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class fr.ing.payment.cardeventnotifier.secure3d.generated.avro.PocContHist2 specified in writer's schema whilst finding reader's schema for a SpecificRecord.
To reproduce the error, I deliberately changed the name of the producer's Avro schema.
Another case: when I send garbage through the console-producer, here is what I get when I evaluate `e.printStackTrace()` at the breakpoint:
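The first line of the trace carries the partition and offset of the record that cannot be deserialized, and the Kafka message itself suggests seeking past it. Here is a minimal, JDK-only sketch of extracting that location, assuming the message keeps the wording shown above (it is not a stable contract). `PoisonRecordLocator` and `Location` are hypothetical names, not part of Camel or Kafka:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reads topic, partition and offset out of the exception message. */
final class PoisonRecordLocator {

    /** Plain value holder for the parsed location of the bad record. */
    static final class Location {
        final String topic;
        final int partition;
        final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        /** Offset to resume from so that the bad record is skipped. */
        long nextOffset() {
            return offset + 1;
        }
    }

    // Matches e.g. "... for partition cont_hist-0 at offset 4. ..."
    // The partition number is the digits after the last '-' of the topic-partition name.
    private static final Pattern LOCATION =
            Pattern.compile("for partition (\\S+)-(\\d+) at offset (\\d+)");

    /** Returns the location if the message has the expected wording, otherwise empty. */
    static Optional<Location> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Location(
                m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3))));
    }
}
```

With a real consumer, the parsed values could then be passed to `KafkaConsumer.seek(new TopicPartition(topic, partition), nextOffset)` so that polling resumes after the bad record.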
{code:java}
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition cont_hist-0 at offset 5. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!{code}
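For context on "Unknown magic byte!": assuming the Confluent Schema Registry Avro deserializer is the one in use (the messages above suggest so), it expects each payload to start with a zero magic byte followed by a 4-byte big-endian schema id, which garbage from the console-producer does not have. Here is a rough, JDK-only sketch of that check. `WireFormatCheck` is a hypothetical helper, not the deserializer's actual code:

```java
import java.nio.ByteBuffer;

/** Hypothetical helper that inspects the header of a schema-registry-framed payload. */
final class WireFormatCheck {

    static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4 bytes of schema id precede the Avro-encoded data.
    static final int HEADER_SIZE = 5;

    /** Returns the schema id, or -1 if the payload lacks the expected header. */
    static int schemaIdOrMinusOne(byte[] payload) {
        if (payload == null || payload.length < HEADER_SIZE || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer reads big-endian by default; start right after the magic byte.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }
}
```

A plain-text record such as `hello` starts with `0x68`, so the check fails before any schema id is read, which is consistent with the `id -1` in the trace.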
In any case, the consumer leaves and rejoins the group indefinitely without any informative message.
Is that enough, or do you need any other information?
> camel-kafka - SerializationException - consumer keeps leaving and rejoining the group
> -------------------------------------------------------------------------------------
>
> Key: CAMEL-14980
> URL: https://issues.apache.org/jira/browse/CAMEL-14980
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.2.0
> Reporter: joseph m'bimbi-bene
> Assignee: Ramu
> Priority: Major
> Fix For: 3.3.0
>
>
> Hello everyone,
>
> I found out a few days ago that if a `SerializationException` is thrown when the consumer polls for messages, the consumer keeps leaving and rejoining the consumer group indefinitely, without any informative log.
> The exception also cannot be handled by any Camel exception handler.
> After some searching in the code I found the culprit:
> ``` java
> // org.apache.camel.component.kafka.KafkaConsumer (lines 406-415):
> catch (KafkaException e) {
>     // some kind of error in kafka, it may happen during
>     // unsubscribing or during normal processing
>     if (unsubscribing) {
>         getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
>     } else {
>         LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
>         reConnect = true;
>     }
> }
> ```
> `SerializationException` extends `KafkaException`, but it is definitely not a recoverable exception: reconnecting fetches the same record again and fails the same way.
> It is logged at debug level, which makes it hard to track down, since so many other things log at debug.
> It cannot be handled by any Camel exception handling mechanism.
> I think it would be better to either:
> - narrow that catch so that it names only the subclasses of `KafkaException` that can actually be recovered from by rejoining (maybe `WakeupException` and a couple of others)
> - add a `catch` block before it for `SerializationException`, and maybe `ConfigException` and `OAuthBearerConfigException`, that logs at error level and allows the user to handle those exceptions
> - remove that catch block entirely and let users handle any `KafkaException` however they see fit.
> Thank you
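The second option in the quoted description could look roughly like this. It is only a sketch: the nested exception classes are stand-ins that mirror the names and hierarchy of the Kafka ones so that the example compiles without kafka-clients, and `PollErrorPolicy`, `Action` and `handle` are hypothetical names, not Camel API:

```java
/** Hypothetical sketch of catching the non-recoverable subclass before the generic one. */
final class PollErrorPolicy {

    // Stand-ins for org.apache.kafka.common.KafkaException and
    // org.apache.kafka.common.errors.SerializationException (same hierarchy).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) {
            super(message);
        }
    }

    enum Action { CONTINUE, RECONNECT, REPORT_TO_HANDLER }

    /** Runs one poll iteration and decides what the consumer loop should do next. */
    static Action handle(Runnable pollOnce) {
        try {
            pollOnce.run();
            return Action.CONTINUE;
        } catch (SerializationException e) {
            // The more specific catch must come first, otherwise the
            // KafkaException block below would swallow it and reconnect forever.
            System.err.println("ERROR cannot deserialize record: " + e.getMessage());
            return Action.REPORT_TO_HANDLER;
        } catch (KafkaException e) {
            // Other Kafka errors keep the existing behaviour: reconnect on the next run.
            return Action.RECONNECT;
        }
    }
}
```

Returning an action rather than setting a flag is just a choice that keeps the sketch easy to test. In the consumer itself, the `REPORT_TO_HANDLER` branch is where the exception would be passed to the exception handler instead of setting `reConnect`.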