You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/03/04 14:38:10 UTC
[camel] 03/03: CAMEL-17727: camel-kafka - Use a single health-check repository to hold all consumer/producer checks.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 36c3acbddf3ad608e859562cdd975848cddd08af
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 4 15:37:21 2022 +0100
CAMEL-17727: camel-kafka - Use a single health-check repository to hold all consumer/producer checks.
---
.../{camel-kafka => camel-kafka-repository} | 0
.../camel/component/kafka/KafkaConsumer.java | 54 +++++++++++++---------
.../kafka/KafkaHealthCheckRepository.java | 2 +-
.../camel/component/kafka/KafkaProducer.java | 38 +++++++++------
.../java/org/apache/camel/main/KameletMain.java | 2 +
5 files changed, 61 insertions(+), 35 deletions(-)
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka-repository
similarity index 100%
rename from components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka
rename to components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka-repository
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 03c9ecb..1fe4d7e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -18,17 +18,21 @@ package org.apache.camel.component.kafka;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.ResumeAware;
import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.health.HealthCheckResolver;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
@@ -46,6 +50,7 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private KafkaConsumerHealthCheck consumerHealthCheck;
+ private KafkaHealthCheckRepository healthCheckRepository;
// This list helps to work around the infinite loop of KAFKA-1894
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
private volatile boolean stopOffsetRepo;
@@ -115,21 +120,6 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
endpoint.getConfiguration().isBreakOnFirstError());
super.doStart();
- HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
- if (hcr != null) {
- String rid = getRouteId();
- if (rid == null) {
- // not from a route so need some other uuid
- rid = endpoint.getCamelContext().getUuidGenerator().generateUuid();
- }
- consumerHealthCheck = new KafkaConsumerHealthCheck(this, rid);
-
- hcr.getRepository("camel-kafka").ifPresent(r -> {
- KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
- kr.addHealthCheck(consumerHealthCheck);
- });
- }
-
// is the offset repository already started?
StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
if (repo instanceof ServiceSupport) {
@@ -158,18 +148,40 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
tasks.add(task);
}
+
+ // health-check is optional so discover and resolve
+ HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
+ if (hcr != null) {
+ Optional<HealthCheckRepository> hrc = hcr.getRepository("camel-kafka");
+ if (hrc.isEmpty()) {
+ // use resolver to load from classpath if needed
+ HealthCheckResolver resolver
+ = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getHealthCheckResolver();
+ HealthCheckRepository hr = resolver.resolveHealthCheckRepository("camel-kafka");
+ if (hr != null) {
+ hrc = Optional.of(hr);
+ hcr.register(hr);
+ }
+ }
+ if (hrc.isPresent()) {
+ healthCheckRepository = (KafkaHealthCheckRepository) hrc.get();
+ String rid = getRouteId();
+ if (rid == null) {
+ // not from a route so need some other uuid
+ rid = endpoint.getCamelContext().getUuidGenerator().generateUuid();
+ }
+ consumerHealthCheck = new KafkaConsumerHealthCheck(this, rid);
+ healthCheckRepository.addHealthCheck(consumerHealthCheck);
+ }
+ }
}
@Override
protected void doStop() throws Exception {
LOG.info("Stopping Kafka consumer on topic: {}", endpoint.getConfiguration().getTopic());
- HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
- if (hcr != null) {
- hcr.getRepository("camel-kafka").ifPresent(r -> {
- KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
- kr.removeHealthCheck(consumerHealthCheck);
- });
+ if (healthCheckRepository != null && consumerHealthCheck != null) {
+ healthCheckRepository.removeHealthCheck(consumerHealthCheck);
consumerHealthCheck = null;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
index be54414..7517605 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
@@ -32,7 +32,7 @@ import org.apache.camel.support.service.ServiceSupport;
/**
* Repository for camel-kafka {@link HealthCheck}s.
*/
-@org.apache.camel.spi.annotations.HealthCheck("camel-kafka")
+@org.apache.camel.spi.annotations.HealthCheck("camel-kafka-repository")
@DeferredContextBinding
public class KafkaHealthCheckRepository extends ServiceSupport
implements CamelContextAware, HealthCheckRepository, StaticService, NonManagedService {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 8256697..acd020d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -29,6 +30,7 @@ import java.util.concurrent.Future;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
@@ -37,6 +39,8 @@ import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
import org.apache.camel.component.kafka.producer.support.ProducerUtil;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.health.HealthCheckResolver;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
@@ -62,6 +66,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings("rawtypes")
private org.apache.kafka.clients.producer.Producer kafkaProducer;
private KafkaProducerHealthCheck producerHealthCheck;
+ private KafkaHealthCheckRepository healthCheckRepository;
private String clientId;
private final KafkaEndpoint endpoint;
private final KafkaConfiguration configuration;
@@ -175,25 +180,32 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
}
- // install producer health-check
- HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+ // health-check is optional so discover and resolve
+ HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
if (hcr != null) {
- producerHealthCheck = new KafkaProducerHealthCheck(this, clientId);
- hcr.getRepository("camel-kafka").ifPresent(r -> {
- KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
- kr.addHealthCheck(producerHealthCheck);
- });
+ Optional<HealthCheckRepository> hrc = hcr.getRepository("camel-kafka");
+ if (hrc.isEmpty()) {
+ // use resolver to load from classpath if needed
+ HealthCheckResolver resolver
+ = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getHealthCheckResolver();
+ HealthCheckRepository hr = resolver.resolveHealthCheckRepository("camel-kafka");
+ if (hr != null) {
+ hrc = Optional.of(hr);
+ hcr.register(hr);
+ }
+ }
+ if (hrc.isPresent()) {
+ healthCheckRepository = (KafkaHealthCheckRepository) hrc.get();
+ producerHealthCheck = new KafkaProducerHealthCheck(this, clientId);
+ healthCheckRepository.addHealthCheck(producerHealthCheck);
+ }
}
}
@Override
protected void doStop() throws Exception {
- HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
- if (hcr != null) {
- hcr.getRepository("camel-kafka").ifPresent(r -> {
- KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
- kr.removeHealthCheck(producerHealthCheck);
- });
+ if (healthCheckRepository != null && producerHealthCheck != null) {
+ healthCheckRepository.removeHealthCheck(producerHealthCheck);
producerHealthCheck = null;
}
diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
index 3637142..665c872 100644
--- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
+++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
@@ -161,6 +161,8 @@ public class KameletMain extends MainCommandLineSupport {
}
answer.setApplicationContextClassLoader(kameletClassLoader);
answer.setRegistry(registry);
+ // load camel component and custom health-checks
+ answer.setLoadHealthChecks(true);
// embed HTTP server if port is specified
Object port = getInitialProperties().get("camel.jbang.platform-http.port");