You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/29 06:43:06 UTC
[camel] branch camel-3.14.x updated: CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
new 6df097e52d1 CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)
6df097e52d1 is described below
commit 6df097e52d1cf22d2b4f038ba70c00594740f790
Author: Rastislav Papp <ra...@gmail.com>
AuthorDate: Sat Oct 29 08:43:01 2022 +0200
CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess. (#7987) (#8643)
* CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
Call the afterProcess from the MDCCallback instead of scheduling it
separately. Reset current routeId on MDC from startProcess.
* CAMEL-18093 - Add option to turn on follow redirects
Signed-off-by: Rhuan Rocha <rh...@gmail.com>
* CAMEL-18255: Address memory leak caused by scheduling MDC.afterProcess.
Call the afterProcess from the MDCCallback instead of scheduling it
separately. Reset current routeId on MDC from startProcess.
Co-authored-by: Rhuan Rocha <rh...@gmail.com>
Co-authored-by: klease <38...@users.noreply.github.com>
Co-authored-by: Rhuan Rocha <rh...@gmail.com>
---
.../camel/impl/engine/CamelInternalProcessor.java | 9 +-------
.../camel/impl/engine/DefaultUnitOfWork.java | 26 ++++++++++++++++++++--
.../apache/camel/impl/engine/MDCUnitOfWork.java | 7 +++++-
3 files changed, 31 insertions(+), 11 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 97ba072aaa0..658b3758807 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -400,14 +400,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- // optimize to only do after uow processing if really needed
- if (beforeAndAfter) {
- // use the same callback as with beforeProcess
- final CamelInternalTask afterCallback = afterTask;
- reactiveExecutor.schedule(() -> {
- uow.afterProcess(processor, exchange, afterCallback, sync);
- });
- }
+ // CAMEL-18255: move uow.afterProcess handling to the callback
if (LOG.isTraceEnabled()) {
LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 704d0d56f85..fa8c5c75f19 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -356,8 +356,8 @@ public class DefaultUnitOfWork implements UnitOfWork {
@Override
public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
- // no wrapping needed
- return callback;
+ // CAMEL-18255: support running afterProcess from the async callback
+ return isBeforeAfterProcess() ? new UnitOfWorkCallback(callback, processor) : callback;
}
@Override
@@ -377,4 +377,26 @@ public class DefaultUnitOfWork implements UnitOfWork {
public String toString() {
return "DefaultUnitOfWork";
}
+
+ private final class UnitOfWorkCallback implements AsyncCallback {
+
+ private final AsyncCallback delegate;
+ private final Processor processor;
+
+ private UnitOfWorkCallback(AsyncCallback delegate, Processor processor) {
+ this.delegate = delegate;
+ this.processor = processor;
+ }
+
+ @Override
+ public void done(boolean doneSync) {
+ delegate.done(doneSync);
+ afterProcess(processor, exchange, delegate, doneSync);
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+ }
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 074521d0b1b..3cf1a52b289 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -85,6 +85,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
if (breadcrumbId != null) {
MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
}
+ Route current = getRoute();
+ if (current != null) {
+ MDC.put(MDC_ROUTE_ID, current.getRouteId());
+ }
}
@Override
@@ -145,7 +149,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
MDC.put(MDC_STEP_ID, stepId);
}
// return callback with after processing work
- return new MDCCallback(callback, pattern);
+ final AsyncCallback uowCallback = super.beforeProcess(processor, exchange, callback);
+ return new MDCCallback(uowCallback, pattern);
}
@Override