You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/29 13:55:17 UTC

[camel] branch camel-3.7.x updated: Explicilty calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5274)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new dc82083  Explicilty calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5274)
dc82083 is described below

commit dc8208328f916a10021d298bbfc460631bab2e73
Author: Samrat Dhillon <sa...@gmail.com>
AuthorDate: Mon Mar 29 09:54:39 2021 -0400

    Explicilty calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer (#5274)
    
    Co-authored-by: Samrat Dhillon <sa...@innovapost.com>
---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java  | 10 ++++++++++
 .../camel/component/resilience4j/ResilienceProcessor.java     | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index e8eb374..9d5754c 100644
--- a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -44,8 +44,10 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
@@ -317,6 +319,14 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                 }
+                if (copy.getUnitOfWork() == null) {
+                    // handover completions and done them manually to ensure they are being executed
+                    List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
+                    UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
+                } else {
+                    // done the unit of work
+                    copy.getUnitOfWork().done(exchange);
+                }
             } catch (Throwable e) {
                 exchange.setException(e);
             }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index c9bd5ca..e8cb471 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -47,8 +47,10 @@ import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,6 +425,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true);
                 exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
             }
+            if (copy.getUnitOfWork() == null) {
+                // handover completions and done them manually to ensure they are being executed
+                List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG);
+            } else {
+                // done the unit of work
+                copy.getUnitOfWork().done(exchange);
+            }
+
         } catch (Throwable e) {
             exchange.setException(e);
         }