You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2022/03/18 13:11:13 UTC
[camel-quarkus] 03/08: Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch camel-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 25c531db04752772116bfbf4b53d21997e8dc26f
Author: James Netherton <ja...@gmail.com>
AuthorDate: Fri Feb 11 10:08:03 2022 +0000
Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel
---
.../MicroprofileFaultToleranceProcessor.java | 9 +
.../microprofile-fault-tolerance/runtime/pom.xml | 2 +
.../FaultToleranceConfiguration.java | 120 +++++
.../faulttolerance/FaultToleranceConstants.java} | 21 +-
.../faulttolerance/FaultToleranceProcessor.java | 536 +++++++++++++++++++++
.../FaultToleranceProcessorFactory.java} | 28 +-
.../faulttolerance/FaultToleranceReifier.java | 193 ++++++++
.../apache/camel/model/CircuitBreakerDefinition | 18 +
.../runtime/CamelMicroProfileHealthRecorder.java | 3 +-
...amelQuarkusMicroProfileHealthCheckRegistry.java | 62 +++
10 files changed, 958 insertions(+), 34 deletions(-)
diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
index 05673b1..6e8c382 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
@@ -16,9 +16,13 @@
*/
package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+import java.nio.file.Paths;
+
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory;
+import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem;
class MicroprofileFaultToleranceProcessor {
@@ -35,4 +39,9 @@ class MicroprofileFaultToleranceProcessor {
"META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
}
+ @BuildStep
+ CamelServiceBuildItem camelCronServicePattern() {
+ return new CamelServiceBuildItem(Paths.get("META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"),
+ FaultToleranceProcessorFactory.class.getName());
+ }
}
diff --git a/extensions/microprofile-fault-tolerance/runtime/pom.xml b/extensions/microprofile-fault-tolerance/runtime/pom.xml
index 3401f07..22e3962 100644
--- a/extensions/microprofile-fault-tolerance/runtime/pom.xml
+++ b/extensions/microprofile-fault-tolerance/runtime/pom.xml
@@ -56,10 +56,12 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-core</artifactId>
</dependency>
+ <!-- Not compatible with Quarkus 2.7.x
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-microprofile-fault-tolerance</artifactId>
</dependency>
+ -->
</dependencies>
<build>
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
new file mode 100644
index 0000000..7cb3d4d
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+public class FaultToleranceConfiguration {
+
+ private long delay;
+ private int successThreshold;
+ private int requestVolumeThreshold;
+ private float failureRatio;
+ private boolean timeoutEnabled;
+ private long timeoutDuration;
+ private int timeoutPoolSize;
+ private String timeoutExecutorServiceRef;
+ private boolean bulkheadEnabled;
+ private int bulkheadMaxConcurrentCalls;
+ private int bulkheadWaitingTaskQueue;
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ public int getSuccessThreshold() {
+ return successThreshold;
+ }
+
+ public void setSuccessThreshold(int successThreshold) {
+ this.successThreshold = successThreshold;
+ }
+
+ public int getRequestVolumeThreshold() {
+ return requestVolumeThreshold;
+ }
+
+ public void setRequestVolumeThreshold(int requestVolumeThreshold) {
+ this.requestVolumeThreshold = requestVolumeThreshold;
+ }
+
+ public float getFailureRatio() {
+ return failureRatio;
+ }
+
+ public void setFailureRatio(float failureRatio) {
+ this.failureRatio = failureRatio;
+ }
+
+ public boolean isTimeoutEnabled() {
+ return timeoutEnabled;
+ }
+
+ public void setTimeoutEnabled(boolean timeoutEnabled) {
+ this.timeoutEnabled = timeoutEnabled;
+ }
+
+ public long getTimeoutDuration() {
+ return timeoutDuration;
+ }
+
+ public void setTimeoutDuration(long timeoutDuration) {
+ this.timeoutDuration = timeoutDuration;
+ }
+
+ public int getTimeoutPoolSize() {
+ return timeoutPoolSize;
+ }
+
+ public void setTimeoutPoolSize(int timeoutPoolSize) {
+ this.timeoutPoolSize = timeoutPoolSize;
+ }
+
+ public String getTimeoutExecutorServiceRef() {
+ return timeoutExecutorServiceRef;
+ }
+
+ public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+ this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+ }
+
+ public boolean isBulkheadEnabled() {
+ return bulkheadEnabled;
+ }
+
+ public void setBulkheadEnabled(boolean bulkheadEnabled) {
+ this.bulkheadEnabled = bulkheadEnabled;
+ }
+
+ public int getBulkheadMaxConcurrentCalls() {
+ return bulkheadMaxConcurrentCalls;
+ }
+
+ public void setBulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) {
+ this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls;
+ }
+
+ public int getBulkheadWaitingTaskQueue() {
+ return bulkheadWaitingTaskQueue;
+ }
+
+ public void setBulkheadWaitingTaskQueue(int bulkheadWaitingTaskQueue) {
+ this.bulkheadWaitingTaskQueue = bulkheadWaitingTaskQueue;
+ }
+}
diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
similarity index 53%
copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
index 05673b1..3bb0027 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
@@ -14,25 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+public interface FaultToleranceConstants {
-class MicroprofileFaultToleranceProcessor {
-
- private static final String FEATURE = "camel-microprofile-fault-tolerance";
-
- @BuildStep
- FeatureBuildItem feature() {
- return new FeatureBuildItem(FEATURE);
- }
-
- @BuildStep
- NativeImageResourceBuildItem initResources() {
- return new NativeImageResourceBuildItem(
- "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
- }
+ String DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID = "fault-tolerance-configuration";
}
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
new file mode 100644
index 0000000..2195b89
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
+import io.smallrye.faulttolerance.core.InvocationContext;
+import io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead;
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import io.smallrye.faulttolerance.core.fallback.Fallback;
+import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
+import io.smallrye.faulttolerance.core.timeout.ScheduledExecutorTimeoutWatcher;
+import io.smallrye.faulttolerance.core.timeout.Timeout;
+import io.smallrye.faulttolerance.core.timeout.TimeoutWatcher;
+import io.smallrye.faulttolerance.core.util.SetOfThrowables;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
+import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.smallrye.faulttolerance.core.Invocation.invocation;
+
+/**
+ * Implementation of Circuit Breaker EIP using microprofile fault tolerance.
+ */
+@ManagedResource(description = "Managed FaultTolerance Processor")
+public class FaultToleranceProcessor extends AsyncProcessorSupport
+ implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceProcessor.class);
+
+ private volatile CircuitBreaker circuitBreaker;
+ private CamelContext camelContext;
+ private String id;
+ private String routeId;
+ private final FaultToleranceConfiguration config;
+ private final Processor processor;
+ private final Processor fallbackProcessor;
+ private ScheduledExecutorService scheduledExecutorService;
+ private boolean shutdownScheduledExecutorService;
+ private ExecutorService executorService;
+ private boolean shutdownExecutorService;
+ private ProcessorExchangeFactory processorExchangeFactory;
+ private PooledExchangeTaskFactory taskFactory;
+ private PooledExchangeTaskFactory fallbackTaskFactory;
+
+ public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor,
+ Processor fallbackProcessor) {
+ this.config = config;
+ this.processor = processor;
+ this.fallbackProcessor = fallbackProcessor;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ public CircuitBreaker getCircuitBreaker() {
+ return circuitBreaker;
+ }
+
+ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
+ this.circuitBreaker = circuitBreaker;
+ }
+
+ public boolean isShutdownExecutorService() {
+ return shutdownExecutorService;
+ }
+
+ public void setShutdownExecutorService(boolean shutdownExecutorService) {
+ this.shutdownExecutorService = shutdownExecutorService;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ @Override
+ public String getTraceLabel() {
+ return "faultTolerance";
+ }
+
+ @ManagedAttribute(description = "Returns the current delay in milliseconds.")
+ public long getDelay() {
+ return config.getDelay();
+ }
+
+ @ManagedAttribute(description = "Returns the current failure rate in percentage.")
+ public float getFailureRate() {
+ return config.getFailureRatio();
+ }
+
+ @ManagedAttribute(description = "Returns the current request volume threshold.")
+ public int getRequestVolumeThreshold() {
+ return config.getRequestVolumeThreshold();
+ }
+
+ @ManagedAttribute(description = "Returns the current success threshold.")
+ public int getSuccessThreshold() {
+ return config.getSuccessThreshold();
+ }
+
+ @ManagedAttribute(description = "Is timeout enabled")
+ public boolean isTimeoutEnabled() {
+ return config.isTimeoutEnabled();
+ }
+
+ @ManagedAttribute(description = "The timeout wait duration")
+ public long getTimeoutDuration() {
+ return config.getTimeoutDuration();
+ }
+
+ @ManagedAttribute(description = "The timeout pool size for the thread pool")
+ public int getTimeoutPoolSize() {
+ return config.getTimeoutPoolSize();
+ }
+
+ @ManagedAttribute(description = "Is bulkhead enabled")
+ public boolean isBulkheadEnabled() {
+ return config.isBulkheadEnabled();
+ }
+
+ @ManagedAttribute(description = "The max amount of concurrent calls the bulkhead will support.")
+ public int getBulkheadMaxConcurrentCalls() {
+ return config.getBulkheadMaxConcurrentCalls();
+ }
+
+ @ManagedAttribute(description = "The task queue size for holding waiting tasks to be processed by the bulkhead")
+ public int getBulkheadWaitingTaskQueue() {
+ return config.getBulkheadWaitingTaskQueue();
+ }
+
+ @Override
+ public List<Processor> next() {
+ if (!hasNext()) {
+ return null;
+ }
+ List<Processor> answer = new ArrayList<>();
+ answer.add(processor);
+ if (fallbackProcessor != null) {
+ answer.add(fallbackProcessor);
+ }
+ return answer;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // mark the exchange as being inside a try .. catch block so the regular
+ // Camel error handler does not kick in while fault tolerance is in charge
+ exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
+
+ CircuitBreakerFallbackTask fallbackTask = null;
+ CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
+
+ // start with the circuit breaker; each enabled strategy below wraps the previous target
+ FaultToleranceStrategy target = circuitBreaker;
+
+ // 1. bulkhead (only when enabled in the configuration)
+ if (config.isBulkheadEnabled()) {
+ target = new FutureThreadPoolBulkhead(
+ target, "bulkhead", config.getBulkheadMaxConcurrentCalls(),
+ config.getBulkheadWaitingTaskQueue());
+ }
+ // 2. timeout (only when enabled in the configuration)
+ if (config.isTimeoutEnabled()) {
+ TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService);
+ target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher);
+ }
+ // 3. fallback (only when a fallback processor was supplied)
+ if (fallbackProcessor != null) {
+ fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback);
+ final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
+ target = new Fallback(target, "fallback", fallbackContext -> {
+ exchange.setException(fallbackContext.failure);
+ return fFallbackTask.call();
+ }, SetOfThrowables.ALL, SetOfThrowables.EMPTY);
+ }
+
+ try {
+ target.apply(new InvocationContext(task));
+ } catch (CircuitBreakerOpenException e) {
+ // the circuit breaker is open and rejected the call
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true);
+ } catch (Exception e) {
+ // any other failure is recorded on the exchange
+ exchange.setException(e);
+ } finally {
+ taskFactory.release(task);
+ if (fallbackTask != null) {
+ fallbackTaskFactory.release(fallbackTask);
+ }
+ }
+
+ exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
+ callback.done(true);
+ return true;
+ }
+
+ @Override
+ protected void doBuild() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+ boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+ if (pooled) {
+ int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory = new PooledTaskFactory(getId()) {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerTask();
+ }
+ };
+ taskFactory.setCapacity(capacity);
+ fallbackTaskFactory = new PooledTaskFactory(getId()) {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerFallbackTask();
+ }
+ };
+ fallbackTaskFactory.setCapacity(capacity);
+ } else {
+ taskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerTask();
+ }
+ };
+ fallbackTaskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerFallbackTask();
+ }
+ };
+ }
+
+ // create a per processor exchange factory
+ this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+ .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+ this.processorExchangeFactory.setRouteId(getRouteId());
+ this.processorExchangeFactory.setId(getId());
+
+ ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void doInit() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext", this);
+ if (circuitBreaker == null) {
+ circuitBreaker = new CircuitBreaker(
+ invocation(), id, SetOfThrowables.ALL,
+ SetOfThrowables.EMPTY, config.getDelay(), config.getRequestVolumeThreshold(), config.getFailureRatio(),
+ config.getSuccessThreshold(), new SystemStopwatch());
+ }
+
+ ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (config.isTimeoutEnabled() && scheduledExecutorService == null) {
+ scheduledExecutorService = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+ "CircuitBreakerTimeout", config.getTimeoutPoolSize());
+ shutdownScheduledExecutorService = true;
+ }
+ if (config.isBulkheadEnabled() && executorService == null) {
+ executorService = getCamelContext().getExecutorServiceManager().newThreadPool(this, "CircuitBreakerBulkhead",
+ config.getBulkheadMaxConcurrentCalls(), config.getBulkheadMaxConcurrentCalls());
+ shutdownExecutorService = true;
+ }
+
+ ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (shutdownScheduledExecutorService && scheduledExecutorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+ scheduledExecutorService = null;
+ }
+ if (shutdownExecutorService && executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ executorService = null;
+ }
+
+ ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
+ }
+
+ private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange> {
+
+ private Exchange exchange;
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ // callback not in use
+ }
+
+ @Override
+ public void reset() {
+ this.exchange = null;
+ }
+
+ @Override
+ public void run() {
+ // not in use
+ }
+
+ @Override
+ public Exchange call() throws Exception {
+ Exchange copy = null;
+ UnitOfWork uow = null;
+ Throwable cause;
+
+ // turn off interruption to allow fault tolerance to process the exchange under its handling
+ exchange.adapt(ExtendedExchange.class).setInterruptable(false);
+
+ try {
+ LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
+
+ // work on a copy of the exchange so downstream processors don't
+ // cause side-effects on the original if they mutate it,
+ // e.g. when processing times out and we continue with the fallback
+ copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+ if (copy.getUnitOfWork() != null) {
+ uow = copy.getUnitOfWork();
+ } else {
+ // prepare uow on copy
+ uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+ copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+ // the copy must start from the route it was copied from
+ Route route = ExchangeHelper.getRoute(exchange);
+ if (route != null) {
+ uow.pushRoute(route);
+ }
+ }
+
+ // run the processor until it is fully done
+ processor.process(copy);
+
+ // handle the processing result
+ if (copy.getException() != null) {
+ exchange.setException(copy.getException());
+ } else {
+ // copy the result back as it is regarded as a success
+ ExchangeHelper.copyResults(exchange, copy);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, true);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ } finally {
+ // the unit of work must always be marked as done
+ UnitOfWorkHelper.doneUow(uow, copy);
+ // remember any thrown exception
+ cause = exchange.getException();
+ }
+
+ // release exchange back in pool; NOTE(review): passes the original exchange, not the copy - confirm intended
+ processorExchangeFactory.release(exchange);
+
+ if (cause != null) {
+ // throw exception so fault tolerance knows it was a failure
+ throw RuntimeExchangeException.wrapRuntimeException(cause);
+ }
+ return exchange;
+ }
+ }
+
+ private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Callable<Exchange> {
+
+ private Exchange exchange;
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ // callback not in use
+ }
+
+ @Override
+ public void reset() {
+ this.exchange = null;
+ }
+
+ @Override
+ public void run() {
+ // not in use
+ }
+
+ @Override
+ public Exchange call() throws Exception {
+ Throwable throwable = exchange.getException();
+ if (fallbackProcessor == null) {
+ if (throwable instanceof TimeoutException) {
+ // the circuit breaker triggered a timeout (and there is no
+ // fallback) so let's mark the exchange as failed
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_TIMED_OUT, true);
+ exchange.setException(throwable);
+ return exchange;
+ } else if (throwable instanceof CircuitBreakerOpenException) {
+ // the circuit breaker is open and rejected the call
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true);
+ return exchange;
+ } else {
+ // throw exception so fault tolerance knows it was a failure
+ throw RuntimeExchangeException.wrapRuntimeException(throwable);
+ }
+ }
+
+ // the fallback route is handling the exception so it is short-circuited
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, true);
+ exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true);
+
+ // store the last to endpoint as the failure endpoint
+ if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == null) {
+ exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT,
+ exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT));
+ }
+ // give the rest of the pipeline another chance
+ exchange.setProperty(ExchangePropertyKey.EXCEPTION_HANDLED, true);
+ exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exchange.getException());
+ exchange.setRouteStop(false);
+ exchange.setException(null);
+ // and we should not be regarded as exhausted as we are in a try ..
+ // catch block
+ exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ // run the fallback processor
+ try {
+ LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange);
+ // run the fallback until it is fully done
+ fallbackProcessor.process(exchange);
+ LOG.debug("Running fallback: {} with exchange: {} done", fallbackProcessor, exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ return exchange;
+ }
+ }
+
+}
diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
similarity index 52%
copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
index 05673b1..2b70ca9 100644
--- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
@@ -14,25 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.support.TypedProcessorFactory;
-class MicroprofileFaultToleranceProcessor {
-
- private static final String FEATURE = "camel-microprofile-fault-tolerance";
+/**
+ * To integrate camel-microprofile-faulttolerance with the Camel routes using the Circuit Breaker EIP.
+ */
+public class FaultToleranceProcessorFactory extends TypedProcessorFactory<CircuitBreakerDefinition> {
- @BuildStep
- FeatureBuildItem feature() {
- return new FeatureBuildItem(FEATURE);
+ public FaultToleranceProcessorFactory() {
+ super(CircuitBreakerDefinition.class);
}
- @BuildStep
- NativeImageResourceBuildItem initResources() {
- return new NativeImageResourceBuildItem(
- "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
+ @Override
+ public Processor doCreateProcessor(Route route, CircuitBreakerDefinition definition) throws Exception {
+ return new FaultToleranceReifier(route, definition).createProcessor();
}
}
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
new file mode 100644
index 0000000..2664734
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.model.FaultToleranceConfigurationCommon;
+import org.apache.camel.model.FaultToleranceConfigurationDefinition;
+import org.apache.camel.model.Model;
+import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.BeanIntrospection;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurer;
+import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.function.Suppliers;
+
+public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
+
+    /**
+     * Creates a reifier bound to one circuit breaker definition of a route.
+     *
+     * @param route      the route handed to the parent reifier
+     * @param definition the circuit breaker definition handed to the parent reifier
+     */
+    public FaultToleranceReifier(Route route, CircuitBreakerDefinition definition) {
+        super(route, definition);
+    }
+
+    /**
+     * Builds the {@link FaultToleranceProcessor} for this circuit breaker definition.
+     * <p>
+     * The child processor and the optional fallback processor are created first; afterwards the
+     * effective configuration is resolved and copied into a {@link FaultToleranceConfiguration}.
+     *
+     * @return the configured processor
+     * @throws UnsupportedOperationException if the fallback is flagged as going via network
+     * @throws Exception                     if a processor or the configuration cannot be built
+     */
+    @Override
+    public Processor createProcessor() throws Exception {
+        // main processor first, then the fallback (only when one is declared)
+        final Processor main = createChildProcessor(true);
+        final Processor onFallback = definition.getOnFallback() == null
+                ? null
+                : createProcessor(definition.getOnFallback());
+
+        // a fallback that goes via network is rejected outright
+        if (definition.getOnFallback() != null
+                && parseBoolean(definition.getOnFallback().getFallbackViaNetwork(), false)) {
+            throw new UnsupportedOperationException("camel-microprofile-fault-tolerance does not support onFallbackViaNetwork");
+        }
+
+        final FaultToleranceConfigurationCommon common = buildFaultToleranceConfiguration();
+
+        final FaultToleranceConfiguration resolved = new FaultToleranceConfiguration();
+        configureCircuitBreaker(common, resolved);
+        configureTimeLimiter(common, resolved);
+        configureBulkhead(common, resolved);
+
+        final FaultToleranceProcessor result = new FaultToleranceProcessor(resolved, main, onFallback);
+        // reuse an existing circuit breaker when one is referenced by the configuration
+        if (common.getCircuitBreaker() != null) {
+            result.setCircuitBreaker(mandatoryLookup(parseString(common.getCircuitBreaker()), CircuitBreaker.class));
+        }
+        configureBulkheadExecutorService(result, common);
+        return result;
+    }
+
+    /**
+     * Copies the circuit breaker options from the model onto the runtime configuration.
+     * <p>
+     * Fallback values used when an option is not set: delay 5000, success threshold 1, request
+     * volume threshold 20, failure ratio 0.5.
+     *
+     * @param config the resolved model configuration to read from
+     * @param target the runtime configuration to populate
+     * @throws IllegalArgumentException if a failure ratio is configured outside the 1..100 range
+     */
+    private void configureCircuitBreaker(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        target.setDelay(parseDuration(config.getDelay(), 5000));
+        target.setSuccessThreshold(parseInt(config.getSuccessThreshold(), 1));
+        target.setRequestVolumeThreshold(parseInt(config.getRequestVolumeThreshold(), 20));
+
+        if (config.getFailureRatio() == null) {
+            // nothing configured: fall back to one half
+            target.setFailureRatio(0.5f);
+            return;
+        }
+
+        final float percentage = parseFloat(config.getFailureRatio(), 50);
+        if (percentage < 1 || percentage > 100) {
+            throw new IllegalArgumentException("FailureRatio must be between 1 and 100, was: " + percentage);
+        }
+        // the model value is a percentage, the target receives it divided by 100
+        target.setFailureRatio(percentage / 100);
+    }
+
+    /**
+     * Copies the timeout options from the model onto the runtime configuration.
+     * <p>
+     * Fallback values used when an option is not set: timeout disabled, duration 1000, pool size 10.
+     *
+     * @param config the resolved model configuration to read from
+     * @param target the runtime configuration to populate
+     */
+    private void configureTimeLimiter(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        final boolean timeoutEnabled = parseBoolean(config.getTimeoutEnabled(), false);
+        target.setTimeoutEnabled(timeoutEnabled);
+
+        // duration and pool size are copied regardless of the enabled flag
+        target.setTimeoutDuration(parseDuration(config.getTimeoutDuration(), 1000));
+        target.setTimeoutPoolSize(parseInt(config.getTimeoutPoolSize(), 10));
+    }
+
+    /**
+     * Copies the bulkhead sizing options onto the runtime configuration, but only when the
+     * bulkhead is enabled in the model. Both options fall back to 10 when not set.
+     *
+     * @param config the resolved model configuration to read from
+     * @param target the runtime configuration to populate
+     */
+    private void configureBulkhead(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) {
+        // NOTE(review): the enabled flag itself is not copied onto target here - confirm that is intended
+        if (parseBoolean(config.getBulkheadEnabled(), false)) {
+            target.setBulkheadMaxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls(), 10));
+            target.setBulkheadWaitingTaskQueue(parseInt(config.getBulkheadWaitingTaskQueue(), 10));
+        }
+    }
+
+    /**
+     * Hands the referenced executor service to the processor when the bulkhead is enabled and an
+     * executor service reference is configured; otherwise does nothing.
+     * <p>
+     * The reference is first looked up by name and type. Only when that yields nothing is it
+     * resolved through {@code lookupExecutorServiceRef}, and in that case the processor is told to
+     * shut the executor service down.
+     *
+     * @param processor the processor receiving the executor service
+     * @param config    the resolved model configuration to read from
+     */
+    private void configureBulkheadExecutorService(FaultToleranceProcessor processor, FaultToleranceConfigurationCommon config) {
+        final boolean bulkheadEnabled = parseBoolean(config.getBulkheadEnabled(), false);
+        if (bulkheadEnabled && config.getBulkheadExecutorService() != null) {
+            final String poolRef = config.getBulkheadExecutorService();
+
+            ExecutorService pool = lookupByNameAndType(poolRef, ExecutorService.class);
+            // a direct lookup hit is left alone on shutdown; a pool resolved below is not
+            final boolean resolvedViaRef = pool == null;
+            if (resolvedViaRef) {
+                pool = lookupExecutorServiceRef("CircuitBreaker", definition, poolRef);
+            }
+
+            processor.setExecutorService(pool);
+            processor.setShutdownExecutorService(resolvedViaRef);
+        }
+    }
+
+ // *******************************
+ // Helpers
+ // *******************************
+
+    /**
+     * Resolves the effective fault tolerance configuration for this definition.
+     * <p>
+     * Option values are gathered into one map from three sources, loaded in this order: the
+     * default configuration, the configuration referenced by the definition (if any), and the
+     * configuration declared locally on the definition (if any). The gathered values are then
+     * bound onto a fresh {@link FaultToleranceConfigurationDefinition}.
+     *
+     * @return a new configuration definition populated from the gathered values
+     * @throws Exception if a lookup or the property binding fails
+     */
+    FaultToleranceConfigurationDefinition buildFaultToleranceConfiguration() throws Exception {
+        final Map<String, Object> gathered = new HashMap<>();
+
+        final PropertyConfigurer propertyConfigurer = camelContext.adapt(ExtendedCamelContext.class)
+                .getConfigurerResolver()
+                .resolvePropertyConfigurer(FaultToleranceConfigurationDefinition.class.getName(), camelContext);
+
+        // 1) default configuration: the camel context is asked first, the registry second
+        final Optional<?> defaults = Suppliers.firstNotNull(
+                () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(null),
+                () -> lookupByNameAndType(FaultToleranceConstants.DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID,
+                        FaultToleranceConfigurationDefinition.class));
+        loadProperties(gathered, defaults, propertyConfigurer);
+
+        // 2) referenced configuration: the camel context is asked first, then a mandatory
+        // registry lookup
+        if (definition.getConfiguration() != null) {
+            final String configurationRef = parseString(definition.getConfiguration());
+
+            final Optional<?> referenced = Suppliers.firstNotNull(
+                    () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(configurationRef),
+                    () -> mandatoryLookup(configurationRef, FaultToleranceConfigurationDefinition.class));
+            loadProperties(gathered, referenced, propertyConfigurer);
+        }
+
+        // 3) configuration declared directly on the definition
+        loadProperties(gathered, Optional.ofNullable(definition.getFaultToleranceConfiguration()), propertyConfigurer);
+
+        // bind everything gathered so far onto a brand new configuration instance
+        final FaultToleranceConfigurationDefinition effective = new FaultToleranceConfigurationDefinition();
+        PropertyBindingSupport.build()
+                .withCamelContext(camelContext)
+                .withConfigurer(propertyConfigurer)
+                .withProperties(gathered)
+                .withTarget(effective)
+                .bind();
+
+        return effective;
+    }
+
+    /**
+     * Copies the option values of the given bean, when present, into the supplied map.
+     * <p>
+     * With an {@link ExtendedPropertyConfigurerGetter} every option it reports is read through the
+     * getter and stored only when its value is not {@code null}. Without one, the values are
+     * collected through bean introspection instead.
+     *
+     * @param properties the map receiving the option values
+     * @param optional   the bean to read from; nothing is copied when it is empty
+     * @param configurer the configurer used to read the options
+     */
+    private void loadProperties(Map<String, Object> properties, Optional<?> optional, PropertyConfigurer configurer) {
+        final BeanIntrospection introspection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+        if (!optional.isPresent()) {
+            return;
+        }
+
+        final Object source = optional.get();
+        if (!(configurer instanceof ExtendedPropertyConfigurerGetter)) {
+            // no configurer found so use bean introspection (reflection)
+            introspection.getProperties(source, properties, null, false);
+            return;
+        }
+
+        final ExtendedPropertyConfigurerGetter optionGetter = (ExtendedPropertyConfigurerGetter) configurer;
+        for (String option : optionGetter.getAllOptions(source).keySet()) {
+            final Object value = optionGetter.getOptionValue(source, option, true);
+            if (value != null) {
+                properties.put(option, value);
+            }
+        }
+    }
+
+}
diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
new file mode 100644
index 0000000..c43d558
--- /dev/null
+++ b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory
diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
index e33777b..907c133 100644
--- a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
+++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
@@ -20,7 +20,6 @@ import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import org.apache.camel.CamelContext;
import org.apache.camel.health.HealthCheckRegistry;
-import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
import org.apache.camel.spi.CamelContextCustomizer;
@Recorder
@@ -31,7 +30,7 @@ public class CamelMicroProfileHealthRecorder {
return new RuntimeValue<>(new CamelContextCustomizer() {
@Override
public void configure(CamelContext camelContext) {
- HealthCheckRegistry registry = new CamelMicroProfileHealthCheckRegistry(camelContext);
+ HealthCheckRegistry registry = new CamelQuarkusMicroProfileHealthCheckRegistry(camelContext);
registry.setId("camel-microprofile-health");
registry.setEnabled(true);
diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
new file mode 100644
index 0000000..8f51f80
--- /dev/null
+++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.lang.annotation.Annotation;
+import java.util.Set;
+
+import javax.enterprise.inject.spi.Bean;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.inject.spi.CDI;
+
+import io.smallrye.health.api.HealthRegistry;
+import io.smallrye.health.registry.LivenessHealthRegistry;
+import io.smallrye.health.registry.ReadinessHealthRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
+import org.eclipse.microprofile.health.Liveness;
+import org.eclipse.microprofile.health.Readiness;
+
+/**
+ * Health check registry that obtains the SmallRye liveness and readiness registries as CDI beans.
+ */
+public class CamelQuarkusMicroProfileHealthCheckRegistry extends CamelMicroProfileHealthCheckRegistry {
+
+    CamelQuarkusMicroProfileHealthCheckRegistry(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    /**
+     * @return the {@link LivenessHealthRegistry} bean qualified with {@link Liveness}
+     */
+    @Override
+    protected HealthRegistry getLivenessRegistry() {
+        return getHealthRegistryBean(LivenessHealthRegistry.class, Liveness.Literal.INSTANCE);
+    }
+
+    /**
+     * @return the {@link ReadinessHealthRegistry} bean qualified with {@link Readiness}
+     */
+    @Override
+    protected HealthRegistry getReadinessRegistry() {
+        return getHealthRegistryBean(ReadinessHealthRegistry.class, Readiness.Literal.INSTANCE);
+    }
+
+    /**
+     * Looks up a health registry bean through the current CDI container.
+     *
+     * @param  type                  the registry implementation type to look up
+     * @param  qualifier             the qualifier the bean must carry
+     * @return                       the bean reference, cast to the requested type
+     * @throws IllegalStateException if no bean matches the type and qualifier
+     */
+    private static HealthRegistry getHealthRegistryBean(Class<? extends HealthRegistry> type, Annotation qualifier) {
+        final BeanManager manager = CDI.current().getBeanManager();
+        final Set<Bean<?>> candidates = manager.getBeans(type, qualifier);
+
+        if (!candidates.isEmpty()) {
+            final Bean<?> resolved = manager.resolve(candidates);
+            return type.cast(manager.getReference(resolved, type, manager.createCreationalContext(resolved)));
+        }
+
+        throw new IllegalStateException(
+                "Beans for type " + type.getName() + " with qualifier " + qualifier + " could not be found.");
+    }
+}