You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/29 08:26:49 UTC

[camel] branch master updated (9675f4b -> 593b461)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 9675f4b  Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5273)
     new 997b869  CAMEL-16418: Added unit test
     new 6ff4429  CAMEL-16418: Circuit breakers should ensure UoW is done
     new 57bb48b  CAMEL-16418: Circuit breakers should ensure UoW is done
     new 593b461  CAMEL-16418: Circuit breakers should ensure UoW is done

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hystrix/processor/HystrixProcessorCommand.java | 23 ++++++++++++++-
 .../faulttolerance/FaultToleranceProcessor.java    | 27 ++++++++++-------
 components/camel-netty-http/pom.xml                |  5 ++++
 ...t.java => RestNettyCircuitBreakerLeakTest.java} | 34 +++++++++++++---------
 .../resilience4j/ResilienceProcessor.java          | 33 ++++++++++++---------
 5 files changed, 84 insertions(+), 38 deletions(-)
 copy components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/rest/{RestNettyHttpGetTest.java => RestNettyCircuitBreakerLeakTest.java} (66%)

[camel] 01/04: CAMEL-16418: Added unit test

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 997b869666abe8213446ebc6887f173a4d0ac3cc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 09:57:46 2021 +0200

    CAMEL-16418: Added unit test
---
 components/camel-netty-http/pom.xml                |  5 ++
 .../http/rest/RestNettyCircuitBreakerLeakTest.java | 68 ++++++++++++++++++++++
 2 files changed, 73 insertions(+)

diff --git a/components/camel-netty-http/pom.xml b/components/camel-netty-http/pom.xml
index 5609f7c..1124d1d 100644
--- a/components/camel-netty-http/pom.xml
+++ b/components/camel-netty-http/pom.xml
@@ -91,6 +91,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-resilience4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-mock</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/rest/RestNettyCircuitBreakerLeakTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/rest/RestNettyCircuitBreakerLeakTest.java
new file mode 100644
index 0000000..49db422
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/rest/RestNettyCircuitBreakerLeakTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http.rest;
+
+import io.netty.util.ResourceLeakDetector;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.http.BaseNettyTest;
+import org.apache.camel.component.netty.http.RestNettyHttpBinding;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RestNettyCircuitBreakerLeakTest extends BaseNettyTest {
+
+    @BindToRegistry("mybinding")
+    private RestNettyHttpBinding binding = new RestNettyHttpBinding();
+
+    @Test
+    public void testCircuitBreaker() throws Exception {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
+        String out = template.requestBody("netty-http:http://localhost:{{port}}/demo/get", null, String.class);
+        assertEquals("demo page", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
+                // configure to use netty-http on localhost with the given port
+                restConfiguration().component("netty-http").host("localhost").port(getPort())
+                        .endpointProperty("nettyHttpBinding", "#mybinding");
+
+                rest().get("/demo").produces("text/plain")
+                    .route()
+                        .transform().constant("demo page");
+
+                rest().get("/demo/get").route()
+                    .circuitBreaker()
+                        .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(10000).end()
+                            .log("incoming request")
+                            .to("rest:get:demo?host=localhost:" + getPort())
+                        .onFallback()
+                            .transform().constant("timeout")
+                    .end();
+            }
+        };
+    }
+
+}

