You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/03 11:24:31 UTC
[camel] branch master updated: CAMEL-12110: camel-kafka consumer
swallows exception if error creating KafkaConsumer.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 46c57a5e CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
46c57a5e is described below
commit 46c57a5e672984df028f55ec3f16e25916df80ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100
CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
---
.../camel/component/kafka/KafkaConsumer.java | 34 +++++++++++++++++-----
1 file changed, 26 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 769a5b8..72310e5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -111,6 +111,8 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
+ // pre-initialize task during startup so if there is any error we have it thrown asap
+ task.preInit();
executor.submit(task);
tasks.add(task);
}
@@ -158,15 +160,14 @@ public class KafkaConsumer extends DefaultConsumer {
boolean reConnect = true;
while (reConnect) {
-
- // create consumer
- ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
try {
- // Kafka uses reflection for loading authentication settings, use its classloader
- Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
- this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
- } finally {
- Thread.currentThread().setContextClassLoader(threadClassLoader);
+ if (!first) {
+ // re-initialize on re-connect so we have a fresh consumer
+ doInit();
+ }
+ } catch (Throwable e) {
+ // ensure this is logged so users can see the problem
+ log.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
}
if (!first) {
@@ -187,6 +188,23 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
+ void preInit() {
+ doInit();
+ }
+
+ protected void doInit() {
+ // create consumer
+ ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ // Kafka uses reflection for loading authentication settings, use its classloader
+ Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+ // this may throw an exception if something is wrong with kafka consumer
+ this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+ } finally {
+ Thread.currentThread().setContextClassLoader(threadClassLoader);
+ }
+ }
+
@SuppressWarnings("unchecked")
protected boolean doRun() {
// allow to re-connect thread in case we use that to retry failed messages
--
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].