You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/22 11:06:31 UTC
[camel] 02/02: (chores) camel-kafka: rework the KafkaConsumerHealthCheckIT
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5ad73bdd87b9fb6cf521ea1cc1c02ffd602d56d0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 22 11:45:20 2022 +0200
(chores) camel-kafka: rework the KafkaConsumerHealthCheckIT
This test manages its own Kafka service instance; therefore, it should not use the singleton service that is shared with other tests via BaseEmbeddedKafkaTestSupport.
---
.../integration/KafkaConsumerHealthCheckIT.java | 48 ++++++++++++++++++++--
1 file changed, 44 insertions(+), 4 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index 576e0118d3f..dcb52c44f6e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -29,6 +29,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
@@ -37,14 +38,21 @@ import org.apache.camel.health.HealthCheck;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.HealthCheckRegistry;
import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +63,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
public static final String TOPIC = "test-health";
+ public static KafkaService service = KafkaServiceFactory.createService();
+
+ protected static AdminClient kafkaAdminClient;
+
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);
@BindToRegistry("myHeaderDeserializer")
@@ -68,7 +81,6 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
+ "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
private Endpoint from;
-
@EndpointInject("mock:result")
private MockEndpoint to;
@@ -76,11 +88,32 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
@BeforeEach
public void before() {
- Properties props = getDefaultProperties();
+ Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
MockConsumerInterceptor.recordsCaptured.clear();
}
+ @BeforeAll
+ public static void beforeClass() {
+ service.initialize();
+
+ LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
+ System.setProperty("bootstrapServers", service.getBootstrapServers());
+ System.setProperty("brokers", service.getBootstrapServers());
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ service.shutdown();
+ }
+
+ @BeforeEach
+ public void setKafkaAdminClient() {
+ if (kafkaAdminClient == null) {
+ kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+ }
+ }
+
@AfterEach
public void after() {
if (producer != null) {
@@ -93,6 +126,12 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
+ context.getPropertiesComponent().setLocation("ref:prop");
+
+ KafkaComponent kafka = new KafkaComponent(context);
+ kafka.init();
+ kafka.getConfiguration().setBrokers(service.getBootstrapServers());
+ context.addComponent("kafka", kafka);
// install health check manually (yes a bit cumbersome)
HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
@@ -114,7 +153,8 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
@Override
public void configure() {
- from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+ from(from)
+ .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
.routeId("test-health-it").to(to);
}
};