You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/02 11:28:12 UTC

[camel] 02/02: CAMEL-18333: camel-kafka: add better error message to the health-check

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fcb501324fb55b7083a86d47d80fd7c59e680043
Author: Rinaldo Pitzer Júnior <16...@users.noreply.github.com>
AuthorDate: Mon Aug 1 15:07:42 2022 -0300

    CAMEL-18333: camel-kafka: add better error message to the health-check
    
    This extracts the message from the root cause exception and adds it to
    the general health check message. This message should contain more
    relevant information to the user, like telling that the bootstrap servers
    are incorrect or unreachable, or that a port number is invalid. Those
    messages would not be present in the default health check message,
    making it harder for the user to diagnose, especially in environments
    where the component is running inside a pod and the user doesn't
    necessarily have access to the logs.
---
 .../camel/component/kafka/TaskHealthState.java     |  13 ++
 .../KafkaConsumerBadPortHealthCheckIT.java         | 193 ++++++++++++++++++++
 ...fkaConsumerBadPortSupervisingHealthCheckIT.java | 201 +++++++++++++++++++++
 .../KafkaConsumerUnresolvableHealthCheckIT.java    | 193 ++++++++++++++++++++
 4 files changed, 600 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
index 1ce9a130d6e..f8f0f74437a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
@@ -93,6 +93,19 @@ public class TaskHealthState {
             msg += " (recovery in progress using " + time + " intervals).";
         }
 
+        if (lastError != null) {
+            msg += " - Error: " + extractRootCause(lastError).getMessage();
+        }
+
         return msg;
     }
+
+    // Follows getCause() to the end of the chain; only a self-referencing cause is guarded against (longer cycles are not).
+    private Throwable extractRootCause(Throwable throwable) {
+        Throwable deepest = throwable;
+        for (Throwable next = deepest.getCause(); next != null && next != deepest; next = deepest.getCause()) {
+            deepest = next;
+        }
+        return deepest;
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
new file mode 100644
index 00000000000..0ca75d0b2b4
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport { // IT: readiness must report DOWN with a "port" message when "123" is appended to the broker list
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient; // created on first use in setKafkaAdminClient()
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from; // Kafka consumer endpoint used as the route input
+    @EndpointInject("mock:result")
+    private MockEndpoint to; // route output; the test expects it to receive nothing
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; // recreated before each test
+
+    @BeforeEach
+    public void before() { // builds a fresh producer and clears the records captured by MockConsumerInterceptor
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() { // starts the Kafka test service and publishes its address as system properties
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() { // stops the Kafka test service
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() { // lazily creates the shared admin client
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() { // closes the producer and requests deletion of the test topic
+        if (producer != null) {
+            producer.close();
+        }
+        // request deletion of the test topic; the value returned by all() is discarded, so completion is not awaited here
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123); // string concatenation: appends "123" to the broker list, presumably yielding an invalid port
+        context.addComponent("kafka", kafka);
+
+        // register the "context", "routes" and "consumers" health checks by hand (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() { // single route: Kafka consumer -> trace-logging processor -> mock:result
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // every liveness result must be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // readiness: retry for up to 20 seconds until every result is DOWN and its message contains "port"
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> { // NOTE(review): despite the name, true means "all DOWN"; allMatch is also true for an empty collection
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("port"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0); // the mock endpoint is expected to receive no messages
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) { // publish five records, each carrying two headers, with the test producer
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000); // NOTE(review): 3000 is presumably a timeout in milliseconds — confirm
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { // empty subclass; overrides nothing
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
new file mode 100644
index 00000000000..31727cae3fd
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.engine.DefaultSupervisingRouteController;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSupport { // IT: same bad-port readiness check, run with a supervising route controller
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient; // created on first use in setKafkaAdminClient()
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortSupervisingHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from; // Kafka consumer endpoint used as the route input
+    @EndpointInject("mock:result")
+    private MockEndpoint to; // route output; the test expects it to receive nothing
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; // recreated before each test
+
+    @BeforeEach
+    public void before() { // builds a fresh producer and clears the records captured by MockConsumerInterceptor
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() { // starts the Kafka test service and publishes its address as system properties
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() { // stops the Kafka test service
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() { // lazily creates the shared admin client
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() { // closes the producer and requests deletion of the test topic
+        if (producer != null) {
+            producer.close();
+        }
+        // request deletion of the test topic; the value returned by all() is discarded, so completion is not awaited here
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        context.setRouteController(new DefaultSupervisingRouteController()); // replace the default route controller with the supervising one
+        SupervisingRouteController src = context.getRouteController().supervising();
+        src.setBackOffDelay(3); // NOTE(review): the three values below are presumably milliseconds — confirm
+        src.setBackOffMaxAttempts(3);
+        src.setInitialDelay(3);
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123); // string concatenation: appends "123" to the broker list, presumably yielding an invalid port
+        context.addComponent("kafka", kafka);
+
+        // register the "context", "routes" and "consumers" health checks by hand (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() { // single route: Kafka consumer -> trace-logging processor -> mock:result
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // every liveness result must be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // readiness: retry for up to 20 seconds until every result is DOWN and its message contains "port"
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> { // NOTE(review): despite the name, true means "all DOWN"; allMatch is also true for an empty collection
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("port"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0); // the mock endpoint is expected to receive no messages
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) { // publish five records, each carrying two headers, with the test producer
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000); // NOTE(review): 3000 is presumably a timeout in milliseconds — confirm
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { // empty subclass; overrides nothing
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
new file mode 100644
index 00000000000..c0395ec7f6c
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport { // IT: readiness must report DOWN with a "bootstrap" message when the broker host name is altered
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient; // created on first use in setKafkaAdminClient()
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUnresolvableHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from; // Kafka consumer endpoint used as the route input
+    @EndpointInject("mock:result")
+    private MockEndpoint to; // route output; the test expects it to receive nothing
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; // recreated before each test
+
+    @BeforeEach
+    public void before() { // builds a fresh producer and clears the records captured by MockConsumerInterceptor
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() { // starts the Kafka test service and publishes its address as system properties
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() { // stops the Kafka test service
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() { // lazily creates the shared admin client
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() { // closes the producer and requests deletion of the test topic
+        if (producer != null) {
+            producer.close();
+        }
+        // request deletion of the test topic; the value returned by all() is discarded, so completion is not awaited here
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers().replace("localhost", "locaIhost")); // "locaIhost" has a capital I in place of the l, presumably so the host cannot be resolved
+        context.addComponent("kafka", kafka);
+
+        // register the "context", "routes" and "consumers" health checks by hand (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() { // single route: Kafka consumer -> trace-logging processor -> mock:result
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // every liveness result must be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // readiness: retry for up to 20 seconds until every result is DOWN and its message contains "bootstrap"
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> { // NOTE(review): despite the name, true means "all DOWN"; allMatch is also true for an empty collection
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("bootstrap"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0); // the mock endpoint is expected to receive no messages
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) { // publish five records, each carrying two headers, with the test producer
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000); // NOTE(review): 3000 is presumably a timeout in milliseconds — confirm
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { // empty subclass; overrides nothing
+    }
+}