You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/09/18 09:21:38 UTC
[camel] 01/01: [CAMEL-19875]HealthCheck is broken for KafkaConsumer (#11422)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch 4.0.x-CAMEL-19875
in repository https://gitbox.apache.org/repos/asf/camel.git
commit db31916c7685b1f5bdc0e6c8ca260965a48edef4
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Mon Sep 18 05:20:29 2023 -0400
[CAMEL-19875]HealthCheck is broken for KafkaConsumer (#11422)
---
.../org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++++------
.../integration/health/KafkaConsumerHealthCheckIT.java | 16 +++++++++++++---
2 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 92cd99e3d4f..4a84066737c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -30,7 +30,7 @@ import org.apache.camel.Suspendable;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
-import org.apache.camel.health.WritableHealthCheckRepository;
+import org.apache.camel.health.HealthCheckRepository;
import org.apache.camel.resume.ConsumerListenerAware;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -53,7 +53,7 @@ public class KafkaConsumer extends DefaultConsumer
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private KafkaConsumerHealthCheck consumerHealthCheck;
- private WritableHealthCheckRepository healthCheckRepository;
+ private HealthCheckRepository healthCheckRepository;
// This list helps to work around the infinite loop of KAFKA-1894
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
private volatile boolean stopOffsetRepo;
@@ -126,13 +126,13 @@ public class KafkaConsumer extends DefaultConsumer
// health-check is optional so discover and resolve
healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
endpoint.getCamelContext(),
- "components",
- WritableHealthCheckRepository.class);
+ "consumers",
+ HealthCheckRepository.class);
if (healthCheckRepository != null) {
consumerHealthCheck = new KafkaConsumerHealthCheck(this, getRouteId());
consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckConsumerEnabled());
- healthCheckRepository.addHealthCheck(consumerHealthCheck);
+ setHealthCheck(consumerHealthCheck);
}
// is the offset repository already started?
@@ -175,7 +175,6 @@ public class KafkaConsumer extends DefaultConsumer
}
if (healthCheckRepository != null && consumerHealthCheck != null) {
- healthCheckRepository.removeHealthCheck(consumerHealthCheck);
consumerHealthCheck = null;
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
index 030c578ff73..47292f4405b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.integration.health;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
@@ -166,9 +167,18 @@ public class KafkaConsumerHealthCheckIT extends KafkaHealthCheckTestSupport {
serviceShutdown = true;
// health-check readiness should be DOWN
- final Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context);
- final boolean down = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN));
- Assertions.assertTrue(down, "readiness check");
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+ Assertions.assertTrue(res2.size() > 0);
+ Optional<HealthCheck.Result> down
+ = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
+ Assertions.assertTrue(down.isPresent());
+ String msg = down.get().getMessage().get();
+ Assertions.assertTrue(msg.contains("KafkaConsumer is not ready"));
+ Map<String, Object> map = down.get().getDetails();
+ Assertions.assertEquals(TOPIC, map.get("topic"));
+ Assertions.assertEquals("test-health-it", map.get("route.id"));
+ });
}
}