You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/07 11:28:00 UTC

[camel] 02/02: (chores) camel-kafka: cleanup base test classes for easier reuse

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 91007e7544e075116d4c55ed076c4c5d15f31da5
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Dec 7 11:05:29 2022 +0100

    (chores) camel-kafka: cleanup base test classes for easier reuse
---
 ...tSupport.java => AbstractKafkaTestSupport.java} | 51 +++++-----------------
 .../BaseEmbeddedKafkaAuthTestSupport.java          | 44 ++-----------------
 .../integration/BaseEmbeddedKafkaTestSupport.java  | 43 ++----------------
 .../integration/KafkaConsumerHealthCheckIT.java    |  4 +-
 .../health/KafkaConsumerBadPortHealthCheckIT.java  |  6 +--
 ...fkaConsumerBadPortSupervisingHealthCheckIT.java |  6 +--
 .../KafkaConsumerUnresolvableHealthCheckIT.java    |  6 +--
 .../KafkaPausableConsumerCircuitBreakerIT.java     |  5 ++-
 .../integration/pause/KafkaPausableConsumerIT.java |  5 ++-
 9 files changed, 35 insertions(+), 135 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
similarity index 67%
copy from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
copy to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
index d0d76d5eb9d..f02c28e1b6d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka.integration;
 
 import java.util.Properties;
@@ -22,37 +23,26 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
-    // Note: you must use Strimzi on MacOS with Apple Silicon (and possibly other ARM-based architectures)
-    @RegisterExtension
-    public static KafkaService service = KafkaServiceFactory.createSingletonService();
-
-    protected static AdminClient kafkaAdminClient;
-
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
+public abstract class AbstractKafkaTestSupport extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTestSupport.class);
 
