You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2022/03/18 07:34:39 UTC

[camel-quarkus] 03/06: Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 3c8a2dd00438cd29797143847f1b03777e4f23f9
Author: James Netherton <ja...@gmail.com>
AuthorDate: Fri Feb 11 10:08:03 2022 +0000

    Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel
---
 .../MicroprofileFaultToleranceProcessor.java       |   9 +
 .../microprofile-fault-tolerance/runtime/pom.xml   |   2 +
 .../FaultToleranceConfiguration.java               | 120 +++++
 .../faulttolerance/FaultToleranceConstants.java}   |  21 +-
 .../faulttolerance/FaultToleranceProcessor.java    | 536 +++++++++++++++++++++
 .../FaultToleranceProcessorFactory.java}           |  28 +-
 .../faulttolerance/FaultToleranceReifier.java      | 193 ++++++++
 .../apache/camel/model/CircuitBreakerDefinition    |  18 +
 .../runtime/CamelMicroProfileHealthRecorder.java   |   3 +-
 ...amelQuarkusMicroProfileHealthCheckRegistry.java |  62 +++
 10 files changed, 958 insertions(+), 34 deletions(-)

diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
index 05673b1..6e8c382 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
@@ -16,9 +16,13 @@
  */
 package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
 
+import java.nio.file.Paths;
+
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory;
+import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem;
 
 class MicroprofileFaultToleranceProcessor {
 
@@ -35,4 +39,9 @@ class MicroprofileFaultToleranceProcessor {
                 "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
     }
 
+    @BuildStep
+    CamelServiceBuildItem camelCronServicePattern() {
+        return new CamelServiceBuildItem(Paths.get("META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"),
+                FaultToleranceProcessorFactory.class.getName());
+    }
 }
diff --git a/extensions/microprofile-fault-tolerance/runtime/pom.xml b/extensions/microprofile-fault-tolerance/runtime/pom.xml
index 3401f07..22e3962 100644
--- a/extensions/microprofile-fault-tolerance/runtime/pom.xml
+++ b/extensions/microprofile-fault-tolerance/runtime/pom.xml
@@ -56,10 +56,12 @@
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-core</artifactId>
         </dependency>
+        <!-- Not compatible with Quarkus 2.7.x
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-microprofile-fault-tolerance</artifactId>
         </dependency>
