You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/29 06:42:53 UTC

[camel] branch camel-3.18.x updated: CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8645)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 33d45397ba7 CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8645)
33d45397ba7 is described below

commit 33d45397ba777838b69f0d2f45982f2c75720c19
Author: Rastislav Papp <ra...@gmail.com>
AuthorDate: Sat Oct 29 08:42:47 2022 +0200

    CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8645)
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCCallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    * CAMEL-18093 - Add option to turn on follow redirects
    
    Signed-off-by: Rhuan Rocha <rh...@gmail.com>
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCCallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    Co-authored-by: Rhuan Rocha <rh...@gmail.com>
    
    Co-authored-by: klease <38...@users.noreply.github.com>
    Co-authored-by: Rhuan Rocha <rh...@gmail.com>
---
 .../camel/impl/engine/CamelInternalProcessor.java  |  9 +-------
 .../camel/impl/engine/DefaultUnitOfWork.java       | 26 ++++++++++++++++++++--
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  7 +++++-
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 7babd956ea8..868269ac0ab 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -401,14 +401,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            // optimize to only do after uow processing if really needed
-            if (beforeAndAfter) {
-                // use the same callback as with beforeProcess
-                final CamelInternalTask afterCallback = afterTask;
-                reactiveExecutor.schedule(() -> {
-                    uow.afterProcess(processor, exchange, afterCallback, sync);
-                });
-            }
+            // CAMEL-18255: move uow.afterProcess handling to the callback
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 704d0d56f85..fa8c5c75f19 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -356,8 +356,8 @@ public class DefaultUnitOfWork implements UnitOfWork {
 
     @Override
     public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
-        // no wrapping needed
-        return callback;
+        // CAMEL-18255: support running afterProcess from the async callback
+        return isBeforeAfterProcess() ? new UnitOfWorkCallback(callback, processor) : callback;
     }
 
     @Override
@@ -377,4 +377,26 @@ public class DefaultUnitOfWork implements UnitOfWork {
     public String toString() {
         return "DefaultUnitOfWork";
     }
+
+    private final class UnitOfWorkCallback implements AsyncCallback {
+
+        private final AsyncCallback delegate;
+        private final Processor processor;
+
+        private UnitOfWorkCallback(AsyncCallback delegate, Processor processor) {
+            this.delegate = delegate;
+            this.processor = processor;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            delegate.done(doneSync);
+            afterProcess(processor, exchange, delegate, doneSync);
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 074521d0b1b..3cf1a52b289 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -85,6 +85,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
         if (breadcrumbId != null) {
             MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
         }
+        Route current = getRoute();
+        if (current != null) {
+            MDC.put(MDC_ROUTE_ID, current.getRouteId());
+        }
     }
 
     @Override
@@ -145,7 +149,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
             MDC.put(MDC_STEP_ID, stepId);
         }
         // return callback with after processing work
-        return new MDCCallback(callback, pattern);
+        final AsyncCallback uowCallback = super.beforeProcess(processor, exchange, callback);
+        return new MDCCallback(uowCallback, pattern);
     }
 
     @Override