You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/03 11:24:31 UTC

[camel] branch master updated: CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 46c57a5e CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
46c57a5e is described below

commit 46c57a5e672984df028f55ec3f16e25916df80ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100

    CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
---
 .../camel/component/kafka/KafkaConsumer.java       | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 769a5b8..72310e5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -111,6 +111,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
+            // pre-initialize task during startup so if there is any error we have it thrown asap
+            task.preInit();
             executor.submit(task);
             tasks.add(task);
         }
@@ -158,15 +160,14 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean reConnect = true;
 
             while (reConnect) {
-
-                // create consumer
-                ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
                 try {
-                    // Kafka uses reflection for loading authentication settings, use its classloader
-                    Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                    this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(threadClassLoader);
+                    if (!first) {
+                        // re-initialize on re-connect so we have a fresh consumer
+                        doInit();
+                    }
+                } catch (Throwable e) {
+                    // ensure this is logged so users can see the problem
+                    log.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
                 }
 
                 if (!first) {
@@ -187,6 +188,23 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        void preInit() {
+            doInit();
+        }
+
+        protected void doInit() {
+            // create consumer
+            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                // Kafka uses reflection for loading authentication settings, use its classloader
+                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+                // this may throw an exception if something is wrong with kafka consumer
+                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            } finally {
+                Thread.currentThread().setContextClassLoader(threadClassLoader);
+            }
+        }
+
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed messages

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].