You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2021/04/16 09:54:42 UTC
[camel] 01/02: CAMEL-16500: Fix autowire of KafkaClientFactory
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5ae05407c02dbcf06db87585672187c3e4c05bdb
Author: James Netherton <ja...@gmail.com>
AuthorDate: Fri Apr 16 08:26:07 2021 +0100
CAMEL-16500: Fix autowire of KafkaClientFactory
---
.../camel/component/kafka/KafkaComponent.java | 2 +-
.../camel/component/kafka/KafkaConsumer.java | 4 +--
.../camel/component/kafka/KafkaEndpoint.java | 16 +++++++++
.../camel/component/kafka/KafkaProducer.java | 4 +--
.../camel/component/kafka/KafkaAutowireTest.java | 42 ++++++++++++++++++++++
.../camel/component/kafka/KafkaConsumerTest.java | 1 +
.../camel/component/kafka/KafkaProducerTest.java | 5 +++
7 files changed, 69 insertions(+), 5 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c39200b..bfa944f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -36,7 +36,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
@Metadata(label = "consumer,advanced")
private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
@Metadata(autowired = true, label = "advanced")
- private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
+ private KafkaClientFactory kafkaClientFactory;
@Metadata(autowired = true, label = "consumer,advanced")
private PollExceptionStrategy pollExceptionStrategy;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 9a97dde..3336f19 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -95,7 +95,7 @@ public class KafkaConsumer extends DefaultConsumer {
Properties props = endpoint.getConfiguration().createConsumerProperties();
endpoint.updateClassProperties(props);
- String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+ String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
if (brokers != null) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
}
@@ -255,7 +255,7 @@ public class KafkaConsumer extends DefaultConsumer {
.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
// this may throw an exception if something is wrong with kafka
// consumer
- this.consumer = endpoint.getComponent().getKafkaClientFactory().getConsumer(kafkaProps);
+ this.consumer = endpoint.getKafkaClientFactory().getConsumer(kafkaProps);
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index c2466f7..eb20f41 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -50,6 +50,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
private static final String CALLBACK_HANDLER_CLASS_CONFIG = "sasl.login.callback.handler.class";
+ private KafkaClientFactory kafkaClientFactory;
+
@UriParam
private KafkaConfiguration configuration = new KafkaConfiguration();
@@ -73,6 +75,20 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
this.configuration = configuration;
}
+ public KafkaClientFactory getKafkaClientFactory() {
+ return this.kafkaClientFactory;
+ }
+
+ @Override
+ protected void doBuild() throws Exception {
+ super.doBuild();
+
+ kafkaClientFactory = getComponent().getKafkaClientFactory();
+ if (kafkaClientFactory == null) {
+ kafkaClientFactory = new DefaultKafkaClientFactory();
+ }
+ }
+
@Override
public Consumer createConsumer(Processor processor) throws Exception {
KafkaConsumer consumer = new KafkaConsumer(this, processor);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index f720f2b..7b73036 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,7 +70,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
Properties props = endpoint.getConfiguration().createProducerProperties();
endpoint.updateClassProperties(props);
- String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+ String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
if (brokers != null) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
}
@@ -111,7 +111,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
Thread.currentThread()
.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
LOG.trace("Creating KafkaProducer");
- kafkaProducer = endpoint.getComponent().getKafkaClientFactory().getProducer(props);
+ kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
closeKafkaProducer = true;
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java
new file mode 100644
index 0000000..81bf1fb
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaAutowireTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class KafkaAutowireTest extends CamelTestSupport {
+
+ @BindToRegistry
+ KafkaClientFactory clientFactory = new TestKafkaClientFactory();
+
+ @Test
+ public void testKafkaComponentAutowiring() {
+ KafkaComponent component = context.getComponent("kafka", KafkaComponent.class);
+ assertSame(clientFactory, component.getKafkaClientFactory());
+
+ KafkaEndpoint endpoint = context.getEndpoint("kafka:foo", KafkaEndpoint.class);
+ assertSame(clientFactory, endpoint.getKafkaClientFactory());
+ }
+
+ static final class TestKafkaClientFactory extends DefaultKafkaClientFactory {
+
+ }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 844bfe8..d523240 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -45,6 +45,7 @@ public class KafkaConsumerTest {
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(configuration);
when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+ when(endpoint.getKafkaClientFactory()).thenReturn(clientFactory);
when(component.getKafkaClientFactory()).thenReturn(clientFactory);
when(clientFactory.getBrokers(any())).thenThrow(new IllegalArgumentException());
final KafkaConsumer kafkaConsumer = new KafkaConsumer(endpoint, processor);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 765c561..cdd7284 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -79,9 +79,14 @@ public class KafkaProducerTest {
kafka.init();
endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
+ endpoint.doBuild();
+ assertTrue(endpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory);
+
producer = new KafkaProducer(endpoint);
fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", new HashMap());
+ fromEndpoint.doBuild();
+ assertTrue(fromEndpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory);
RecordMetadata rm = new RecordMetadata(null, 0, 0, 0, 0L, 0, 0);
Future future = Mockito.mock(Future.class);