You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/11/01 08:36:42 UTC
[camel] branch main updated: CAMEL-17009: MDCUnitOfWork should
clear when done/stopping and also after process to not leak state to other
threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via
reactive executor to ensure ordering.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b33bea6 CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering.
b33bea6 is described below
commit b33bea6cccbc797cf874884247fac69bf8ef33ae
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Nov 1 09:35:13 2021 +0100
CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering.
---
.../camel/impl/engine/CamelInternalProcessor.java | 10 ++++++----
.../org/apache/camel/impl/engine/MDCUnitOfWork.java | 17 ++++++++++++++++-
.../org/apache/camel/processor/MDCClearingTest.java | 3 ---
...t.java => MDCResetMidRouteProducerTemplateTest.java} | 4 +++-
.../java/org/apache/camel/processor/MDCResetTest.java | 11 +----------
.../java/org/apache/camel/support/UnitOfWorkHelper.java | 6 ++++++
6 files changed, 32 insertions(+), 19 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 25931fc..97ba072 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -382,8 +382,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
} else {
final UnitOfWork uow = exchange.getUnitOfWork();
- // do uow before processing and if a value is returned the the uow wants to be processed after
- // was well in the same thread
+ // optimize to only do before uow processing if really needed
AsyncCallback async = afterTask;
boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
if (beforeAndAfter) {
@@ -403,8 +402,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
// optimize to only do after uow processing if really needed
if (beforeAndAfter) {
- // execute any after processor work (in current thread, not in the callback)
- uow.afterProcess(processor, exchange, afterTask, sync);
+ // use the same callback as with beforeProcess
+ final CamelInternalTask afterCallback = afterTask;
+ reactiveExecutor.schedule(() -> {
+ uow.afterProcess(processor, exchange, afterCallback, sync);
+ });
}
if (LOG.isTraceEnabled()) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index db7d5d1..dd76a31 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -24,6 +24,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.Service;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.PatternHelper;
@@ -34,7 +35,7 @@ import org.slf4j.MDC;
/**
* This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
*/
-public class MDCUnitOfWork extends DefaultUnitOfWork {
+public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class);
@@ -147,6 +148,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
if (stepId == null) {
MDC.remove(MDC_STEP_ID);
}
+
+ // clear to avoid leaking to current thread when
+ // the exchange is continued routed asynchronously
+ clear();
}
/**
@@ -208,6 +213,16 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
}
@Override
+ public void start() {
+ // noop
+ }
+
+ @Override
+ public void stop() {
+ clear();
+ }
+
+ @Override
public String toString() {
return "MDCUnitOfWork";
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
index acf37f0..9869156 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
@@ -26,7 +26,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.support.AsyncProcessorSupport;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +56,6 @@ public class MDCClearingTest extends ContextTestSupport {
}
@Test
- @Disabled
public void shouldPropagateAndClearMdcInAsyncRoute() {
// given
MDC.remove(CAMEL_BREADCRUMB_ID);
@@ -70,7 +68,6 @@ public class MDCClearingTest extends ContextTestSupport {
}
@Test
- @Disabled
public void shouldPropagateAndClearMdcInMixedRoute() {
// given
MDC.remove(CAMEL_BREADCRUMB_ID);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
similarity index 96%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
index 0f8939d..8038b71 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
@@ -21,6 +21,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.MDC;
@@ -30,7 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during
* routing.
*/
-public class MDCResetTest extends ContextTestSupport {
+@Disabled
+public class MDCResetMidRouteProducerTemplateTest extends ContextTestSupport {
@Test
public void testMDC() throws Exception {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
index 0f8939d..ee2cc2a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
@@ -27,8 +27,7 @@ import org.slf4j.MDC;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
- * Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during
- * routing.
+ * Tests that MDC works as a stack remembering old values when routing between routes.
*/
public class MDCResetTest extends ContextTestSupport {
@@ -68,14 +67,6 @@ public class MDCResetTest extends ContextTestSupport {
}
}).to("log:foo").to("direct:b").process(new Processor() {
public void process(Exchange exchange) throws Exception {
- String body = exchange.getIn().getBody(String.class);
- // use a producer template to send to b, instead of in
- // the route DSL
- body = template.requestBody("direct:b", body, String.class);
- exchange.getMessage().setBody(body);
- }
- }).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
assertEquals("route-a", MDC.get("camel.routeId"));
assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId"));
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index 47309a4..8dafdcc 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -22,10 +22,12 @@ import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
+import org.apache.camel.Service;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.SynchronizationRouteAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.spi.annotations.EagerClassloaded;
+import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,10 +59,14 @@ public final class UnitOfWorkHelper {
// unit of work is done
try {
uow.done(exchange);
+ if (uow instanceof Service) {
+ ServiceHelper.stopService(uow);
+ }
} catch (Throwable e) {
LOG.warn("Exception occurred during done UnitOfWork for Exchange: {}. This exception will be ignored.",
exchange, e);
}
+
}
public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {