You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Ramu (Jira)" <ji...@apache.org> on 2020/06/29 03:16:00 UTC

[jira] [Commented] (CAMEL-14980) camel-kafka - SerializationException - consumer keeps leaving and rejoining the group

    [ https://issues.apache.org/jira/browse/CAMEL-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147518#comment-17147518 ] 

Ramu commented on CAMEL-14980:
------------------------------

we can get rid of these type of issues by implementing deadletterqueue for kafka .

we can implement a failure-strategy  similar to kafka-connect

failure-strategy  : Specify the failure strategy to apply when a message produced from a record is nacked. Values can be fail (default), ignore, or dead-letter-queue

Type: string

supports 3 strategies:

fail - fail the application, no more records will be processed. (default) The offset of the record that has not been processed correctly is not committed.

ignore - the failure is logged, but the processing continue. The offset of the record that has not been processed correctly is committed.

dead-letter-queue - the offset of the record that has not been processed correctly is committed, but the record is written to a (Kafka) dead letter topic.

The strategy is selected using the failure-strategy attribute.

In the case of dead-letter-queue, you can configure the following attributes:

dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.

dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.

dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.

The record written on the dead letter queue contains the dead-letter-reason header with the nack reason (message from the exception passed to the nack method). It may also contain the dead-letter-cause with the message from the cause, if any.

your comments welcome

> camel-kafka - SerializationException - consumer keeps leaving and rejoining the group
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-14980
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14980
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.2.0
>            Reporter: joseph m'bimbi-bene
>            Assignee: Ramu
>            Priority: Major
>             Fix For: 3.5.0
>
>         Attachments: camel-kafka-errors.txt, poc_camel_kafka.tar.gz
>
>
> Hello everyone,
>  
> I found out i few days ago that if a `SerializationException` is thrown when the consumer tries to poll messages, it will keep leaving and joining the consumer-group indefinitely and without any informative log.
>  The exception cannot either be handled by any camel exception handler.
> After some searching in the code i found out the culprit:
> {code:java}
> // org.apache.camel.component.kafka.KafkaConsumer (ligns 406-415):
> catch (KafkaException e) {
>   // some kind of error in kafka, it may happen during
>   // unsubscribing or during normal processing
>   if (unsubscribing){             
>     getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);   
>   }else {
>     LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
>     reConnect = true;
>   }
> }
> {code}
>  
> `SerializationException` extends from `KafkaException`, but it is definitely not a recoverable exception.
> It logs with debug level, which makes it hard to track, there are SO many things logging in debug.
> It it cannot be handled by any camel exception handling mechanism.
> I think it would be better to either:
>  - change that catch so that it pinpoints the subclasses of `KafkaException` that are actually recoverable from rejoining (maybe `WakeupException` and a couple others)
>  - add a `catch` block for `SerializationException` and maybe `ConfigException` and `OAuthBearerConfigException` before, with a log error andallow the user to handle those exceptions
>  - remove that catch block entirely and let users handle any KafkaException however they see fit.
> Thank you



--
This message was sent by Atlassian Jira
(v8.3.4#803005)