You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/23 07:59:23 UTC

[camel-kafka-connector] branch master updated: Switches the Kafka test infrastructure

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 644a52d  Switches the Kafka test infrastructure
644a52d is described below

commit 644a52d41977d66dfd5d874451de9b0f4cb51e4f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Sep 17 18:01:55 2020 +0200

    Switches the Kafka test infrastructure
    
    Replace the current Kafka test services with the reusable ones that were moved to Camel core
---
 .../kafkaconnector/common/AbstractKafkaTest.java   |   2 +-
 .../services/kafka/ContainerLocalKafkaService.java |  43 --------
 .../services/kafka/EmbeddedKafkaService.java       |   1 +
 .../common/services/kafka/KafkaService.java        |  68 -------------
 .../common/services/kafka/KafkaServiceFactory.java |  17 +---
 .../common/services/kafka/RemoteKafkaService.java  |  40 --------
 .../common/services/kafka/StrimziContainer.java    |  66 -------------
 .../common/services/kafka/StrimziService.java      | 109 ---------------------
 .../common/services/kafka/ZookeeperContainer.java  |  53 ----------
 .../kafkaconnect/KafkaConnectEmbedded.java         |   2 +-
 .../kafkaconnect/KafkaConnectRunnerFactory.java    |   2 +-
 .../kafkaconnect/KafkaConnectRunnerService.java    |   2 +-
 tests/itests-parent/pom.xml                        |   8 ++
 13 files changed, 15 insertions(+), 398 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index db9e2d2..e876b8d 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.camel.kafkaconnector.common;
 
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
 import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
 import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ContainerLocalKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ContainerLocalKafkaService.java
