You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/08/03 04:18:00 UTC

[camel] branch camel-3.18.x updated: Backport several Kafka fixes (#8098)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new c2f89ddfb6b Backport several Kafka fixes (#8098)
c2f89ddfb6b is described below

commit c2f89ddfb6b62777f58b90f3ba043fedb1c83c14
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Aug 3 06:17:55 2022 +0200

    Backport several Kafka fixes (#8098)
    
    * CAMEL-18327: remove unnecessary consumer unsubscription
    
    Unsubscribing the consumer is already done when handling poll() exceptions. This removes the redundant unsubscription calls.
    
    * CAMEL-17947: remove unnecessary consumer close
    
    Closing the consumer is already handled by the KafkaConsumer if necessary or by evaluating the poll exception strategy
    
    * CAMEL-18333: camel-kafka - avoid NPE during health-check
    
    The pollExceptionStrategy might be null if, for example, the
    KafkaConsumer has not been created due to an exception. In those cases,
    an NPE was being thrown during the health check.
    
    * CAMEL-18333: camel-kafka: add better error message to the health-check
    
    This extracts the message from the root cause exception and adds it to
    the general health check message. This message should contain more
    relevant information to the user, like telling that the bootstrap servers
    are incorrect or unreachable, or that a port number is invalid. Those
    messages would not be present in the default health check message,
    making it harder for the user to diagnose, especially in environments
    where the component is running inside a pod and the user doesn't
    necessarily have access to the logs.
    
    Co-authored-by: Rinaldo Pitzer Júnior <16...@users.noreply.github.com>
---
 .../camel/component/kafka/KafkaFetchRecords.java   |   8 +-
 .../camel/component/kafka/TaskHealthState.java     |  13 ++
 .../KafkaConsumerBadPortHealthCheckIT.java         | 193 ++++++++++++++++++++
 ...fkaConsumerBadPortSupervisingHealthCheckIT.java | 201 +++++++++++++++++++++
 .../KafkaConsumerUnresolvableHealthCheckIT.java    | 193 ++++++++++++++++++++
 5 files changed, 602 insertions(+), 6 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index e92d08f4c19..266bf7a8f11 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -161,9 +161,6 @@ public class KafkaFetchRecords implements Runnable {
         if (LOG.isInfoEnabled()) {
             LOG.info("Terminating KafkaConsumer thread {} receiving from {}", threadId, getPrintableTopic());
         }
-
-        safeUnsubscribe();
-        IOHelper.close(consumer);
     }
 
     private void setupInitializeErrorException(ForegroundTask task, int max) {
@@ -350,8 +347,6 @@ public class KafkaFetchRecords implements Runnable {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", threadId, getPrintableTopic());
             }
-
-            safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
                 LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}",
@@ -520,7 +515,8 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     private boolean isRecoverable() {
-        return (pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable();
+        return (pollExceptionStrategy != null && pollExceptionStrategy.canContinue() || isReconnect())
+                && isKafkaConsumerRunnable();
     }
 
     // concurrent access happens here
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
index 1ce9a130d6e..f8f0f74437a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
@@ -93,6 +93,19 @@ public class TaskHealthState {
             msg += " (recovery in progress using " + time + " intervals).";
         }
 
+        if (lastError != null) {
+            msg += " - Error: " + extractRootCause(lastError).getMessage();
+        }
+
         return msg;
     }
+
+    private Throwable extractRootCause(Throwable throwable) {
+        Throwable rootCause = throwable;
+        while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
+            rootCause = rootCause.getCause();
+        }
+        return rootCause;
+    }
+
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
new file mode 100644
index 00000000000..0ca75d0b2b4
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() {
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123);
+        context.addComponent("kafka", kafka);
+
+        // install health check manually (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // health-check liveness should be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> {
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("port"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0);
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
new file mode 100644
index 00000000000..31727cae3fd
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.engine.DefaultSupervisingRouteController;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSupport {
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortSupervisingHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() {
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        context.setRouteController(new DefaultSupervisingRouteController());
+        SupervisingRouteController src = context.getRouteController().supervising();
+        src.setBackOffDelay(3);
+        src.setBackOffMaxAttempts(3);
+        src.setInitialDelay(3);
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123);
+        context.addComponent("kafka", kafka);
+
+        // install health check manually (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // health-check liveness should be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> {
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("port"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0);
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
new file mode 100644
index 00000000000..c0395ec7f6c
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
+    public static final String TOPIC = "test-health";
+
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    protected static AdminClient kafkaAdminClient;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUnresolvableHealthCheckIT.class);
+
+    @BindToRegistry("myHeaderDeserializer")
+    private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                    + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @BeforeAll
+    public static void beforeClass() {
+        service.initialize();
+
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+        System.setProperty("brokers", service.getBootstrapServers());
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        service.shutdown();
+    }
+
+    @BeforeEach
+    public void setKafkaAdminClient() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service);
+        }
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+
+        KafkaComponent kafka = new KafkaComponent(context);
+        kafka.init();
+        kafka.getConfiguration().setBrokers(service.getBootstrapServers().replace("localhost", "locaIhost"));
+        context.addComponent("kafka", kafka);
+
+        // install health check manually (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        Object hc = registry.resolveById("context");
+        registry.register(hc);
+        hc = registry.resolveById("routes");
+        registry.register(hc);
+        hc = registry.resolveById("consumers");
+        registry.register(hc);
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("test-health-it").to(to);
+            }
+        };
+    }
+
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // health-check liveness should be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> {
+                return r.getState().equals(HealthCheck.State.DOWN) &&
+                        r.getMessage().stream().allMatch(msg -> msg.contains("bootstrap"));
+            });
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        to.expectedMessageCount(0);
+        to.expectedMinimumMessageCount(0);
+        to.expectedNoHeaderReceived();
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
+}