You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/29 08:26:51 UTC
[camel] 02/04: CAMEL-16418: Circuit breakers should ensure UoW is done
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6ff4429185a68546aee8c76ac76a6516b9e376a6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 10:17:02 2021 +0200
CAMEL-16418: Circuit breakers should ensure UoW is done
---
.../resilience4j/ResilienceProcessor.java | 25 +++++++++++++---------
1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 3dc5b4d..37b36f8 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -39,6 +39,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
@@ -48,7 +49,7 @@ import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.CircuitBreakerConstants;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.UnitOfWorkHelper;
@@ -410,14 +411,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport
}
private Exchange processInCopy(Exchange exchange) {
+ Exchange copy = null;
+ UnitOfWork uow = null;
try {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
// prepare a copy of exchange so downstream processors don't
// cause side-effects if they mutate the exchange
// in case timeout processing and continue with the fallback etc
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+ copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+ // prepare uow on copy
+ uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+ copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+
// process the processor until its fully done
processor.process(copy);
+
+ // handle the processing result
if (copy.getException() != null) {
exchange.setException(copy.getException());
} else {
@@ -426,17 +435,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
}
- if (copy.getUnitOfWork() == null) {
- // handover completions and done them manually to ensure they are being executed
- List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
- UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
- } else {
- // done the unit of work
- copy.getUnitOfWork().done(exchange);
- }
} catch (Exception e) {
exchange.setException(e);
+ } finally {
+ // the uow must always be done, even if processing failed
+ UnitOfWorkHelper.doneUow(uow, copy);
}
+
if (exchange.getException() != null) {
// throw exception so resilient4j know it was a failure
throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());