You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/06 07:31:11 UTC

[camel] branch master updated (5721592 -> 61cd7ae)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 5721592  CAMEL-16185 - AWS S3: improve multipart support - streaming upload
     new e1b4310  CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP
     new 61cd7ae  CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../faulttolerance/FaultToleranceProcessor.java    | 71 ++++++++++++++---
 .../resilience4j/ResilienceProcessor.java          | 88 ++++++++++++++++------
 2 files changed, 126 insertions(+), 33 deletions(-)

[camel] 02/02: CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 61cd7ae5c58e949b4dc810aa053e111e8721f704
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 6 09:29:20 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP
---
 .../faulttolerance/FaultToleranceProcessor.java    | 71 +++++++++++++++++++---
 .../resilience4j/ResilienceProcessor.java          |  2 +-
 2 files changed, 62 insertions(+), 11 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index e2dfc7b..1293aed 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -46,10 +46,13 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
@@ -63,13 +66,14 @@ import static io.smallrye.faulttolerance.core.Invocation.invocation;
  */
 @ManagedResource(description = "Managed FaultTolerance Processor")
 public class FaultToleranceProcessor extends AsyncProcessorSupport
-        implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
+        implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceProcessor.class);
 
     private volatile CircuitBreaker circuitBreaker;
     private CamelContext camelContext;
     private String id;
+    private String routeId;
     private final FaultToleranceConfiguration config;
     private final Processor processor;
     private final Processor fallbackProcessor;
@@ -77,6 +81,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
     private boolean shutdownScheduledExecutorService;
     private ExecutorService executorService;
     private boolean shutdownExecutorService;
+    private ProcessorExchangeFactory processorExchangeFactory;
 
     public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor,
                                    Processor fallbackProcessor) {
@@ -105,6 +110,16 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         this.id = id;
     }
 
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
     public CircuitBreaker getCircuitBreaker() {
         return circuitBreaker;
     }
@@ -209,7 +224,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         // Camel error handler
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
-        Callable<Exchange> task = new CircuitBreakerTask(processor, exchange);
+        Callable<Exchange> task = new CircuitBreakerTask(processorExchangeFactory, processor, exchange);
 
         // circuit breaker
         FaultToleranceStrategy target = circuitBreaker;
@@ -253,6 +268,19 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
+
+        ServiceHelper.buildService(processorExchangeFactory, processor);
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
     protected void doInit() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext", this);
@@ -262,6 +290,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     SetOfThrowables.EMPTY, config.getDelay(), config.getRequestVolumeThreshold(), config.getFailureRatio(),
                     config.getSuccessThreshold(), new SystemStopwatch(), null);
         }
+
+        ServiceHelper.initService(processorExchangeFactory, processor);
     }
 
     @Override
@@ -276,6 +306,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     config.getBulkheadMaxConcurrentCalls(), config.getBulkheadMaxConcurrentCalls());
             shutdownExecutorService = true;
         }
+
+        ServiceHelper.startService(processorExchangeFactory, processor);
     }
 
     @Override
@@ -288,14 +320,23 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
             executorService = null;
         }