[camel] 04/04: CAMEL-16418: Circuit breakers should ensure UoW is done

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 593b4614cf38b7da89357e9d35851d7d53584de9
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 10:24:50 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../hystrix/processor/HystrixProcessorCommand.java | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 0a1d04e..4201850 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -23,10 +23,13 @@ import com.netflix.hystrix.exception.HystrixBadRequestException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,11 +115,18 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message run() throws Exception {
+        Exchange copy = null;
+        UnitOfWork uow = null;
+
         LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
 
         // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
         // in case Hystrix timeout processing and continue with the fallback etc
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+        copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+
+        // prepare uow on copy
+        uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+        copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
         try {
             // process the processor until its fully done
             // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
@@ -131,6 +141,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
                 && getProperties().fallbackEnabled().get()
                 && isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
             LOG.debug("Exiting run command due to a hystrix execution timeout in processing exchange: {}", exchange);
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
             return null;
         }
 
@@ -138,6 +150,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
         // and therefore we need this thread to not do anymore if fallback is already in process
         if (fallbackInUse.get()) {
             LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
             return null;
         }
 
@@ -150,6 +164,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
             // and therefore we need this thread to not do anymore if fallback is already in process
             if (fallbackInUse.get()) {
                 LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
                 return null;
             }
 
@@ -164,12 +180,17 @@ public class HystrixProcessorCommand extends HystrixCommand {
             if (camelExchangeException instanceof HystrixBadRequestException) {
                 LOG.debug("Running processor: {} with exchange: {} done as bad request", processor, exchange);
                 exchange.setException(camelExchangeException);
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
                 throw camelExchangeException;
             }
 
             // copy the result before its regarded as success
             ExchangeHelper.copyResults(exchange, copy);
 
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
+
             // in case of an exception in the exchange
             // we need to trigger this by throwing the exception so hystrix will execute the fallback
             // or open the circuit

[camel] 02/04: CAMEL-16418: Circuit breakers should ensure UoW is done

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6ff4429185a68546aee8c76ac76a6516b9e376a6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 10:17:02 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../resilience4j/ResilienceProcessor.java          | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 3dc5b4d..37b36f8 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -39,6 +39,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -48,7 +49,7 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
@@ -410,14 +411,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     }
 
     private Exchange processInCopy(Exchange exchange) {
+        Exchange copy = null;
+        UnitOfWork uow = null;
         try {
             LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
             // prepare a copy of exchange so downstream processors don't
             // cause side-effects if they mutate the exchange
             // in case timeout processing and continue with the fallback etc
-            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+            copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+            // prepare uow on copy
+            uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+            copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+
             // process the processor until its fully done
             processor.process(copy);
+
+            // handle the processing result
             if (copy.getException() != null) {
                 exchange.setException(copy.getException());
             } else {
@@ -426,17 +435,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
             }
-            if (copy.getUnitOfWork() == null) {
-                // handover completions and done them manually to ensure they are being executed
-                List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
-                UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
-            } else {
-                // done the unit of work
-                copy.getUnitOfWork().done(exchange);
-            }
         } catch (Exception e) {
             exchange.setException(e);
+        } finally {
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
         }
+
         if (exchange.getException() != null) {
             // throw exception so resilient4j know it was a failure
             throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());

[camel] 03/04: CAMEL-16418: Circuit breakers should ensure UoW is done

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 57bb48bac7a617d6b6f886cb7b1b9753ddbf21f3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 29 10:21:19 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../faulttolerance/FaultToleranceProcessor.java    | 27 ++++++++++++++--------
 .../resilience4j/ResilienceProcessor.java          |  8 +++----
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 73c9e40..e2dfc7b 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -37,6 +37,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -45,7 +46,7 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
@@ -301,17 +302,27 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
 
         @Override
         public Exchange call() throws Exception {
+            Exchange copy = null;
+            UnitOfWork uow = null;
+
             // turn of interruption to allow fault tolerance to process the exchange under its handling
             exchange.adapt(ExtendedExchange.class).setInterruptable(false);
 
             try {
                 LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
+
                 // prepare a copy of exchange so downstream processors don't
                 // cause side-effects if they mutate the exchange
                 // in case timeout processing and continue with the fallback etc
-                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+                copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+                // prepare uow on copy
+                uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+
                 // process the processor until its fully done
                 processor.process(copy);
+
+                // handle the processing result
                 if (copy.getException() != null) {
                     exchange.setException(copy.getException());
                 } else {
@@ -320,17 +331,13 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                 }
-                if (copy.getUnitOfWork() == null) {
-                    // handover completions and done them manually to ensure they are being executed
-                    List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
-                    UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
-                } else {
-                    // done the unit of work
-                    copy.getUnitOfWork().done(exchange);
-                }
             } catch (Exception e) {
                 exchange.setException(e);
+            } finally {
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
             }
+
             if (exchange.getException() != null) {
                 // force exception so the circuit breaker can react
                 throw exchange.getException();
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 37b36f8..a541b61 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -378,13 +378,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         if (timeLimiter != null) {
             Supplier<CompletableFuture<Exchange>> futureSupplier;
             if (executorService == null) {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange));
+                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange));
             } else {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService);
+                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange), executorService);
             }
             task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
         } else {
-            task = new CircuitBreakerTask(() -> processInCopy(exchange));
+            task = new CircuitBreakerTask(() -> processTask(exchange));
         }
 
         if (bulkhead != null) {
@@ -410,7 +410,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         return true;
     }
 
-    private Exchange processInCopy(Exchange exchange) {
+    private Exchange processTask(Exchange exchange) {
         Exchange copy = null;
         UnitOfWork uow = null;
         try {