+        -->
     </dependencies>
 
     <build>
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
new file mode 100644
index 0000000..7cb3d4d
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+public class FaultToleranceConfiguration {
+
+    private long delay;
+    private int successThreshold;
+    private int requestVolumeThreshold;
+    private float failureRatio;
+    private boolean timeoutEnabled;
+    private long timeoutDuration;
+    private int timeoutPoolSize;
+    private String timeoutExecutorServiceRef;
+    private boolean bulkheadEnabled;
+    private int bulkheadMaxConcurrentCalls;
+    private int bulkheadWaitingTaskQueue;
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public int getSuccessThreshold() {
+        return successThreshold;
+    }
+
+    public void setSuccessThreshold(int successThreshold) {
+        this.successThreshold = successThreshold;
+    }
+
+    public int getRequestVolumeThreshold() {
+        return requestVolumeThreshold;
+    }
+
+    public void setRequestVolumeThreshold(int requestVolumeThreshold) {
+        this.requestVolumeThreshold = requestVolumeThreshold;
+    }
+
+    public float getFailureRatio() {
+        return failureRatio;
+    }
+
+    public void setFailureRatio(float failureRatio) {
+        this.failureRatio = failureRatio;
+    }
+
+    public boolean isTimeoutEnabled() {
+        return timeoutEnabled;
+    }
+
+    public void setTimeoutEnabled(boolean timeoutEnabled) {
+        this.timeoutEnabled = timeoutEnabled;
+    }
+
+    public long getTimeoutDuration() {
+        return timeoutDuration;
+    }
+
+    public void setTimeoutDuration(long timeoutDuration) {
+        this.timeoutDuration = timeoutDuration;
+    }
+
+    public int getTimeoutPoolSize() {
+        return timeoutPoolSize;
+    }
+
+    public void setTimeoutPoolSize(int timeoutPoolSize) {
+        this.timeoutPoolSize = timeoutPoolSize;
+    }
+
+    public String getTimeoutExecutorServiceRef() {
+        return timeoutExecutorServiceRef;
+    }
+
+    public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+        this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+    }
+
+    public boolean isBulkheadEnabled() {
+        return bulkheadEnabled;
+    }
+
+    public void setBulkheadEnabled(boolean bulkheadEnabled) {
+        this.bulkheadEnabled = bulkheadEnabled;
+    }
+
+    public int getBulkheadMaxConcurrentCalls() {
+        return bulkheadMaxConcurrentCalls;
+    }
+
+    public void setBulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) {
+        this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls;
+    }
+
+    public int getBulkheadWaitingTaskQueue() {
+        return bulkheadWaitingTaskQueue;
+    }
+
+    public void setBulkheadWaitingTaskQueue(int bulkheadWaitingTaskQueue) {
+        this.bulkheadWaitingTaskQueue = bulkheadWaitingTaskQueue;
+    }
+}
diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
similarity index 53%
copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
index 05673b1..3bb0027 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
@@ -14,25 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
 
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+public interface FaultToleranceConstants {
 
-class MicroprofileFaultToleranceProcessor {
-
-    private static final String FEATURE = "camel-microprofile-fault-tolerance";
-
-    @BuildStep
-    FeatureBuildItem feature() {
-        return new FeatureBuildItem(FEATURE);
-    }
-
-    @BuildStep
-    NativeImageResourceBuildItem initResources() {
-        return new NativeImageResourceBuildItem(
-                "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
-    }
+    String DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID = "fault-tolerance-configuration";
 
 }
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
new file mode 100644
index 0000000..2195b89
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
+import io.smallrye.faulttolerance.core.InvocationContext;
+import io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead;
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import io.smallrye.faulttolerance.core.fallback.Fallback;
+import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
+import io.smallrye.faulttolerance.core.timeout.ScheduledExecutorTimeoutWatcher;
+import io.smallrye.faulttolerance.core.timeout.Timeout;
+import io.smallrye.faulttolerance.core.timeout.TimeoutWatcher;
+import io.smallrye.faulttolerance.core.util.SetOfThrowables;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
+import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.smallrye.faulttolerance.core.Invocation.invocation;
+
+/**
+ * Implementation of Circuit Breaker EIP using microprofile fault tolerance.
+ */
+@ManagedResource(description = "Managed FaultTolerance Processor")
+public class FaultToleranceProcessor extends AsyncProcessorSupport
+        implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceProcessor.class);
+
+    private volatile CircuitBreaker circuitBreaker;
+    private CamelContext camelContext;
+    private String id;
+    private String routeId;
+    private final FaultToleranceConfiguration config;
+    private final Processor processor;
+    private final Processor fallbackProcessor;
+    private ScheduledExecutorService scheduledExecutorService;
+    private boolean shutdownScheduledExecutorService;
+    private ExecutorService executorService;
+    private boolean shutdownExecutorService;
+    private ProcessorExchangeFactory processorExchangeFactory;
+    private PooledExchangeTaskFactory taskFactory;
+    private PooledExchangeTaskFactory fallbackTaskFactory;
+
+    public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor,
+            Processor fallbackProcessor) {
+        this.config = config;
+        this.processor = processor;
+        this.fallbackProcessor = fallbackProcessor;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    public CircuitBreaker getCircuitBreaker() {
+        return circuitBreaker;
+    }
+
+    public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
+        this.circuitBreaker = circuitBreaker;
+    }
+
+    public boolean isShutdownExecutorService() {
+        return shutdownExecutorService;
+    }
+
+    public void setShutdownExecutorService(boolean shutdownExecutorService) {
+        this.shutdownExecutorService = shutdownExecutorService;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "faultTolerance";
+    }
+
+    @ManagedAttribute(description = "Returns the current delay in milliseconds.")
+    public long getDelay() {
+        return config.getDelay();
+    }
+
+    @ManagedAttribute(description = "Returns the current failure rate in percentage.")
+    public float getFailureRate() {
+        return config.getFailureRatio();
+    }
+
+    @ManagedAttribute(description = "Returns the current request volume threshold.")
+    public int getRequestVolumeThreshold() {
+        return config.getRequestVolumeThreshold();
+    }
+
+    @ManagedAttribute(description = "Returns the current success threshold.")
+    public int getSuccessThreshold() {
+        return config.getSuccessThreshold();
+    }
+
+    @ManagedAttribute(description = "Is timeout enabled")
+    public boolean isTimeoutEnabled() {
+        return config.isTimeoutEnabled();
+    }
+
+    @ManagedAttribute(description = "The timeout wait duration")
+    public long getTimeoutDuration() {
+        return config.getTimeoutDuration();
+    }
+
+    @ManagedAttribute(description = "The timeout pool size for the thread pool")
+    public int getTimeoutPoolSize() {
+        return config.getTimeoutPoolSize();
+    }
+
+    @ManagedAttribute(description = "Is bulkhead enabled")
+    public boolean isBulkheadEnabled() {
+        return config.isBulkheadEnabled();
+    }
+
+    @ManagedAttribute(description = "The max amount of concurrent calls the bulkhead will support.")
+    public int getBulkheadMaxConcurrentCalls() {
+        return config.getBulkheadMaxConcurrentCalls();
+    }
+
+    @ManagedAttribute(description = "The task queue size for holding waiting tasks to be processed by the bulkhead")
+    public int getBulkheadWaitingTaskQueue() {
+        return config.getBulkheadWaitingTaskQueue();
+    }
+
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<>();
+        answer.add(processor);
+        if (fallbackProcessor != null) {
+            answer.add(fallbackProcessor);
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return true;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // run this as if we run inside try .. catch so there is no regular
+        // Camel error handler
+        exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
+
+        CircuitBreakerFallbackTask fallbackTask = null;
+        CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
+
+        // circuit breaker
+        FaultToleranceStrategy target = circuitBreaker;
+
+        // 1. bulkhead
+        if (config.isBulkheadEnabled()) {
+            target = new FutureThreadPoolBulkhead(
+                    target, "bulkhead", config.getBulkheadMaxConcurrentCalls(),
+                    config.getBulkheadWaitingTaskQueue());
+        }
+        // 2. timeout
+        if (config.isTimeoutEnabled()) {
+            TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService);
+            target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher);
+        }
+        // 3. fallback
+        if (fallbackProcessor != null) {
+            fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback);
+            final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
+            target = new Fallback(target, "fallback", fallbackContext -> {
+                exchange.setException(fallbackContext.failure);
+                return fFallbackTask.call();
+            }, SetOfThrowables.ALL, SetOfThrowables.EMPTY);
+        }
+
+        try {
+            target.apply(new InvocationContext(task));
+        } catch (CircuitBreakerOpenException e) {
+            // the circuit breaker is open and rejected the call
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true);
+        } catch (Exception e) {
+            // some other kind of exception
+            exchange.setException(e);
+        } finally {
+            taskFactory.release(task);
+            if (fallbackTask != null) {
+                fallbackTaskFactory.release(fallbackTask);
+            }
+        }
+
+        exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    protected void doBuild() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+        boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            taskFactory.setCapacity(capacity);
+            fallbackTaskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+            fallbackTaskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            fallbackTaskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+        }
+
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
+
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void doInit() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (circuitBreaker == null) {
+            circuitBreaker = new CircuitBreaker(
+                    invocation(), id, SetOfThrowables.ALL,
+                    SetOfThrowables.EMPTY, config.getDelay(), config.getRequestVolumeThreshold(), config.getFailureRatio(),
+                    config.getSuccessThreshold(), new SystemStopwatch());
+        }
+
+        ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (config.isTimeoutEnabled() && scheduledExecutorService == null) {
+            scheduledExecutorService = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+                    "CircuitBreakerTimeout", config.getTimeoutPoolSize());
+            shutdownScheduledExecutorService = true;
+        }
+        if (config.isBulkheadEnabled() && executorService == null) {
+            executorService = getCamelContext().getExecutorServiceManager().newThreadPool(this, "CircuitBreakerBulkhead",
+                    config.getBulkheadMaxConcurrentCalls(), config.getBulkheadMaxConcurrentCalls());
+            shutdownExecutorService = true;
+        }
+
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (shutdownScheduledExecutorService && scheduledExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
+        if (shutdownExecutorService && executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+    }
+
+    private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange> {
+
+        private Exchange exchange;
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
+        }
+
+        @Override
+        public Exchange call() throws Exception {
+            Exchange copy = null;
+            UnitOfWork uow = null;
+            Throwable cause;
+
+            // turn off interruption to allow fault tolerance to process the exchange under its handling
+            exchange.adapt(ExtendedExchange.class).setInterruptable(false);
+
+            try {
+                LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
+
+                // prepare a copy of exchange so downstream processors don't
+                // cause side-effects if they mutate the exchange
+                // (e.g. when processing times out and we continue with the fallback)
+                copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+                if (copy.getUnitOfWork() != null) {
+                    uow = copy.getUnitOfWork();
+                } else {
+                    // prepare uow on copy
+                    uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                    copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                    // the copy must start from the route it was copied from
+                    Route route = ExchangeHelper.getRoute(exchange);
+                    if (route != null) {
+                        uow.pushRoute(route);
+                    }
+                }
+
+                // process the processor until it is fully done
+                processor.process(copy);
+
+                // handle the processing result
+                if (copy.getException() != null) {
+                    exchange.setException(copy.getException());
+                } else {
+                    // copy the result as it is regarded as a success
+                    ExchangeHelper.copyResults(exchange, copy);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, true);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+                }
+            } catch (Exception e) {
+                exchange.setException(e);
+            } finally {
+                // the uow must always be marked as done
+                UnitOfWorkHelper.doneUow(uow, copy);
+                // remember any thrown exception
+                cause = exchange.getException();
+            }
+
+            // and release exchange back in pool
+            processorExchangeFactory.release(exchange);
+
+            if (cause != null) {
+                // throw exception so fault tolerance knows it was a failure
+                throw RuntimeExchangeException.wrapRuntimeException(cause);
+            }
+            return exchange;
+        }
+    }
+
+    private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Callable<Exchange> {
+
+        private Exchange exchange;
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
+        }
+
+        @Override
+        public Exchange call() throws Exception {
+            Throwable throwable = exchange.getException();
+            if (fallbackProcessor == null) {
+                if (throwable instanceof TimeoutException) {
+                    // the circuit breaker triggered a timeout (and there is no
+                    // fallback) so let's mark the exchange as failed
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, false);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_TIMED_OUT, true);
+                    exchange.setException(throwable);
+                    return exchange;
+                } else if (throwable instanceof CircuitBreakerOpenException) {
+                    // the circuit breaker is open and rejected the call
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+                    exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true);
+                    return exchange;
+                } else {
+                    // throw exception so fault tolerance knows it was a failure
+                    throw RuntimeExchangeException.wrapRuntimeException(throwable);
+                }
+            }
+
+            // fallback route is handling the exception so it is short-circuited
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, true);
+            exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+
+            // store the last to endpoint as the failure endpoint
+            if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == null) {
+                exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT,
+                        exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT));
+            }
+            // give the rest of the pipeline another chance
+            exchange.setProperty(ExchangePropertyKey.EXCEPTION_HANDLED, true);
+            exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exchange.getException());
+            exchange.setRouteStop(false);
+            exchange.setException(null);
+            // and we should not be regarded as exhausted as we are in a try ..
+            // catch block
+            exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+            // run the fallback processor
+            try {
+                LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange);
+                // process the fallback until it is fully done
+                fallbackProcessor.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", fallbackProcessor, exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            return exchange;
+        }
+    }
+
+}
diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
similarity index 52%
copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
index 05673b1..2b70ca9 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
@@ -14,25 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
 
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.support.TypedProcessorFactory;
 
