You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/03/04 14:38:10 UTC

[camel] 03/03: CAMEL-17727: camel-kafka - Use a single health-check repository to hold all consumer/producer checks.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 36c3acbddf3ad608e859562cdd975848cddd08af
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 4 15:37:21 2022 +0100

    CAMEL-17727: camel-kafka - Use a single health-check repository to hold all consumer/producer checks.
---
 .../{camel-kafka => camel-kafka-repository}        |  0
 .../camel/component/kafka/KafkaConsumer.java       | 54 +++++++++++++---------
 .../kafka/KafkaHealthCheckRepository.java          |  2 +-
 .../camel/component/kafka/KafkaProducer.java       | 38 +++++++++------
 .../java/org/apache/camel/main/KameletMain.java    |  2 +
 5 files changed, 61 insertions(+), 35 deletions(-)

diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka-repository
similarity index 100%
rename from components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka
rename to components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/health-check/camel-kafka-repository
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 03c9ecb..1fe4d7e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -18,17 +18,21 @@ package org.apache.camel.component.kafka;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.ResumeAware;
 import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.health.HealthCheckAware;
 import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.health.HealthCheckResolver;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
@@ -46,6 +50,7 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private KafkaConsumerHealthCheck consumerHealthCheck;
+    private KafkaHealthCheckRepository healthCheckRepository;
     // This list helps to work around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
@@ -115,21 +120,6 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
                 endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();
 
-        HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
-        if (hcr != null) {
-            String rid = getRouteId();
-            if (rid == null) {
-                // not from a route so need some other uuid
-                rid = endpoint.getCamelContext().getUuidGenerator().generateUuid();
-            }
-            consumerHealthCheck = new KafkaConsumerHealthCheck(this, rid);
-
-            hcr.getRepository("camel-kafka").ifPresent(r -> {
-                KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
-                kr.addHealthCheck(consumerHealthCheck);
-            });
-        }
-
         // is the offset repository already started?
         StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
         if (repo instanceof ServiceSupport) {
@@ -158,18 +148,40 @@ public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaC
 
             tasks.add(task);
         }
+
+        // health-check is optional so discover and resolve
+        HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
+        if (hcr != null) {
+            Optional<HealthCheckRepository> hrc = hcr.getRepository("camel-kafka");
+            if (hrc.isEmpty()) {
+                // use resolver to load from classpath if needed
+                HealthCheckResolver resolver
+                        = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getHealthCheckResolver();
+                HealthCheckRepository hr = resolver.resolveHealthCheckRepository("camel-kafka");
+                if (hr != null) {
+                    hrc = Optional.of(hr);
+                    hcr.register(hr);
+                }
+            }
+            if (hrc.isPresent()) {
+                healthCheckRepository = (KafkaHealthCheckRepository) hrc.get();
+                String rid = getRouteId();
+                if (rid == null) {
+                    // not from a route so need some other uuid
+                    rid = endpoint.getCamelContext().getUuidGenerator().generateUuid();
+                }
+                consumerHealthCheck = new KafkaConsumerHealthCheck(this, rid);
+                healthCheckRepository.addHealthCheck(consumerHealthCheck);
+            }
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
         LOG.info("Stopping Kafka consumer on topic: {}", endpoint.getConfiguration().getTopic());
 
-        HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
-        if (hcr != null) {
-            hcr.getRepository("camel-kafka").ifPresent(r -> {
-                KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
-                kr.removeHealthCheck(consumerHealthCheck);
-            });
+        if (healthCheckRepository != null && consumerHealthCheck != null) {
+            healthCheckRepository.removeHealthCheck(consumerHealthCheck);
             consumerHealthCheck = null;
         }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
index be54414..7517605 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHealthCheckRepository.java
@@ -32,7 +32,7 @@ import org.apache.camel.support.service.ServiceSupport;
 /**
  * Repository for camel-kafka {@link HealthCheck}s.
  */
-@org.apache.camel.spi.annotations.HealthCheck("camel-kafka")
+@org.apache.camel.spi.annotations.HealthCheck("camel-kafka-repository")
 @DeferredContextBinding
 public class KafkaHealthCheckRepository extends ServiceSupport
         implements CamelContextAware, HealthCheckRepository, StaticService, NonManagedService {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 8256697..acd020d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -29,6 +30,7 @@ import java.util.concurrent.Future;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
 import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
 import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
@@ -37,6 +39,8 @@ import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
 import org.apache.camel.component.kafka.producer.support.ProducerUtil;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.health.HealthCheckResolver;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.KeyValueHolder;
@@ -62,6 +66,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("rawtypes")
     private org.apache.kafka.clients.producer.Producer kafkaProducer;
     private KafkaProducerHealthCheck producerHealthCheck;
+    private KafkaHealthCheckRepository healthCheckRepository;
     private String clientId;
     private final KafkaEndpoint endpoint;
     private final KafkaConfiguration configuration;
@@ -175,25 +180,32 @@ public class KafkaProducer extends DefaultAsyncProducer {
             }
         }
 
-        // install producer health-check
-        HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+        // health-check is optional so discover and resolve
+        HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
         if (hcr != null) {
-            producerHealthCheck = new KafkaProducerHealthCheck(this, clientId);
-            hcr.getRepository("camel-kafka").ifPresent(r -> {
-                KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
-                kr.addHealthCheck(producerHealthCheck);
-            });
+            Optional<HealthCheckRepository> hrc = hcr.getRepository("camel-kafka");
+            if (hrc.isEmpty()) {
+                // use resolver to load from classpath if needed
+                HealthCheckResolver resolver
+                        = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getHealthCheckResolver();
+                HealthCheckRepository hr = resolver.resolveHealthCheckRepository("camel-kafka");
+                if (hr != null) {
+                    hrc = Optional.of(hr);
+                    hcr.register(hr);
+                }
+            }
+            if (hrc.isPresent()) {
+                healthCheckRepository = (KafkaHealthCheckRepository) hrc.get();
+                producerHealthCheck = new KafkaProducerHealthCheck(this, clientId);
+                healthCheckRepository.addHealthCheck(producerHealthCheck);
+            }
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        HealthCheckRegistry hcr = endpoint.getCamelContext().getExtension(HealthCheckRegistry.class);
-        if (hcr != null) {
-            hcr.getRepository("camel-kafka").ifPresent(r -> {
-                KafkaHealthCheckRepository kr = (KafkaHealthCheckRepository) r;
-                kr.removeHealthCheck(producerHealthCheck);
-            });
+        if (healthCheckRepository != null && producerHealthCheck != null) {
+            healthCheckRepository.removeHealthCheck(producerHealthCheck);
             producerHealthCheck = null;
         }
 
diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
index 3637142..665c872 100644
--- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
+++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java
@@ -161,6 +161,8 @@ public class KameletMain extends MainCommandLineSupport {
         }
         answer.setApplicationContextClassLoader(kameletClassLoader);
         answer.setRegistry(registry);
+        // load camel component and custom health-checks
+        answer.setLoadHealthChecks(true);
 
         // embed HTTP server if port is specified
         Object port = getInitialProperties().get("camel.jbang.platform-http.port");