You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/09/18 09:21:38 UTC

[camel] 01/01: [CAMEL-19875] HealthCheck is broken for KafkaConsumer (#11422)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 4.0.x-CAMEL-19875
in repository https://gitbox.apache.org/repos/asf/camel.git

commit db31916c7685b1f5bdc0e6c8ca260965a48edef4
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Mon Sep 18 05:20:29 2023 -0400

    [CAMEL-19875] HealthCheck is broken for KafkaConsumer (#11422)
---
 .../org/apache/camel/component/kafka/KafkaConsumer.java  | 11 +++++------
 .../integration/health/KafkaConsumerHealthCheckIT.java   | 16 +++++++++++++---
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 92cd99e3d4f..4a84066737c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -30,7 +30,7 @@ import org.apache.camel.Suspendable;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import org.apache.camel.health.HealthCheckAware;
 import org.apache.camel.health.HealthCheckHelper;
-import org.apache.camel.health.WritableHealthCheckRepository;
+import org.apache.camel.health.HealthCheckRepository;
 import org.apache.camel.resume.ConsumerListenerAware;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -53,7 +53,7 @@ public class KafkaConsumer extends DefaultConsumer
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private KafkaConsumerHealthCheck consumerHealthCheck;
-    private WritableHealthCheckRepository healthCheckRepository;
+    private HealthCheckRepository healthCheckRepository;
     // This list helps to work around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
@@ -126,13 +126,13 @@ public class KafkaConsumer extends DefaultConsumer
         // health-check is optional so discover and resolve
         healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
                 endpoint.getCamelContext(),
-                "components",
-                WritableHealthCheckRepository.class);
+                "consumers",
+                HealthCheckRepository.class);
 
         if (healthCheckRepository != null) {
             consumerHealthCheck = new KafkaConsumerHealthCheck(this, getRouteId());
             consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckConsumerEnabled());
-            healthCheckRepository.addHealthCheck(consumerHealthCheck);
+            setHealthCheck(consumerHealthCheck);
         }
 
         // is the offset repository already started?
@@ -175,7 +175,6 @@ public class KafkaConsumer extends DefaultConsumer
         }
 
         if (healthCheckRepository != null && consumerHealthCheck != null) {
-            healthCheckRepository.removeHealthCheck(consumerHealthCheck);
             consumerHealthCheck = null;
         }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
index 030c578ff73..47292f4405b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.integration.health;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.StreamSupport;
@@ -166,9 +167,18 @@ public class KafkaConsumerHealthCheckIT extends KafkaHealthCheckTestSupport {
         serviceShutdown = true;
 
         // health-check readiness should be DOWN
-        final Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context);
-        final boolean down = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN));
-        Assertions.assertTrue(down, "readiness check");
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            Assertions.assertTrue(res2.size() > 0);
+            Optional<HealthCheck.Result> down
+                    = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
+            Assertions.assertTrue(down.isPresent());
+            String msg = down.get().getMessage().get();
+            Assertions.assertTrue(msg.contains("KafkaConsumer is not ready"));
+            Map<String, Object> map = down.get().getDetails();
+            Assertions.assertEquals(TOPIC, map.get("topic"));
+            Assertions.assertEquals("test-health-it", map.get("route.id"));
+        });
     }
 
 }