You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/03/04 11:18:11 UTC
[camel] branch main updated: CAMEL-17727: camel-kafka - Add readiness health check for kafka producer
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 33d8d50 CAMEL-17727: camel-kafka - Add readiness health check for kafka producer
33d8d50 is described below
commit 33d8d5050ac7c4dc0b08080ea986e0b3495c2da7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 4 12:12:00 2022 +0100
CAMEL-17727: camel-kafka - Add readiness health check for kafka producer
---
.../component/kafka/KafkaConsumerHealthCheck.java | 5 +-
.../camel/component/kafka/KafkaFetchRecords.java | 21 ++++-
.../camel/component/kafka/KafkaProducer.java | 94 ++++++++++++++++++----
.../component/kafka/KafkaProducerHealthCheck.java | 64 +++++++++++++++
4 files changed, 165 insertions(+), 19 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
index b20115f..19458e2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerHealthCheck.java
@@ -56,10 +56,7 @@ public class KafkaConsumerHealthCheck extends AbstractHealthCheck {
Properties props = task.getKafkaProps();
builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- String cid = props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG);
- if (cid != null) {
- builder.detail("client.id", cid);
- }
+ builder.detail("client.id", task.getClientId());
String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
if (gid != null) {
builder.detail("group.id", gid);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 6bbb135..63c4a61 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ReflectionHelper;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.TopicPartition;
@@ -49,6 +50,7 @@ class KafkaFetchRecords implements Runnable {
private final KafkaConsumer kafkaConsumer;
private org.apache.kafka.clients.consumer.Consumer consumer;
+ private String clientId;
private final String topicName;
private final Pattern topicPattern;
private final String threadId;
@@ -124,6 +126,20 @@ class KafkaFetchRecords implements Runnable {
// this may throw an exception if something is wrong with kafka consumer
this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
+
+ // init client id which we may need to get from the kafka producer via reflection
+ if (clientId == null) {
+ clientId = getKafkaProps().getProperty(CommonClientConfigs.CLIENT_ID_CONFIG);
+ if (clientId == null) {
+ try {
+ clientId = (String) ReflectionHelper
+ .getField(consumer.getClass().getDeclaredField("clientId"), consumer);
+ } catch (Exception e) {
+ // ignore
+ clientId = "";
+ }
+ }
+ }
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
}
@@ -140,7 +156,6 @@ class KafkaFetchRecords implements Runnable {
}
private void subscribe() {
-
KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
resumeStrategy.setConsumer(consumer);
@@ -445,4 +460,8 @@ class KafkaFetchRecords implements Runnable {
Properties getKafkaProps() {
return kafkaProps;
}
+
+ String getClientId() {
+ return clientId;
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 017645e..43abcf9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -36,11 +36,15 @@ import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCa
import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
import org.apache.camel.component.kafka.producer.support.ProducerUtil;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
+import org.apache.camel.health.HealthCheckRegistry;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.URISupport;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -57,6 +61,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings("rawtypes")
private org.apache.kafka.clients.producer.Producer kafkaProducer;
+ private KafkaProducerHealthCheck healthCheck;
+ private String clientId;
private final KafkaEndpoint endpoint;
private final KafkaConfiguration configuration;
private ExecutorService workerPool;
@@ -76,6 +82,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
configKey = configuration.getKey();
}
+ @Override
+ public KafkaEndpoint getEndpoint() {
+ return (KafkaEndpoint) super.getEndpoint();
+ }
+
Properties getProps() {
Properties props = configuration.createProducerProperties();
endpoint.updateClassProperties(props);
@@ -88,6 +99,32 @@ public class KafkaProducer extends DefaultAsyncProducer {
return props;
}
+ public boolean isReady() {
+ boolean ready = true;
+ try {
+ if (kafkaProducer instanceof org.apache.kafka.clients.producer.KafkaProducer) {
+ // need to use reflection to access the network client which has API to check if the client has ready
+ // connections
+ org.apache.kafka.clients.producer.KafkaProducer kp
+ = (org.apache.kafka.clients.producer.KafkaProducer) kafkaProducer;
+ org.apache.kafka.clients.producer.internals.Sender sender
+ = (org.apache.kafka.clients.producer.internals.Sender) ReflectionHelper
+ .getField(kp.getClass().getDeclaredField("sender"), kp);
+ NetworkClient nc
+ = (NetworkClient) ReflectionHelper.getField(sender.getClass().getDeclaredField("client"), sender);
+ LOG.trace(
+ "Health-Check calling org.apache.kafka.clients.NetworkClient.hasReadyNode");
+ ready = nc.hasReadyNodes(System.currentTimeMillis());
+ }
+ } catch (Exception e) {
+ // ignore
+ LOG.debug("Cannot check hasReadyNodes on KafkaConsumer client (ConsumerNetworkClient) due to "
+ + e.getMessage() + ". This exception is ignored.",
+ e);
+ }
+ return ready;
+ }
+
@SuppressWarnings("rawtypes")
public org.apache.kafka.clients.producer.Producer getKafkaProducer() {
return kafkaProducer;
@@ -123,26 +160,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
}
- }
- private void createProducer(Properties props) {
- ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- // Kafka uses reflection for loading authentication settings,
- // use its classloader
- Thread.currentThread()
- .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
- LOG.trace("Creating KafkaProducer");
- kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
- closeKafkaProducer = true;
- } finally {
- Thread.currentThread().setContextClassLoader(threadClassLoader);
+ // init client id which we may need to get from the kafka producer via reflection
+ if (clientId == null) {
+ clientId = getProps().getProperty(CommonClientConfigs.CLIENT_ID_CONFIG);
+ if (clientId == null) {
+ try {
+ clientId = (String) ReflectionHelper
+ .getField(kafkaProducer.getClass().getDeclaredField("clientId"), kafkaProducer);
+ } catch (Exception e) {
+ // ignore
+ clientId = "";
+ }
+ }
+ }
+
+ // install producer health-check
+ HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+ if (hcr != null) {
+ healthCheck = new KafkaProducerHealthCheck(this, clientId);
+ hcr.register(healthCheck);
}
- LOG.debug("Created KafkaProducer: {}", kafkaProducer);
}
@Override
protected void doStop() throws Exception {
+ if (healthCheck != null) {
+ HealthCheckRegistry hcr = getEndpoint().getCamelContext().getExtension(HealthCheckRegistry.class);
+ if (hcr != null) {
+ hcr.unregister(healthCheck);
+ }
+ healthCheck = null;
+ }
+
if (kafkaProducer != null && closeKafkaProducer) {
LOG.debug("Closing KafkaProducer: {}", kafkaProducer);
kafkaProducer.close();
@@ -157,6 +207,22 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
}
+ private void createProducer(Properties props) {
+ ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ // Kafka uses reflection for loading authentication settings,
+ // use its classloader
+ Thread.currentThread()
+ .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+ LOG.trace("Creating KafkaProducer");
+ kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props);
+ closeKafkaProducer = true;
+ } finally {
+ Thread.currentThread().setContextClassLoader(threadClassLoader);
+ }
+ LOG.debug("Created KafkaProducer: {}", kafkaProducer);
+ }
+
protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(
Exchange exchange, Message message) {
String topic = evaluateTopic(message);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java
new file mode 100644
index 0000000..34fde2a
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducerHealthCheck.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.health.HealthCheckResultBuilder;
+import org.apache.camel.impl.health.AbstractHealthCheck;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+/**
+ * Kafka producer readiness health-check
+ */
+public class KafkaProducerHealthCheck extends AbstractHealthCheck {
+
+ private final KafkaProducer kafkaProducer;
+ private final String clientId;
+
+ public KafkaProducerHealthCheck(KafkaProducer kafkaProducer, String clientId) {
+ super("camel", "kafka-producer-" + clientId);
+ this.kafkaProducer = kafkaProducer;
+ this.clientId = clientId;
+ }
+
+ @Override
+ public boolean isLiveness() {
+ // this health check is only readiness
+ return false;
+ }
+
+ @Override
+ protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
+ if (!kafkaProducer.isReady()) {
+ builder.down();
+ builder.message("KafkaProducer is not ready");
+
+ KafkaConfiguration cfg = kafkaProducer.getEndpoint().getConfiguration();
+ Properties props = kafkaProducer.getProps();
+
+ builder.detail("bootstrap.servers", props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ builder.detail("client.id", clientId);
+ String gid = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ if (gid != null) {
+ builder.detail("group.id", gid);
+ }
+ builder.detail("topic", cfg.getTopic());
+ }
+ }
+}