-    @BeforeAll
-    public static void beforeClass() {
-        LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
+    protected static void setServiceProperties(KafkaService service) {
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
         System.setProperty("bootstrapServers", service.getBootstrapServers());
     }
 
-    @BeforeEach
-    public void setKafkaAdminClient() {
-        if (kafkaAdminClient == null) {
-            kafkaAdminClient = createAdminClient();
-        }
+    public static AdminClient createAdminClient(KafkaService service) {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+
+        return KafkaAdminClient.create(properties);
     }
 
     public static Properties getDefaultProperties(KafkaService service) {
@@ -67,12 +57,7 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
         return props;
     }
 
-    protected Properties getDefaultProperties() {
-        return getDefaultProperties(service);
-    }
-
-    @Override
-    protected CamelContext createCamelContext() throws Exception {
+    protected CamelContext createCamelContextFromService(KafkaService service) throws Exception {
         CamelContext context = super.createCamelContext();
         context.getPropertiesComponent().setLocation("ref:prop");
 
@@ -84,18 +69,6 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
         return context;
     }
 
-    protected static String getBootstrapServers() {
-        return service.getBootstrapServers();
-    }
-
-    private static AdminClient createAdminClient() {
-        return createAdminClient(service);
-    }
+    protected abstract Properties getDefaultProperties();
 
-    public static AdminClient createAdminClient(KafkaService service) {
-        final Properties properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
-        return KafkaAdminClient.create(properties);
-    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
index 6e7f11f8c96..c888ff8c4bf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
@@ -19,35 +19,24 @@ package org.apache.camel.component.kafka.integration;
 import java.util.Properties;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Tags;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Tags({ @Tag("non-abstract") })
-public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport {
+public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTestSupport {
     @RegisterExtension
     public static ContainerLocalAuthKafkaService service = new ContainerLocalAuthKafkaService("/kafka-jaas.config");
 
     protected static AdminClient kafkaAdminClient;
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaAuthTestSupport.class);
-
     @BeforeAll
     public static void beforeClass() {
-        LOG.info("Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
-        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        AbstractKafkaTestSupport.setServiceProperties(service);
     }
 
     @BeforeEach
@@ -57,33 +46,13 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport
         }
     }
 
-    public static Properties getDefaultProperties(KafkaService service) {
-        LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
-
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
-        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
-        props.put(ProducerConfig.ACKS_CONFIG, "1");
-        return props;
-    }
-
     protected Properties getDefaultProperties() {
         return getDefaultProperties(service);
     }
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        context.getPropertiesComponent().setLocation("ref:prop");
-
-        KafkaComponent kafka = new KafkaComponent(context);
-        kafka.init();
-        kafka.getConfiguration().setBrokers(service.getBootstrapServers());
-        context.addComponent("kafka", kafka);
-
-        return context;
+        return createCamelContextFromService(service);
     }
 
     protected static String getBootstrapServers() {
@@ -93,11 +62,4 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport
     private static AdminClient createAdminClient() {
         return createAdminClient(service);
     }
-
-    public static AdminClient createAdminClient(KafkaService service) {
-        final Properties properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
-        return KafkaAdminClient.create(properties);
-    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
index d0d76d5eb9d..02e27dbefb2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -19,33 +19,22 @@ package org.apache.camel.component.kafka.integration;
 import java.util.Properties;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
-import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
-    // Note: you must use Strimzi on MacOS with Apple Silicon (and possibly other ARM-based architectures)
+public abstract class BaseEmbeddedKafkaTestSupport extends AbstractKafkaTestSupport {
     @RegisterExtension
     public static KafkaService service = KafkaServiceFactory.createSingletonService();
 
     protected static AdminClient kafkaAdminClient;
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
-
     @BeforeAll
     public static void beforeClass() {
-        LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
-        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        AbstractKafkaTestSupport.setServiceProperties(service);
     }
 
     @BeforeEach
@@ -55,33 +44,13 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
         }
     }
 
-    public static Properties getDefaultProperties(KafkaService service) {
-        LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
-
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
-        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
-        props.put(ProducerConfig.ACKS_CONFIG, "1");
-        return props;
-    }
-
     protected Properties getDefaultProperties() {
         return getDefaultProperties(service);
     }
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        context.getPropertiesComponent().setLocation("ref:prop");
-
-        KafkaComponent kafka = new KafkaComponent(context);
-        kafka.init();
-        kafka.getConfiguration().setBrokers(service.getBootstrapServers());
-        context.addComponent("kafka", kafka);
-
-        return context;
+        return createCamelContextFromService(service);
     }
 
     protected static String getBootstrapServers() {
@@ -92,10 +61,4 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
         return createAdminClient(service);
     }
 
-    public static AdminClient createAdminClient(KafkaService service) {
-        final Properties properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
-        return KafkaAdminClient.create(properties);
-    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index 910d5820dfb..e6fbcd24d6b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -87,7 +87,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
@@ -109,7 +109,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
     @BeforeEach
     public void setKafkaAdminClient() {
         if (kafkaAdminClient == null) {
-            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+            kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
         }
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
index c01ebcf9e7d..c0e680cb0e7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.health.HealthCheck;
@@ -85,7 +85,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
@@ -100,7 +100,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
     @BeforeEach
     public void setKafkaAdminClient() {
         if (kafkaAdminClient == null) {
-            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+            kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
         }
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
index 43783972e03..052f81674e4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.health.HealthCheck;
@@ -87,7 +87,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
@@ -102,7 +102,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
     @BeforeEach
     public void setKafkaAdminClient() {
         if (kafkaAdminClient == null) {
-            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+            kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
         }
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
index 1eb60997993..5b6977e15bc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.health.HealthCheck;
@@ -84,7 +84,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
@@ -106,7 +106,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
     @BeforeEach
     public void setKafkaAdminClient() {
         if (kafkaAdminClient == null) {
-            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+            kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
         }
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
index b0b4b51ddd6..0bf9911001b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
@@ -32,6 +32,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
 import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -78,7 +79,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
@@ -89,7 +90,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
             producer.close();
         }
         // clean all test topics
-        BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+        AbstractKafkaTestSupport.createAdminClient(service)
                 .deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
index 3932798b83d..4a6d30debc8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -32,6 +32,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
 import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -98,7 +99,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
 
     @BeforeEach
     public void before() {
-        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
 
@@ -111,7 +112,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
             producer.close();
         }
         // clean all test topics
-        BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+        AbstractKafkaTestSupport.createAdminClient(service)
                 .deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
 
         executorService.shutdownNow();