+
+        ServiceHelper.stopService(processorExchangeFactory, processor);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor);
     }
 
     private static final class CircuitBreakerTask implements Callable<Exchange> {
 
+        private final ProcessorExchangeFactory processorExchangeFactory;
         private final Processor processor;
         private final Exchange exchange;
 
-        private CircuitBreakerTask(Processor processor, Exchange exchange) {
+        private CircuitBreakerTask(ProcessorExchangeFactory processorExchangeFactory, Processor processor, Exchange exchange) {
+            this.processorExchangeFactory = processorExchangeFactory;
             this.processor = processor;
             this.exchange = exchange;
         }
@@ -304,6 +345,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         public Exchange call() throws Exception {
             Exchange copy = null;
             UnitOfWork uow = null;
+            Throwable cause;
 
             // turn of interruption to allow fault tolerance to process the exchange under its handling
             exchange.adapt(ExtendedExchange.class).setInterruptable(false);
@@ -314,10 +356,14 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                 // prepare a copy of exchange so downstream processors don't
                 // cause side-effects if they mutate the exchange
                 // in case timeout processing and continue with the fallback etc
-                copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
-                // prepare uow on copy
-                uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
-                copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+                if (copy.getUnitOfWork() != null) {
+                    uow = copy.getUnitOfWork();
+                } else {
+                    // prepare uow on copy
+                    uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                    copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                }
 
                 // process the processor until its fully done
                 processor.process(copy);
@@ -336,11 +382,16 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             } finally {
                 // must done uow
                 UnitOfWorkHelper.doneUow(uow, copy);
+                // remember any thrown exception
+                cause = exchange.getException();
             }
 
-            if (exchange.getException() != null) {
-                // force exception so the circuit breaker can react
-                throw exchange.getException();
+            // and release exchange back in pool
+            processorExchangeFactory.release(exchange);
+
+            if (cause != null) {
+                // throw exception so the circuit breaker knows it was a failure
+                throw RuntimeExchangeException.wrapRuntimeException(cause);
             }
             return exchange;
         }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index a6b0ed6..365c566 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -460,7 +460,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private Exchange processTask(Exchange exchange) {
         Exchange copy = null;
         UnitOfWork uow = null;
-        Throwable cause = null;
+        Throwable cause;
         try {
             LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
             // prepare a copy of exchange so downstream processors don't

[camel] 01/02: CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e1b4310a611f269f0b29d772b40391ccca04e13d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 6 09:16:34 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Circuit Breaker EIP
---
 .../resilience4j/ResilienceProcessor.java          | 88 ++++++++++++++++------
 1 file changed, 65 insertions(+), 23 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index a541b61..a6b0ed6 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -49,10 +49,13 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,13 +65,14 @@ import org.slf4j.LoggerFactory;
  */
 @ManagedResource(description = "Managed Resilience Processor")
 public class ResilienceProcessor extends AsyncProcessorSupport
-        implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
+        implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class);
 
     private volatile CircuitBreaker circuitBreaker;
     private CamelContext camelContext;
     private String id;
+    private String routeId;
     private final CircuitBreakerConfig circuitBreakerConfig;
     private final BulkheadConfig bulkheadConfig;
     private Bulkhead bulkhead;
@@ -78,6 +82,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private final Processor fallback;
     private boolean shutdownExecutorService;
     private ExecutorService executorService;
+    private ProcessorExchangeFactory processorExchangeFactory;
 
     public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig,
                                TimeLimiterConfig timeLimiterConfig, Processor processor,
@@ -91,13 +96,45 @@ public class ResilienceProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
-        super.doBuild();
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
         if (timeLimiterConfig != null) {
             timeLimiter = TimeLimiter.of(id, timeLimiterConfig);
         }
         if (bulkheadConfig != null) {
             bulkhead = Bulkhead.of(id, bulkheadConfig);
         }
+
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
+
+        ServiceHelper.buildService(processorExchangeFactory, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (circuitBreaker == null) {
+            circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
+        }
+
+        ServiceHelper.startService(processorExchangeFactory, processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (shutdownExecutorService && executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
+
+        ServiceHelper.stopService(processorExchangeFactory, processor);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor);
     }
 
     @Override
@@ -120,6 +157,16 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         this.id = id;
     }
 
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
     public CircuitBreaker getCircuitBreaker() {
         return circuitBreaker;
     }
@@ -413,15 +460,20 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private Exchange processTask(Exchange exchange) {
         Exchange copy = null;
         UnitOfWork uow = null;
+        Throwable cause = null;
         try {
             LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
             // prepare a copy of exchange so downstream processors don't
             // cause side-effects if they mutate the exchange
             // in case timeout processing and continue with the fallback etc
-            copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
-            // prepare uow on copy
-            uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
-            copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+            copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+            if (copy.getUnitOfWork() != null) {
+                uow = copy.getUnitOfWork();
+            } else {
+                // prepare uow on copy
+                uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+            }
 
             // process the processor until its fully done
             processor.process(copy);
@@ -440,30 +492,20 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         } finally {
             // must done uow
             UnitOfWorkHelper.doneUow(uow, copy);
+            // remember any thrown exception
+            cause = exchange.getException();
         }
 
-        if (exchange.getException() != null) {
+        // and release exchange back in pool
+        processorExchangeFactory.release(exchange);
+
+        if (cause != null) {
             // throw exception so resilient4j know it was a failure
-            throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());
+            throw RuntimeExchangeException.wrapRuntimeException(cause);
         }
         return exchange;
     }
 
-    @Override
-    protected void doStart() throws Exception {
-        ObjectHelper.notNull(camelContext, "CamelContext", this);
-        if (circuitBreaker == null) {
-            circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (shutdownExecutorService && executorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
-        }
-    }
-
     private static final class CircuitBreakerTask implements Callable<Exchange> {
 
         Supplier<Exchange> supplier;