You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/29 07:57:35 UTC

[camel] branch master updated: Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5273)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9675f4b  Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5273)
9675f4b is described below

commit 9675f4b48a19820ce22c8138db8d13a94d523b17
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Mon Mar 29 03:57:10 2021 -0400

    Explicitly calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5273)
    
    Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java   | 10 ++++++++++
 .../camel/component/resilience4j/ResilienceProcessor.java      | 10 ++++++++++
 2 files changed, 20 insertions(+)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 7f12b19..73c9e40 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -45,8 +45,10 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
@@ -318,6 +320,14 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                 }
+                if (copy.getUnitOfWork() == null) {
+                    // handover completions and done them manually to ensure they are being executed
+                    List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
+                    UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
+                } else {
+                    // done the unit of work
+                    copy.getUnitOfWork().done(exchange);
+                }
             } catch (Exception e) {
                 exchange.setException(e);
             }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index c2da495..3dc5b4d 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -48,8 +48,10 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -424,6 +426,14 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
             }
+            if (copy.getUnitOfWork() == null) {
+                // handover completions and done them manually to ensure they are being executed
+                List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
+            } else {
+                // done the unit of work
+                copy.getUnitOfWork().done(exchange);
+            }
         } catch (Exception e) {
             exchange.setException(e);
         }