You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2021/04/16 09:54:42 UTC

[camel] 01/02: CAMEL-16500: Fix autowire of KafkaClientFactory

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5ae05407c02dbcf06db87585672187c3e4c05bdb
Author: James Netherton <ja...@gmail.com>
AuthorDate: Fri Apr 16 08:26:07 2021 +0100

    CAMEL-16500: Fix autowire of KafkaClientFactory
---
 .../camel/component/kafka/KafkaComponent.java      |  2 +-
 .../camel/component/kafka/KafkaConsumer.java       |  4 +--
 .../camel/component/kafka/KafkaEndpoint.java       | 16 +++++++++
 .../camel/component/kafka/KafkaProducer.java       |  4 +--
 .../camel/component/kafka/KafkaAutowireTest.java   | 42 ++++++++++++++++++++++
 .../camel/component/kafka/KafkaConsumerTest.java   |  1 +
 .../camel/component/kafka/KafkaProducerTest.java   |  5 +++
 7 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c39200b..bfa944f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -36,7 +36,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     @Metadata(label = "consumer,advanced")
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
     @Metadata(autowired = true, label = "advanced")
-    private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
+    private KafkaClientFactory kafkaClientFactory;
     @Metadata(autowired = true, label = "consumer,advanced")
     private PollExceptionStrategy pollExceptionStrategy;
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 9a97dde..3336f19 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -95,7 +95,7 @@ public class KafkaConsumer extends DefaultConsumer {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
         if (brokers != null) {
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
@@ -255,7 +255,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                 // this may throw an exception if something is wrong with kafka
                 // consumer
-                this.consumer = endpoint.getComponent().getKafkaClientFactory().getConsumer(kafkaProps);
+                this.consumer = endpoint.getKafkaClientFactory().getConsumer(kafkaProps);
             } finally {
                 Thread.currentThread().setContextClassLoader(threadClassLoader);
             }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index c2466f7..eb20f41 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -50,6 +50,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     private static final String CALLBACK_HANDLER_CLASS_CONFIG = "sasl.login.callback.handler.class";
 
+    private KafkaClientFactory kafkaClientFactory;
+
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
 
@@ -73,6 +75,20 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         this.configuration = configuration;
     }
 
+    public KafkaClientFactory getKafkaClientFactory() {
+        return this.kafkaClientFactory;
+    }
+
+    @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+
+        kafkaClientFactory = getComponent().getKafkaClientFactory();
+        if (kafkaClientFactory == null) {
+            kafkaClientFactory = new DefaultKafkaClientFactory();
+        }
+    }
+
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         KafkaConsumer consumer = new KafkaConsumer(this, processor);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index f720f2b..7b73036 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,7 +70,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
         if (brokers != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
@@ -111,7 +111,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 Thread.currentThread()
                         .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
                 LOG.trace("Creating KafkaProducer");
-                kafkaProducer = endpoint.getComponent().getKafkaClientFactory().getProducer(props);
+                kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
                 closeKafkaProducer = true;
             } finally {
                 Thread.currentThread().setContextClassLoader(threadClassLoader);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java
new file mode 100644
index 0000000..81bf1fb
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class KafkaAutowireTest extends CamelTestSupport {
+
+    @BindToRegistry
+    KafkaClientFactory clientFactory = new TestKafkaClientFactory();
+
+    @Test
+    public void testKafkaComponentAutowiring() {
+        KafkaComponent component = context.getComponent("kafka", KafkaComponent.class);
+        assertSame(clientFactory, component.getKafkaClientFactory());
+
+        KafkaEndpoint endpoint = context.getEndpoint("kafka:foo", KafkaEndpoint.class);
+        assertSame(clientFactory, endpoint.getKafkaClientFactory());
+    }
+
+    static final class TestKafkaClientFactory extends DefaultKafkaClientFactory {
+
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 844bfe8..d523240 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -45,6 +45,7 @@ public class KafkaConsumerTest {
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+        when(endpoint.getKafkaClientFactory()).thenReturn(clientFactory);
         when(component.getKafkaClientFactory()).thenReturn(clientFactory);
         when(clientFactory.getBrokers(any())).thenThrow(new IllegalArgumentException());
         final KafkaConsumer kafkaConsumer = new KafkaConsumer(endpoint, processor);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 765c561..cdd7284 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -79,9 +79,14 @@ public class KafkaProducerTest {
         kafka.init();
 
         endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
+        endpoint.doBuild();
+        assertTrue(endpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory);
+
         producer = new KafkaProducer(endpoint);
 
         fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", new HashMap());
+        fromEndpoint.doBuild();
+        assertTrue(fromEndpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory);
 
         RecordMetadata rm = new RecordMetadata(null, 0, 0, 0, 0L, 0, 0);
         Future future = Mockito.mock(Future.class);