You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/29 06:43:06 UTC

[camel] branch camel-3.14.x updated: CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 6df097e52d1 CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)
6df097e52d1 is described below

commit 6df097e52d1cf22d2b4f038ba70c00594740f790
Author: Rastislav Papp <ra...@gmail.com>
AuthorDate: Sat Oct 29 08:43:01 2022 +0200

    CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCCallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    * CAMEL-18093 - Add option to turn on follow redirects
    
    Signed-off-by: Rhuan Rocha <rh...@gmail.com>
    
    * CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
    Call the afterProcess from the MDCCallback instead of scheduling it
    separately. Reset current routeId on MDC from startProcess.
    
    Co-authored-by: Rhuan Rocha <rh...@gmail.com>
    
    Co-authored-by: klease <38...@users.noreply.github.com>
    Co-authored-by: Rhuan Rocha <rh...@gmail.com>
---
 .../camel/impl/engine/CamelInternalProcessor.java  |  9 +-------
 .../camel/impl/engine/DefaultUnitOfWork.java       | 26 ++++++++++++++++++++--
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  7 +++++-
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 97ba072aaa0..658b3758807 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -400,14 +400,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            // optimize to only do after uow processing if really needed
-            if (beforeAndAfter) {
-                // use the same callback as with beforeProcess
-                final CamelInternalTask afterCallback = afterTask;
-                reactiveExecutor.schedule(() -> {
-                    uow.afterProcess(processor, exchange, afterCallback, sync);
-                });
-            }
+            // CAMEL-18255: move uow.afterProcess handling to the callback
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 704d0d56f85..fa8c5c75f19 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -356,8 +356,8 @@ public class DefaultUnitOfWork implements UnitOfWork {
 
     @Override
     public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
-        // no wrapping needed
-        return callback;
+        // CAMEL-18255: support running afterProcess from the async callback
+        return isBeforeAfterProcess() ? new UnitOfWorkCallback(callback, processor) : callback;
     }
 
     @Override
@@ -377,4 +377,26 @@ public class DefaultUnitOfWork implements UnitOfWork {
     public String toString() {
         return "DefaultUnitOfWork";
     }
+
+    private final class UnitOfWorkCallback implements AsyncCallback {
+
+        private final AsyncCallback delegate;
+        private final Processor processor;
+
+        private UnitOfWorkCallback(AsyncCallback delegate, Processor processor) {
+            this.delegate = delegate;
+            this.processor = processor;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            delegate.done(doneSync);
+            afterProcess(processor, exchange, delegate, doneSync);
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 074521d0b1b..3cf1a52b289 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -85,6 +85,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
         if (breadcrumbId != null) {
             MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
         }
+        Route current = getRoute();
+        if (current != null) {
+            MDC.put(MDC_ROUTE_ID, current.getRouteId());
+        }
     }
 
     @Override
@@ -145,7 +149,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
             MDC.put(MDC_STEP_ID, stepId);
         }
         // return callback with after processing work
-        return new MDCCallback(callback, pattern);
+        final AsyncCallback uowCallback = super.beforeProcess(processor, exchange, callback);
+        return new MDCCallback(uowCallback, pattern);
     }
 
     @Override