You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/22 11:06:31 UTC

[camel] 02/02: (chores) camel-kafka: rework the KafkaConsumerHealthCheckIT

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5ad73bdd87b9fb6cf521ea1cc1c02ffd602d56d0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 22 11:45:20 2022 +0200

    (chores) camel-kafka: rework the KafkaConsumerHealthCheckIT
    
    It manages the service instance, therefore it should not use the singleton service shared by other services via  BaseEmbeddedKafkaTestSupport
---
 .../integration/KafkaConsumerHealthCheckIT.java    | 48 ++++++++++++++++++++--
 1 file changed, 44 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index 576e0118d3f..dcb52c44f6e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -29,6 +29,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
@@ -37,14 +38,21 @@ import org.apache.camel.health.HealthCheck;
 import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +63,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
     public static final String TOPIC = "test-health";
 
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient;
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);
 
     @BindToRegistry("myHeaderDeserializer")
@@ -68,7 +81,6 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
-
     @EndpointInject("mock:result")
     private MockEndpoint to;
 
@@ -76,11 +88,32 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
 
     @BeforeEach
     public void before() {
-        Properties props = getDefaultProperties();
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
         MockConsumerInterceptor.recordsCaptured.clear();
     }
 
+    @BeforeAll
+    public static void beforeClass() {
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
     @AfterEach
     public void after() {
         if (producer != null) {
@@ -93,6 +126,12 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers());
+        context.addComponent("kafka", kafka);
 
         // install health check manually (yes a bit cumbersome)
         HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
@@ -114,7 +153,8 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
 
             @Override
             public void configure() {
-                from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
                         .routeId("test-health-it").to(to);
             }
         };