You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/29 08:26:51 UTC

[camel] 02/04: CAMEL-16418: Circuit breakers should ensure UoW is done

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6ff4429185a68546aee8c76ac76a6516b9e376a6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 10:17:02 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../resilience4j/ResilienceProcessor.java          | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 3dc5b4d..37b36f8 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -39,6 +39,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -48,7 +49,7 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
@@ -410,14 +411,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     }
 
     private Exchange processInCopy(Exchange exchange) {
+        Exchange copy = null;
+        UnitOfWork uow = null;
         try {
             LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
             // prepare a copy of exchange so downstream processors don't
             // cause side-effects if they mutate the exchange
             // in case timeout processing and continue with the fallback etc
-            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+            copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+            // prepare uow on copy
+            uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+            copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+
             // process the processor until its fully done
             processor.process(copy);
+
+            // handle the processing result
             if (copy.getException() != null) {
                 exchange.setException(copy.getException());
             } else {
@@ -426,17 +435,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
             }
-            if (copy.getUnitOfWork() == null) {
-                // handover completions and done them manually to ensure they are being executed
-                List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
-                UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
-            } else {
-                // done the unit of work
-                copy.getUnitOfWork().done(exchange);
-            }
         } catch (Exception e) {
             exchange.setException(e);
+        } finally {
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
         }
+
         if (exchange.getException() != null) {
             // throw exception so resilient4j know it was a failure
             throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());