You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/03/04 11:18:11 UTC

[camel] branch main updated: CAMEL-17727: camel-kafka - Add readiness health check for kafka producer

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 33d8d50  CAMEL-17727: camel-kafka - Add readiness health check for kafka producer
33d8d50 is described below

commit 33d8d5050ac7c4dc0b08080ea986e0b3495c2da7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 4 12:12:00 2022 +0100

    CAMEL-17727: camel-kafka - Add readiness health check for kafka producer
---
 .../component/kafka/KafkaConsumerHealthCheck.java  |  5 +-
 .../camel/component/kafka/KafkaFetchRecords.java   | 21 ++++-
 .../camel/component/kafka/KafkaProducer.java       | 94 ++++++++++++++++++----
 .../component/kafka/KafkaProducerHealthCheck.java  | 64 +++++++++++++++
 4 files changed, 165 insertions(+), 19 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
index b20115f..19458e2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
@@ -56,10 +56,7 @@ public class KafkaConsumerHealthCheck extends AbstractHealthCheck {
                 Properties props = task.getKafkaProps();
 
                 builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-                String cid = props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG);
-                if (cid != null) {
-                    builder.detail("client.id", cid);
-                }
+                builder.detail("client.id", task.getClientId());
                 String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
                 if (gid != null) {
                     builder.detail("group.id", gid);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 6bbb135..63c4a61 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.TopicPartition;
@@ -49,6 +50,7 @@ class KafkaFetchRecords implements Runnable {
 
     private final KafkaConsumer kafkaConsumer;
     private org.apache.kafka.clients.consumer.Consumer consumer;
+    private String clientId;
     private final String topicName;
     private final Pattern topicPattern;
     private final String threadId;
@@ -124,6 +126,20 @@ class KafkaFetchRecords implements Runnable {
 
             // this may throw an exception if something is wrong with kafka consumer
             this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
+
+            // init client id which we may need to get from the kafka producer via reflection
+            if (clientId == null) {
+                clientId = getKafkaProps().getProperty(CommonClientConfigs.CLIENT_ID_CONFIG);
+                if (clientId == null) {
+                    try {
+                        clientId = (String) ReflectionHelper
+                                .getField(consumer.getClass().getDeclaredField("clientId"), consumer);
+                    } catch (Exception e) {
+                        // ignore
+                        clientId = "";
+                    }
+                }
+            }
         } finally {
             Thread.currentThread().setContextClassLoader(threadClassLoader);
         }
@@ -140,7 +156,6 @@ class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
-
         KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
         resumeStrategy.setConsumer(consumer);
 
@@ -445,4 +460,8 @@ class KafkaFetchRecords implements Runnable {
     Properties getKafkaProps() {
         return kafkaProps;
     }
+
+    String getClientId() {
+        return clientId;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 017645e..43abcf9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -36,11 +36,15 @@ import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCa
 import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
 import org.apache.camel.component.kafka.producer.support.ProducerUtil;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
+import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.URISupport;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -57,6 +61,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
     @SuppressWarnings("rawtypes")
     private org.apache.kafka.clients.producer.Producer kafkaProducer;
+    private KafkaProducerHealthCheck healthCheck;
+    private String clientId;
     private final KafkaEndpoint endpoint;
     private final KafkaConfiguration configuration;
     private ExecutorService workerPool;
@@ -76,6 +82,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
         configKey = configuration.getKey();
     }
 
+    @Override
+    public KafkaEndpoint getEndpoint() {
+        return (KafkaEndpoint) super.getEndpoint();
+    }
+
     Properties getProps() {
         Properties props = configuration.createProducerProperties();
         endpoint.updateClassProperties(props);
@@ -88,6 +99,32 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return props;
     }
 
+    public boolean isReady() {
+        boolean ready = true;
+        try {
+            if (kafkaProducer instanceof org.apache.kafka.clients.producer.KafkaProducer) {
+                // need to use reflection to access the network client which has API to check if the client has ready
+                // connections
+                org.apache.kafka.clients.producer.KafkaProducer kp
+                        = (org.apache.kafka.clients.producer.KafkaProducer) kafkaProducer;
+                org.apache.kafka.clients.producer.internals.Sender sender
+                        = (org.apache.kafka.clients.producer.internals.Sender) ReflectionHelper
+                                .getField(kp.getClass().getDeclaredField("sender"), kp);
+                NetworkClient nc
+                        = (NetworkClient) ReflectionHelper.getField(sender.getClass().getDeclaredField("client"), sender);
+                LOG.trace(
+                        "Health-Check calling org.apache.kafka.clients.NetworkClient.hasReadyNode");
+                ready = nc.hasReadyNodes(System.currentTimeMillis());
+            }
+        } catch (Exception e) {
+            // ignore
+            LOG.debug("Cannot check hasReadyNodes on KafkaConsumer client (ConsumerNetworkClient) due to "
+                      + e.getMessage() + ". This exception is ignored.",
+                    e);
+        }
+        return ready;
+    }
+
     @SuppressWarnings("rawtypes")
     public org.apache.kafka.clients.producer.Producer getKafkaProducer() {
         return kafkaProducer;
@@ -123,26 +160,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
             // we create a thread pool so we should also shut it down
             shutdownWorkerPool = true;
         }
-    }
 
-    private void createProducer(Properties props) {
-        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            // Kafka uses reflection for loading authentication settings,
-            // use its classloader
-            Thread.currentThread()
-                    .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
-            LOG.trace("Creating KafkaProducer");
-            kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
-            closeKafkaProducer = true;
-        } finally {
-            Thread.currentThread().setContextClassLoader(threadClassLoader);
+        // init client id which we may need to get from the kafka producer via reflection
+        if (clientId == null) {
+            clientId = getProps().getProperty(CommonClientConfigs.CLIENT_ID_CONFIG);
+            if (clientId == null) {
+                try {
+                    clientId = (String) ReflectionHelper
+                            .getField(kafkaProducer.getClass().getDeclaredField("clientId"), kafkaProducer);
+                } catch (Exception e) {
+                    // ignore
+                    clientId = "";
+                }
+            }
+        }
+
+        // install producer health-check
+        HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+        if (hcr != null) {
+            healthCheck = new KafkaProducerHealthCheck(this, clientId);
+            hcr.register(healthCheck);
         }
-        LOG.debug("Created KafkaProducer: {}", kafkaProducer);
     }
 
     @Override
     protected void doStop() throws Exception {
+        if (healthCheck != null) {
+            HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+            if (hcr != null) {
+                hcr.unregister(healthCheck);
+            }
+            healthCheck = null;
+        }
+
         if (kafkaProducer != null && closeKafkaProducer) {
             LOG.debug("Closing KafkaProducer: {}", kafkaProducer);
             kafkaProducer.close();
@@ -157,6 +207,22 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
     }
 
+    private void createProducer(Properties props) {
+        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            // Kafka uses reflection for loading authentication settings,
+            // use its classloader
+            Thread.currentThread()
+                    .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+            LOG.trace("Creating KafkaProducer");
+            kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
+            closeKafkaProducer = true;
+        } finally {
+            Thread.currentThread().setContextClassLoader(threadClassLoader);
+        }
+        LOG.debug("Created KafkaProducer: {}", kafkaProducer);
+    }
+
     protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(
             Exchange exchange, Message message) {
         String topic = evaluateTopic(message);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java
new file mode 100644
index 0000000..34fde2a
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.health.HealthCheckResultBuilder;
+import org.apache.camel.impl.health.AbstractHealthCheck;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+/**
+ * Kafka producer readiness health-check
+ */
+public class KafkaProducerHealthCheck extends AbstractHealthCheck {
+
+    private final KafkaProducer kafkaProducer;
+    private final String clientId;
+
+    public KafkaProducerHealthCheck(KafkaProducer kafkaProducer, String clientId) {
+        super("camel", "kafka-producer-" + clientId);
+        this.kafkaProducer = kafkaProducer;
+        this.clientId = clientId;
+    }
+
+    @Override
+    public boolean isLiveness() {
+        // this health check is only readiness
+        return false;
+    }
+
+    @Override
+    protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
+        if (!kafkaProducer.isReady()) {
+            builder.down();
+            builder.message("KafkaProducer is not ready");
+
+            KafkaConfiguration cfg = kafkaProducer.getEndpoint().getConfiguration();
+            Properties props = kafkaProducer.getProps();
+
+            builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            builder.detail("client.id", clientId);
+            String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+            if (gid != null) {
+                builder.detail("group.id", gid);
+            }
+            builder.detail("topic", cfg.getTopic());
+        }
+    }
+}