deleted file mode 100644
index 1924dc8..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ContainerLocalKafkaService.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-public class ContainerLocalKafkaService implements KafkaService {
-    private static final Logger LOG = LoggerFactory.getLogger(ContainerLocalKafkaService.class);
-    private KafkaContainer kafka = new KafkaContainer().withEmbeddedZookeeper();
-
-    public String getBootstrapServers() {
-        return kafka.getBootstrapServers();
-    }
-
-    @Override
-    public void initialize() {
-        kafka.start();
-
-        LOG.info("Kafka bootstrap server running at address {}", kafka.getBootstrapServers());
-    }
-
-    @Override
-    public void shutdown() {
-        kafka.stop();
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
index d22ed6c..caffbf1 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/EmbeddedKafkaService.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.PluginPathHelper;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.extension.ExtensionContext;
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaService.java
deleted file mode 100644
index e804118..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaService.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-/**
- * Provides an interface for any type of Kafka service: remote instances, local container, etc
- */
-public interface KafkaService extends BeforeAllCallback, BeforeTestExecutionCallback, AfterAllCallback, AfterTestExecutionCallback {
-
-
-    /**
-     * Gets the addresses of the bootstrap servers in the format host1:port,host2:port,etc
-     * @return
-     */
-    String getBootstrapServers();
-
-
-    /**
-     * Perform any initialization necessary
-     */
-    void initialize();
-
-    /**
-     * Shutdown the service
-     */
-    void shutdown();
-
-    @Override
-    default void beforeAll(ExtensionContext extensionContext) throws Exception {
-        initialize();
-    }
-
-    @Override
-    default void beforeTestExecution(ExtensionContext extensionContext) throws Exception {
-        //no op
-    }
-
-    @Override
-    default void afterAll(ExtensionContext extensionContext) throws Exception {
-        shutdown();
-    }
-
-    @Override
-    default void afterTestExecution(ExtensionContext context) throws Exception {
-        //no op
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
index 0b671a6..c1b51c4 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.kafkaconnector.common.services.kafka;
 
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,25 +31,11 @@ public final class KafkaServiceFactory {
     public static KafkaService createService() {
         String kafkaInstanceType = System.getProperty("kafka.instance.type");
 
-
         if (kafkaInstanceType == null || kafkaInstanceType.equals("embedded")) {
             return new EmbeddedKafkaService();
         }
 
-        if (kafkaInstanceType.equals("local-strimzi-container")) {
-            return new StrimziService();
-        }
-
-        if (kafkaInstanceType.equals("local-kafka-container")) {
-            return new ContainerLocalKafkaService();
-        }
-
-        if (kafkaInstanceType.equals("remote")) {
-            return new RemoteKafkaService();
-        }
-
-        LOG.error("Kafka instance must be one of 'local-strimzi-container', 'local-kafka-container', 'embedded' or 'remote");
-        throw new UnsupportedOperationException("Invalid Kafka instance type");
+        return org.apache.camel.test.infra.kafka.services.KafkaServiceFactory.createService();
     }
 
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/RemoteKafkaService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/RemoteKafkaService.java
deleted file mode 100644
index be7460a..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/RemoteKafkaService.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteKafkaService implements KafkaService {
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteKafkaService.class);
-
-    @Override
-    public String getBootstrapServers() {
-        return System.getProperty("kafka.bootstrap.servers");
-    }
-
-    @Override
-    public void initialize() {
-        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
-    }
-
-    @Override
-    public void shutdown() {
-        // NO-OP
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
deleted file mode 100644
index e877a7e..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziContainer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.wait.strategy.Wait;
-
-public class StrimziContainer extends GenericContainer<StrimziContainer> {
-    private static final String STRIMZI_CONTAINER = System.getProperty("itest.strimzi.container.image");
-    private static final int KAFKA_PORT = 9092;
-
-    public StrimziContainer(Network network, String name, String zookeeperInstanceName) {
-        super(STRIMZI_CONTAINER);
-
-        withEnv("LOG_DIR", "/tmp/logs");
-        withExposedPorts(KAFKA_PORT);
-        withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("PLAINTEXT://%s:9092", getContainerIpAddress()));
-        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092");
-        withEnv("KAFKA_ZOOKEEPER_CONNECT", zookeeperInstanceName + ":2181");
-        withNetwork(network);
-
-        withCreateContainerCmdModifier(
-            createContainerCmd -> {
-                createContainerCmd.withHostName(name);
-                createContainerCmd.withName(name);
-            }
-        );
-
-        withCommand("sh", "-c",
-                "bin/kafka-server-start.sh config/server.properties "
-                        + "--override listeners=${KAFKA_LISTENERS} "
-                        + "--override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
-                        + "--override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}");
-
-        waitingFor(Wait.forListeningPort());
-    }
-
-
-
-    public int getKafkaPort() {
-        return getMappedPort(KAFKA_PORT);
-    }
-
-
-    @Override
-    public void start() {
-        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
-        super.start();
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
deleted file mode 100644
index 3ee5f9c..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Network;
-
-public class StrimziService implements KafkaService {
-    private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class);
-    private static ZookeeperContainer zookeeperContainer;
-    private static StrimziContainer strimziContainer;
-    private static String zookeeperInstanceName;
-    private static String strimziInstanceName;
-
-    public StrimziService() {
-
-        if (zookeeperContainer == null && strimziContainer == null) {
-
-            Network network = Network.newNetwork();
-
-
-            if (zookeeperContainer == null) {
-                zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100);
-                zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName);
-            }
-
-            if (strimziContainer == null) {
-                strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100);
-                strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName);
-            }
-        }
-    }
-
-    private Integer getKafkaPort() {
-        return strimziContainer.getKafkaPort();
-    }
-
-    @Override
-    public String getBootstrapServers() {
-        return strimziContainer.getContainerIpAddress() + ":" + getKafkaPort();
-    }
-
-    @Override
-    public void initialize() {
-        if (!zookeeperContainer.isRunning()) {
-            /*
-             When running multiple tests at once, this throttles the startup to give
-             time for docker to fully shutdown previously running instances (which
-             happens asynchronously). This prevents problems with false positive errors
-             such as docker complaining of multiple containers with the same name or
-             trying to reuse port numbers too quickly.
-             */
-            throttle();
-            zookeeperContainer.start();
-        }
-
-        String zookeeperConnect = zookeeperInstanceName + ":" + zookeeperContainer.getZookeeperPort();
-        LOG.info("Apache Zookeeper running at address {}", zookeeperConnect);
-
-        if (!strimziContainer.isRunning()) {
-            strimziContainer.start();
-        }
-
-        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
-    }
-
-    private void throttle() {
-        try {
-            String throttleDelay = System.getProperty("itest.strimzi.throttle.delay", "10000");
-            Thread.sleep(Integer.parseInt(throttleDelay));
-        } catch (InterruptedException e) {
-            LOG.warn("Strimzi startup interrupted");
-        }
-    }
-
-    private boolean stopped() {
-        return !strimziContainer.isRunning() && !zookeeperContainer.isRunning();
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            LOG.info("Stopping Kafka container");
-            strimziContainer.stop();
-        } finally {
-            LOG.info("Stopping Zookeeper container");
-            zookeeperContainer.stop();
-
-            TestUtils.waitFor(this::stopped);
-        }
-    }
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
deleted file mode 100644
index 4307e38..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/ZookeeperContainer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.common.services.kafka;
-
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.wait.strategy.Wait;
-
-public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
-    private static final String ZOOKEEPER_CONTAINER = System.getProperty("itest.zookeeper.container.image");
-    private static final int ZOOKEEPER_PORT = 2181;
-
-    public ZookeeperContainer(Network network, String name) {
-        super(ZOOKEEPER_CONTAINER);
-
-        withEnv("LOG_DIR", "/tmp/logs");
-        withExposedPorts(ZOOKEEPER_PORT);
-        withNetwork(network);
-
-        withCreateContainerCmdModifier(
-            createContainerCmd -> {
-                createContainerCmd.withHostName(name);
-                createContainerCmd.withName(name);
-            }
-        );
-
-        withCommand("sh", "-c",
-                "bin/zookeeper-server-start.sh config/zookeeper.properties");
-
-        waitingFor(Wait.forListeningPort());
-    }
-
-
-    public int getZookeeperPort() {
-        return getMappedPort(ZOOKEEPER_PORT);
-    }
-
-}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index dadab5a..c53a568 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -22,7 +22,7 @@ import java.util.Map;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerFactory.java
index 5d7e974..34c436e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerFactory.java
@@ -18,7 +18,7 @@
 package org.apache.camel.kafkaconnector.common.services.kafkaconnect;
 
 import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
index cc83459..8fadc26 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml
index 18c310f..08ff4bd 100644
--- a/tests/itests-parent/pom.xml
+++ b/tests/itests-parent/pom.xml
@@ -58,6 +58,14 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-kafka</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
             <scope>test</scope>