-class MicroprofileFaultToleranceProcessor {
-
-    private static final String FEATURE = "camel-microprofile-fault-tolerance";
+/**
+ * Integrates camel-microprofile-faulttolerance with Camel routes that use the Circuit Breaker EIP.
+ */
+public class FaultToleranceProcessorFactory extends TypedProcessorFactory<CircuitBreakerDefinition> {
 
-    @BuildStep
-    FeatureBuildItem feature() {
-        return new FeatureBuildItem(FEATURE);
+    public FaultToleranceProcessorFactory() {
+        super(CircuitBreakerDefinition.class);
     }
 
-    @BuildStep
-    NativeImageResourceBuildItem initResources() {
-        return new NativeImageResourceBuildItem(
-                "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
+    @Override
+    public Processor doCreateProcessor(Route route, CircuitBreakerDefinition definition) throws Exception {
+        return new FaultToleranceReifier(route, definition).createProcessor();
     }
 
 }
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
new file mode 100644
index 0000000..2664734
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.model.FaultToleranceConfigurationCommon;
+import org.apache.camel.model.FaultToleranceConfigurationDefinition;
+import org.apache.camel.model.Model;
+import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.BeanIntrospection;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurer;
+import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.function.Suppliers;
+
+/**
+ * Reifier that turns a {@link CircuitBreakerDefinition} into a {@link FaultToleranceProcessor}.
+ */
+public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
+
+    public FaultToleranceReifier(Route route, CircuitBreakerDefinition definition) {
+        super(route, definition);
+    }
+
+    /**
+     * Creates the processor for the circuit breaker, wiring in the optional fallback and the merged configuration.
+     *
+     * @throws UnsupportedOperationException if the fallback is configured to run via network
+     */
+    @Override
+    public Processor createProcessor() throws Exception {
+        // the fallback processor is only created when an onFallback block is present
+        final Processor target = createChildProcessor(true);
+        final boolean hasFallback = definition.getOnFallback() != null;
+        final Processor fallback = hasFallback ? createProcessor(definition.getOnFallback()) : null;
+        if (hasFallback && parseBoolean(definition.getOnFallback().getFallbackViaNetwork(), false)) {
+            throw new UnsupportedOperationException("camel-microprofile-fault-tolerance does not support onFallbackViaNetwork");
+        }
+
+        final FaultToleranceConfigurationCommon source = buildFaultToleranceConfiguration();
+        final FaultToleranceConfiguration settings = new FaultToleranceConfiguration();
+        configureCircuitBreaker(source, settings);
+        configureTimeLimiter(source, settings);
+        configureBulkhead(source, settings);
+
+        final FaultToleranceProcessor answer = new FaultToleranceProcessor(settings, target, fallback);
+        // reuse an existing circuit breaker when the configuration references one
+        if (source.getCircuitBreaker() != null) {
+            answer.setCircuitBreaker(mandatoryLookup(parseString(source.getCircuitBreaker()), CircuitBreaker.class));
+        }
+        configureBulkheadExecutorService(answer, source);
+        return answer;
+    }
+
+    /** Copies the circuit breaker options, converting the failure ratio from a 1-100 percentage to a fraction. */
+    private void configureCircuitBreaker(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        target.setDelay(parseDuration(config.getDelay(), 5000));
+        target.setSuccessThreshold(parseInt(config.getSuccessThreshold(), 1));
+        target.setRequestVolumeThreshold(parseInt(config.getRequestVolumeThreshold(), 20));
+        if (config.getFailureRatio() == null) {
+            target.setFailureRatio(0.5f);
+            return;
+        }
+        final float ratio = parseFloat(config.getFailureRatio(), 50);
+        if (ratio > 100 || ratio < 1) {
+            throw new IllegalArgumentException("FailureRatio must be between 1 and 100, was: " + ratio);
+        }
+        target.setFailureRatio(ratio / 100);
+    }
+
+    /** Copies the timeout options; the timeout is disabled unless explicitly enabled. */
+    private void configureTimeLimiter(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        target.setTimeoutEnabled(parseBoolean(config.getTimeoutEnabled(), false));
+        target.setTimeoutDuration(parseDuration(config.getTimeoutDuration(), 1000));
+        target.setTimeoutPoolSize(parseInt(config.getTimeoutPoolSize(), 10));
+    }
+
+    /** Copies the bulkhead limits, but only when the bulkhead is enabled. */
+    private void configureBulkhead(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        if (parseBoolean(config.getBulkheadEnabled(), false)) {
+            target.setBulkheadMaxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls(), 10));
+            target.setBulkheadWaitingTaskQueue(parseInt(config.getBulkheadWaitingTaskQueue(), 10));
+        }
+    }
+
+    /**
+     * Assigns the bulkhead executor service when the bulkhead is enabled and an executor reference is configured.
+     * An executor not found by name and type is resolved via the executor service ref and flagged for shutdown.
+     */
+    private void configureBulkheadExecutorService(FaultToleranceProcessor processor, FaultToleranceConfigurationCommon config) {
+        if (!parseBoolean(config.getBulkheadEnabled(), false) || config.getBulkheadExecutorService() == null) {
+            return;
+        }
+
+        final String ref = config.getBulkheadExecutorService();
+        ExecutorService pool = lookupByNameAndType(ref, ExecutorService.class);
+        final boolean ownsPool = pool == null;
+        if (ownsPool) {
+            pool = lookupExecutorServiceRef("CircuitBreaker", definition, ref);
+        }
+        processor.setExecutorService(pool);
+        processor.setShutdownExecutorService(ownsPool);
+    }
+
+    // *******************************
+    // Helpers
+    // *******************************
+
+    /**
+     * Merges the default, the referenced and the local configuration (later ones win) into a new definition.
+     */
+    FaultToleranceConfigurationDefinition buildFaultToleranceConfiguration() throws Exception {
+        final Map<String, Object> merged = new HashMap<>();
+
+        final PropertyConfigurer configurer = camelContext.adapt(ExtendedCamelContext.class)
+                .getConfigurerResolver()
+                .resolvePropertyConfigurer(FaultToleranceConfigurationDefinition.class.getName(), camelContext);
+
+        // Default configuration: the one on the camel context takes precedence over the one in the registry
+        loadProperties(merged, Suppliers.firstNotNull(
+                () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(null),
+                () -> lookupByNameAndType(FaultToleranceConstants.DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID,
+                        FaultToleranceConfigurationDefinition.class)),
+                configurer);
+
+        // Referenced configuration: again the camel context is consulted before the registry
+        if (definition.getConfiguration() != null) {
+            final String ref = parseString(definition.getConfiguration());
+
+            loadProperties(merged, Suppliers.firstNotNull(
+                    () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(ref),
+                    () -> mandatoryLookup(ref, FaultToleranceConfigurationDefinition.class)),
+                    configurer);
+        }
+
+        // Local configuration declared on the definition itself
+        loadProperties(merged, Optional.ofNullable(definition.getFaultToleranceConfiguration()), configurer);
+
+        // Bind the collected values onto a fresh configuration
+        final FaultToleranceConfigurationDefinition answer = new FaultToleranceConfigurationDefinition();
+        PropertyBindingSupport.build()
+                .withCamelContext(camelContext)
+                .withConfigurer(configurer)
+                .withProperties(merged)
+                .withTarget(answer)
+                .bind();
+
+        return answer;
+    }
+
+    /** Copies the options of the bean held by {@code optional}, if any, into {@code properties}. */
+    private void loadProperties(Map<String, Object> properties, Optional<?> optional, PropertyConfigurer configurer) {
+        final BeanIntrospection introspection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+        optional.ifPresent(bean -> {
+            if (!(configurer instanceof ExtendedPropertyConfigurerGetter)) {
+                // no configurer found so use bean introspection (reflection)
+                introspection.getProperties(bean, properties, null, false);
+                return;
+            }
+            final ExtendedPropertyConfigurerGetter getter = (ExtendedPropertyConfigurerGetter) configurer;
+            for (String option : getter.getAllOptions(bean).keySet()) {
+                final Object value = getter.getOptionValue(bean, option, true);
+                if (value != null) {
+                    properties.put(option, value);
+                }
+            }
+        });
+    }
+
+}
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
new file mode 100644
index 0000000..c43d558
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory
diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
index e33777b..907c133 100644
--- a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
+++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
@@ -20,7 +20,6 @@ import io.quarkus.runtime.RuntimeValue;
 import io.quarkus.runtime.annotations.Recorder;
 import org.apache.camel.CamelContext;
 import org.apache.camel.health.HealthCheckRegistry;
-import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
 import org.apache.camel.spi.CamelContextCustomizer;
 
 @Recorder
@@ -31,7 +30,7 @@ public class CamelMicroProfileHealthRecorder {
         return new RuntimeValue<>(new CamelContextCustomizer() {
             @Override
             public void configure(CamelContext camelContext) {
-                HealthCheckRegistry registry = new CamelMicroProfileHealthCheckRegistry(camelContext);
+                HealthCheckRegistry registry = new CamelQuarkusMicroProfileHealthCheckRegistry(camelContext);
                 registry.setId("camel-microprofile-health");
                 registry.setEnabled(true);
 
diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
new file mode 100644
index 0000000..8f51f80
--- /dev/null
+++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.lang.annotation.Annotation;
+import java.util.Set;
+
+import javax.enterprise.inject.spi.Bean;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.inject.spi.CDI;
+
+import io.smallrye.health.api.HealthRegistry;
+import io.smallrye.health.registry.LivenessHealthRegistry;
+import io.smallrye.health.registry.ReadinessHealthRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
+import org.eclipse.microprofile.health.Liveness;
+import org.eclipse.microprofile.health.Readiness;
+
+/** Health check registry that obtains the SmallRye liveness and readiness registries from CDI. */
+public class CamelQuarkusMicroProfileHealthCheckRegistry extends CamelMicroProfileHealthCheckRegistry {
+
+    CamelQuarkusMicroProfileHealthCheckRegistry(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    protected HealthRegistry getLivenessRegistry() {
+        return getHealthRegistryBean(LivenessHealthRegistry.class, Liveness.Literal.INSTANCE);
+    }
+
+    @Override
+    protected HealthRegistry getReadinessRegistry() {
+        return getHealthRegistryBean(ReadinessHealthRegistry.class, Readiness.Literal.INSTANCE);
+    }
+
+    /** Resolves the CDI bean of the given type and qualifier, failing if no candidate exists. */
+    private static HealthRegistry getHealthRegistryBean(Class<? extends HealthRegistry> type, Annotation qualifier) {
+        final BeanManager manager = CDI.current().getBeanManager();
+        final Set<Bean<?>> candidates = manager.getBeans(type, qualifier);
+        if (!candidates.isEmpty()) {
+            final Bean<?> resolved = manager.resolve(candidates);
+            return type.cast(manager.getReference(resolved, type, manager.createCreationalContext(resolved)));
+        }
+        throw new IllegalStateException(
+                "Beans for type " + type.getName() + " with qualifier " + qualifier + " could not be found.");
+    }
+}