You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/07 11:28:00 UTC
[camel] 02/02: (chores) camel-kafka: cleanup base test classes for easier reuse
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 91007e7544e075116d4c55ed076c4c5d15f31da5
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Dec 7 11:05:29 2022 +0100
(chores) camel-kafka: cleanup base test classes for easier reuse
---
...tSupport.java => AbstractKafkaTestSupport.java} | 51 +++++-----------------
.../BaseEmbeddedKafkaAuthTestSupport.java | 44 ++-----------------
.../integration/BaseEmbeddedKafkaTestSupport.java | 43 ++----------------
.../integration/KafkaConsumerHealthCheckIT.java | 4 +-
.../health/KafkaConsumerBadPortHealthCheckIT.java | 6 +--
...fkaConsumerBadPortSupervisingHealthCheckIT.java | 6 +--
.../KafkaConsumerUnresolvableHealthCheckIT.java | 6 +--
.../KafkaPausableConsumerCircuitBreakerIT.java | 5 ++-
.../integration/pause/KafkaPausableConsumerIT.java | 5 ++-
9 files changed, 35 insertions(+), 135 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
similarity index 67%
copy from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
copy to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
index d0d76d5eb9d..f02c28e1b6d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.camel.component.kafka.integration;
import java.util.Properties;
@@ -22,37 +23,26 @@ import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
- // Note: you must use Strimzi on MacOS with Apple Silicon (and possibly other ARM-based architectures)
- @RegisterExtension
- public static KafkaService service = KafkaServiceFactory.createSingletonService();
-
- protected static AdminClient kafkaAdminClient;
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
+public abstract class AbstractKafkaTestSupport extends CamelTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTestSupport.class);
- @BeforeAll
- public static void beforeClass() {
- LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
+ protected static void setServiceProperties(KafkaService service) {
+ LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
System.setProperty("bootstrapServers", service.getBootstrapServers());
}
- @BeforeEach
- public void setKafkaAdminClient() {
- if (kafkaAdminClient == null) {
- kafkaAdminClient = createAdminClient();
- }
+ public static AdminClient createAdminClient(KafkaService service) {
+ final Properties properties = new Properties();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+
+ return KafkaAdminClient.create(properties);
}
public static Properties getDefaultProperties(KafkaService service) {
@@ -67,12 +57,7 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
return props;
}
- protected Properties getDefaultProperties() {
- return getDefaultProperties(service);
- }
-
- @Override
- protected CamelContext createCamelContext() throws Exception {
+ protected CamelContext createCamelContextFromService(KafkaService service) throws Exception {
CamelContext context = super.createCamelContext();
context.getPropertiesComponent().setLocation("ref:prop");
@@ -84,18 +69,6 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
return context;
}
- protected static String getBootstrapServers() {
- return service.getBootstrapServers();
- }
-
- private static AdminClient createAdminClient() {
- return createAdminClient(service);
- }
+ protected abstract Properties getDefaultProperties();
- public static AdminClient createAdminClient(KafkaService service) {
- final Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
- return KafkaAdminClient.create(properties);
- }
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
index 6e7f11f8c96..c888ff8c4bf 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
@@ -19,35 +19,24 @@ package org.apache.camel.component.kafka.integration;
import java.util.Properties;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
-import org.apache.camel.test.infra.kafka.services.KafkaService;
-import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Tags({ @Tag("non-abstract") })
-public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport {
+public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTestSupport {
@RegisterExtension
public static ContainerLocalAuthKafkaService service = new ContainerLocalAuthKafkaService("/kafka-jaas.config");
protected static AdminClient kafkaAdminClient;
- private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaAuthTestSupport.class);
-
@BeforeAll
public static void beforeClass() {
- LOG.info("Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
- System.setProperty("bootstrapServers", service.getBootstrapServers());
+ AbstractKafkaTestSupport.setServiceProperties(service);
}
@BeforeEach
@@ -57,33 +46,13 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport
}
}
- public static Properties getDefaultProperties(KafkaService service) {
- LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
- props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
- props.put(ProducerConfig.ACKS_CONFIG, "1");
- return props;
- }
-
protected Properties getDefaultProperties() {
return getDefaultProperties(service);
}
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- context.getPropertiesComponent().setLocation("ref:prop");
-
- KafkaComponent kafka = new KafkaComponent(context);
- kafka.init();
- kafka.getConfiguration().setBrokers(service.getBootstrapServers());
- context.addComponent("kafka", kafka);
-
- return context;
+ return createCamelContextFromService(service);
}
protected static String getBootstrapServers() {
@@ -93,11 +62,4 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends CamelTestSupport
private static AdminClient createAdminClient() {
return createAdminClient(service);
}
-
- public static AdminClient createAdminClient(KafkaService service) {
- final Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
- return KafkaAdminClient.create(properties);
- }
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
index d0d76d5eb9d..02e27dbefb2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -19,33 +19,22 @@ package org.apache.camel.component.kafka.integration;
import java.util.Properties;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
-import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
- // Note: you must use Strimzi on MacOS with Apple Silicon (and possibly other ARM-based architectures)
+public abstract class BaseEmbeddedKafkaTestSupport extends AbstractKafkaTestSupport {
@RegisterExtension
public static KafkaService service = KafkaServiceFactory.createSingletonService();
protected static AdminClient kafkaAdminClient;
- private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
-
@BeforeAll
public static void beforeClass() {
- LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
- System.setProperty("bootstrapServers", service.getBootstrapServers());
+ AbstractKafkaTestSupport.setServiceProperties(service);
}
@BeforeEach
@@ -55,33 +44,13 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
}
}
- public static Properties getDefaultProperties(KafkaService service) {
- LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
- props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
- props.put(ProducerConfig.ACKS_CONFIG, "1");
- return props;
- }
-
protected Properties getDefaultProperties() {
return getDefaultProperties(service);
}
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- context.getPropertiesComponent().setLocation("ref:prop");
-
- KafkaComponent kafka = new KafkaComponent(context);
- kafka.init();
- kafka.getConfiguration().setBrokers(service.getBootstrapServers());
- context.addComponent("kafka", kafka);
-
- return context;
+ return createCamelContextFromService(service);
}
protected static String getBootstrapServers() {
@@ -92,10 +61,4 @@ public abstract class BaseEmbeddedKafkaTestSupport extends CamelTestSupport {
return createAdminClient(service);
}
- public static AdminClient createAdminClient(KafkaService service) {
- final Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
-
- return KafkaAdminClient.create(properties);
- }
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index 910d5820dfb..e6fbcd24d6b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -87,7 +87,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
@@ -109,7 +109,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void setKafkaAdminClient() {
if (kafkaAdminClient == null) {
- kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+ kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
index c01ebcf9e7d..c0e680cb0e7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.health.HealthCheck;
@@ -85,7 +85,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
@@ -100,7 +100,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void setKafkaAdminClient() {
if (kafkaAdminClient == null) {
- kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+ kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
index 43783972e03..052f81674e4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.health.HealthCheck;
@@ -87,7 +87,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
@@ -102,7 +102,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
@BeforeEach
public void setKafkaAdminClient() {
if (kafkaAdminClient == null) {
- kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+ kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
index 1eb60997993..5b6977e15bc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
-import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.health.HealthCheck;
@@ -84,7 +84,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
@@ -106,7 +106,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
@BeforeEach
public void setKafkaAdminClient() {
if (kafkaAdminClient == null) {
- kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+ kafkaAdminClient = AbstractKafkaTestSupport.createAdminClient(service);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
index b0b4b51ddd6..0bf9911001b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
@@ -32,6 +32,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -78,7 +79,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
@@ -89,7 +90,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
producer.close();
}
// clean all test topics
- BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+ AbstractKafkaTestSupport.createAdminClient(service)
.deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
index 3932798b83d..4a6d30debc8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -32,6 +32,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.component.kafka.integration.AbstractKafkaTestSupport;
import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -98,7 +99,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
@BeforeEach
public void before() {
- Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ Properties props = AbstractKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
@@ -111,7 +112,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
producer.close();
}
// clean all test topics
- BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+ AbstractKafkaTestSupport.createAdminClient(service)
.deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
executorService.shutdownNow();