You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/07 12:30:30 UTC
[camel] branch master updated: Replaces the kafka container in
camel-kafka test (#4380)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 36487d0 Replaces the kafka container in camel-kafka test (#4380)
36487d0 is described below
commit 36487d0fa4ce353a9c63dd7c5faa250e2d975d20
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Oct 7 14:30:15 2020 +0200
Replaces the kafka container in camel-kafka test (#4380)
Switches to the infra provided by camel-test-infra-kafka instead of
directly using the KafkaContainer.
This removes the need of camel-testcontainers-junit5 and allows
targetting remote kafka instances as part of the test execution.
---
components/camel-kafka/pom.xml | 21 ++++++++---
.../component/kafka/BaseEmbeddedKafkaTest.java | 42 +++++++++++-----------
test-infra/camel-test-infra-kafka/pom.xml | 6 +++-
.../infra/services/kafka/KafkaServiceFactory.java | 8 ++---
4 files changed, 47 insertions(+), 30 deletions(-)
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 7af053b..f2b5f31 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -51,15 +51,27 @@
<!-- test -->
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-testcontainers-junit5</artifactId>
+ <artifactId>camel-test-infra-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-kafka</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Required by the admin client-->
<dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers-version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
@@ -92,7 +104,6 @@
<version>${commons-lang3-version}</version>
<scope>test</scope>
</dependency>
-
</dependencies>
<profiles>
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 7aeb29d..e859cab 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -19,43 +19,44 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
import org.apache.camel.CamelContext;
+import org.apache.camel.test.infra.services.kafka.KafkaService;
+import org.apache.camel.test.infra.services.kafka.KafkaServiceFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
public abstract class BaseEmbeddedKafkaTest extends CamelTestSupport {
- protected static AdminClient kafkaAdminClient;
+ @RegisterExtension
+ public static KafkaService service = KafkaServiceFactory.createService();
- private static final String CONFLUENT_PLATFORM_VERSION = "6.0.0";
+ protected static AdminClient kafkaAdminClient;
private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
- protected static KafkaContainer kafkaBroker = new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
- .withEmbeddedZookeeper()
- .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
- .waitingFor(Wait.forListeningPort());
-
- static {
- kafkaBroker.start();
- kafkaAdminClient = createAdminClient();
- }
-
@BeforeAll
public static void beforeClass() {
- LOG.info("### Embedded Kafka cluster broker list: " + kafkaBroker.getBootstrapServers());
+ LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
+ System.setProperty("bootstrapServers", service.getBootstrapServers());
+ }
+
+ @BeforeEach
+ public void setKafkaAdminClient() {
+ if (kafkaAdminClient == null) {
+ kafkaAdminClient = createAdminClient();
+ }
}
protected Properties getDefaultProperties() {
- LOG.info("Connecting to Kafka {}", kafkaBroker.getBootstrapServers());
+ LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBootstrapServers());
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
@@ -70,19 +71,20 @@ public abstract class BaseEmbeddedKafkaTest extends CamelTestSupport {
KafkaComponent kafka = new KafkaComponent(context);
kafka.init();
- kafka.getConfiguration().setBrokers(kafkaBroker.getBootstrapServers());
+ kafka.getConfiguration().setBrokers(service.getBootstrapServers());
context.addComponent("kafka", kafka);
return context;
}
+ @Deprecated
protected static String getBootstrapServers() {
- return kafkaBroker.getBootstrapServers();
+ return service.getBootstrapServers();
}
private static AdminClient createAdminClient() {
final Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBootstrapServers());
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
return KafkaAdminClient.create(properties);
}
diff --git a/test-infra/camel-test-infra-kafka/pom.xml b/test-infra/camel-test-infra-kafka/pom.xml
index 495d825..761a6be 100644
--- a/test-infra/camel-test-infra-kafka/pom.xml
+++ b/test-infra/camel-test-infra-kafka/pom.xml
@@ -43,8 +43,12 @@
<dependency>
<groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
- <scope>test</scope>
</dependency>
</dependencies>
diff --git a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/services/kafka/KafkaServiceFactory.java b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/services/kafka/KafkaServiceFactory.java
index 9f544de..c554184 100644
--- a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/services/kafka/KafkaServiceFactory.java
+++ b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/services/kafka/KafkaServiceFactory.java
@@ -30,12 +30,12 @@ public final class KafkaServiceFactory {
public static KafkaService createService() {
String kafkaInstanceType = System.getProperty("kafka.instance.type");
- if (kafkaInstanceType.equals("local-strimzi-container")) {
- return new StrimziService();
+ if (kafkaInstanceType == null || kafkaInstanceType.isEmpty() || kafkaInstanceType.equals("local-kafka-container")) {
+ return new ContainerLocalKafkaService();
}
- if (kafkaInstanceType.equals("local-kafka-container")) {
- return new ContainerLocalKafkaService();
+ if (kafkaInstanceType.equals("local-strimzi-container")) {
+ return new StrimziService();
}
if (kafkaInstanceType.equals("remote")) {