Posted to users@camel.apache.org by Joseph M'BIMBI-BENE <jo...@gmail.com> on 2020/04/26 18:45:07 UTC
Kafka component error handling - consumer keeps leaving and rejoining
the group
Hello everyone,
I'm having a problem with the Kafka component:
When the Kafka consumer can't read a message (which, after investigation,
turned out to be caused by Avro errors), it continuously leaves the group
and joins again.
I would like it to just throw an exception and let me decide how to handle
it: DLQ, ignore, etc.
I configured the `bridgeErrorHandler` parameter, but to no avail. The
behaviour is still the same.
Am I doing something wrong? Please help. Thank you.
----------
Here is the route definition :
@Component
public class CamelConfiguration extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        LocalDateTime now = LocalDateTime.now();
        String kafkaCamelUri = String.format("kafka:cont_hist" +
                "?brokers={{bootstrap-servers}}" +
                "&schemaRegistryURL=http://localhost:8081" +
                "&specificAvroReader=true" +
                "&bridgeErrorHandler=true" +
                "&keyDeserializer=%s" +
                "&valueDeserializer=%s",
                StringDeserializer.class.getName(),
                KafkaAvroDeserializer.class.getName());
        from(kafkaCamelUri)
                .errorHandler(defaultErrorHandler().disableRedelivery())
                .to("log:coucou")
                .to("sql-stored:classpath:procstoc.sql" +
                        "?outputHeader=outError"
                )
                .to("log:output")
                .log("coucou ${headers.outError}");
    }
}
-----------
And here are some log excerpts :
2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925013193
2020-04-26 20:16:53.194 INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:16:58.208 INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:16:58.209 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.210 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:16:58.211 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.221 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.229 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 19
2020-04-26 20:16:58.232 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:16:58.236 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:16:58.251 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.274 INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [http://localhost:9092]
[...]
2020-04-26 20:16:58.293 WARN 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : The configuration
'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
config.
2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925018294
2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.312 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.319 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 21
2020-04-26 20:17:03.320 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:17:03.324 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:17:03.347 INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.400 INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
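One way to read the excerpt above: both cycles resume from the same committed offset (4) and send a LeaveGroup request right afterwards, which would be consistent with the same unreadable record being fetched on every attempt. The toy simulation below sketches that loop under this assumption. It does not use Kafka at all; the class, the method names and the "bad offset" are invented purely for illustration.

```java
// Hypothetical toy model of a consumer that reconnects after every read failure.
class PoisonRecordLoopSketch {

    // Offset of the record that cannot be deserialized in this toy model.
    static final int BAD_OFFSET = 4;

    // Simulates reading one record; fails for the bad offset.
    static String read(int offset) {
        if (offset == BAD_OFFSET) {
            throw new IllegalStateException("cannot deserialize record at offset " + offset);
        }
        return "record-" + offset;
    }

    // Tries up to maxAttempts times, always resuming from the committed offset.
    // Returns the committed offset after the last attempt.
    static int consumeWithReconnect(int committedOffset, int lastOffset, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                for (int offset = committedOffset; offset <= lastOffset; offset++) {
                    read(offset);
                    committedOffset = offset + 1; // commit only after a successful read
                }
                return committedOffset; // caught up with the topic
            } catch (IllegalStateException e) {
                // "Reconnect": nothing was committed for the failing record,
                // so the next attempt starts at the same offset again.
            }
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        // Three attempts starting at offset 4 never get past offset 4.
        System.out.println(consumeWithReconnect(4, 6, 3)); // prints 4
    }
}
```

In this model, retrying alone never makes progress; something has to either skip the record or hand the failure to the application.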
Re: Kafka component error handling - consumer keeps leaving and
rejoining the group
Posted by Claus Ibsen <cl...@gmail.com>.
Hi
Thanks for all your findings, this is great insight. You are surely
welcome to create a JIRA ticket about this bug.
We can then work on a fix together, and you can help test it.
Yeah, it seems catching KafkaException may be too wide. And let's see
if we can also incorporate the bridge error handler.
On Mon, Apr 27, 2020 at 10:06 AM Joseph M'BIMBI-BENE
<jo...@gmail.com> wrote:
>
> I also realize that the property "bridgeErrorHandler" seems never to be
> used, whereas another one, such as "breakOnFirstError", is.
>
> Also, going back to the exception handling, at least a couple of other
> subclasses of KafkaException should not be retried either. Just a few
> examples:
>
> - ConfigException: "Thrown if the user supplies an invalid
> configuration" -> a retry will not solve that
> - OAuthBearerConfigException: "Exception thrown when there is a problem
> with the configuration (an invalid option in a JAAS config, for example)":
> this one seems to fall under the same category
> - and obviously SerializationException
--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
Re: Kafka component error handling - consumer keeps leaving and
rejoining the group
Posted by Joseph M'BIMBI-BENE <jo...@gmail.com>.
I also realize that the property "bridgeErrorHandler" seems never to be
used, whereas another one, such as "breakOnFirstError", is.
Also, going back to the exception handling, at least a couple of other
subclasses of KafkaException should not be retried either. Just a few
examples:
- ConfigException: "Thrown if the user supplies an invalid
configuration" -> a retry will not solve that
- OAuthBearerConfigException: "Exception thrown when there is a problem
with the configuration (an invalid option in a JAAS config, for example)":
this one seems to fall under the same category
- and obviously SerializationException
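The list above could be turned into a small classification helper, sketched here in plain Java. The exception classes are local stand-ins that only mirror the names of the Kafka classes mentioned above, so that the snippet compiles without the Kafka client on the classpath; `RetryPolicy` and `shouldReconnect` are hypothetical names, not part of Camel or Kafka.

```java
// Hypothetical helper, not part of Camel or Kafka.
class RetryPolicy {

    // Local stand-ins mirroring the names of the Kafka exception classes
    // discussed above; they are NOT the real org.apache.kafka classes.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }
    static class ConfigException extends KafkaException {
        ConfigException(String message) { super(message); }
    }
    static class OAuthBearerConfigException extends KafkaException {
        OAuthBearerConfigException(String message) { super(message); }
    }

    // Exception types for which re-subscribing cannot help.
    private static final Class<?>[] NON_RETRIABLE = {
        SerializationException.class,
        ConfigException.class,
        OAuthBearerConfigException.class
    };

    // Returns true when a reconnect is worth attempting for this exception.
    static boolean shouldReconnect(KafkaException e) {
        for (Class<?> type : NON_RETRIABLE) {
            if (type.isInstance(e)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldReconnect(new KafkaException("broker unreachable")));     // true
        System.out.println(shouldReconnect(new SerializationException("bad Avro payload"))); // false
    }
}
```

Whether a deny-list like this or an allow-list of retriable exceptions is the better fit is a design choice for the actual fix; the sketch only illustrates the idea.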
On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <jo...@gmail.com>
wrote:
> Digging into the code (version 3.2.0, I repeat),
>
> I can see this in the method
> `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
> line 406:
>
> ```
> catch (KafkaException e) {
>     // some kind of error in kafka, it may happen during
>     // unsubscribing or during normal processing
>     if (unsubscribing) {
>         getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
>     } else {
>         LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
>         reConnect = true;
>     }
> }
> ```
>
> A `SerializationException` occurs, which extends KafkaException.
> It is definitely not normal processing, and logging at debug level hides
> the true cause.
> I guess one would have to narrow down the exception classes caught in
> that catch clause or, as a quick fix, explicitly catch
> SerializationException.
>
> How should I proceed?
> I am not very familiar with Camel, or with open source contributions in
> general ^^.
>
> Do I just open a ticket in some bug tracker, GitHub maybe?
> Do you want me to open a pull request?
>
> I have been toying with Camel for a couple of weeks now, and I would like
> to introduce it in the projects I work on. But I am by no means a Camel guru.
>
> This bug might be a showstopper, so I would like to help fix it.
>
> Thank you
Re: Kafka component error handling - consumer keeps leaving and
rejoining the group
Posted by Joseph M'BIMBI-BENE <jo...@gmail.com>.
Digging into the code (version 3.2.0, I repeat),
I can see this in the method
`org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
line 406:
```
catch (KafkaException e) {
    // some kind of error in kafka, it may happen during
    // unsubscribing or during normal processing
    if (unsubscribing) {
        getExceptionHandler().handleException("Error unsubscribing " + threadId
            + " from kafka topic " + topicName, e);
    } else {
        LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
            threadId, topicName, e.getMessage());
        reConnect = true;
    }
}
```
A `SerializationException` is thrown, and it extends `KafkaException`, so it
ends up in this catch clause. This is definitely not normal processing, and
logging it at debug level hides the true cause.
I guess one would have to narrow down the exception classes caught in that
catch clause or, as a quick fix, explicitly catch `SerializationException`
before the more general `KafkaException`.
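To make the quick fix concrete, here is a minimal, self-contained sketch of the catch ordering I have in mind. It does not use the real Camel or Kafka classes: `KafkaException` and `SerializationException` below are stand-ins that only mirror the real inheritance relationship, and `pollOnce`, `Outcome` and the `handled` list are hypothetical names used purely for illustration. It is not the actual Camel code, only the idea.

```java
import java.util.ArrayList;
import java.util.List;

public class CatchOrderSketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: only the hierarchy matters here).
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.SerializationException, a subclass of KafkaException.
    public static class SerializationException extends KafkaException {
        public SerializationException(String message) {
            super(message);
        }
    }

    // Hypothetical result type describing what the consumer loop decides to do.
    public enum Outcome { OK, HANDED_TO_ERROR_HANDLER, RECONNECT }

    // Runs one simulated poll. The more specific exception is caught first,
    // so a deserialization failure no longer falls into the reconnect branch.
    public static Outcome pollOnce(Runnable poll, List<Throwable> handled) {
        try {
            poll.run();
            return Outcome.OK;
        } catch (SerializationException e) {
            // The record cannot be read: surface the error instead of reconnecting.
            handled.add(e);
            return Outcome.HANDED_TO_ERROR_HANDLER;
        } catch (KafkaException e) {
            // Any other Kafka error keeps the existing behaviour: reconnect on next run.
            return Outcome.RECONNECT;
        }
    }

    public static void main(String[] args) {
        List<Throwable> handled = new ArrayList<>();
        System.out.println(pollOnce(() -> { throw new SerializationException("bad avro payload"); }, handled));
        System.out.println(pollOnce(() -> { throw new KafkaException("broker unavailable"); }, handled));
        System.out.println(pollOnce(() -> { }, handled));
    }
}
```

In the real consumer, the first branch would presumably hand the exception to `getExceptionHandler()` so that `bridgeErrorHandler` can route it, but that part is an assumption on my side.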
How should I proceed?
I am not very familiar with Camel, or with open source contributions in
general ^^.
Do I just open a ticket in some bug tracker, GitHub maybe?
Do you want me to open a pull request?
I have been toying with Camel for a couple of weeks now, and I would like to
introduce it in the projects I work on. But I am by no means a Camel guru,
and this bug might be a showstopper, so I would like to help fix it.
Thank you
On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <jo...@gmail.com>
wrote:
> I forgot to tell i am using version 3.2.0
Re: Kafka component error handling - consumer keeps leaving and
rejoining the group
Posted by Joseph M'BIMBI-BENE <jo...@gmail.com>.
I forgot to mention that I am using version 3.2.0.