You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/11/01 08:36:42 UTC

[camel] branch main updated: CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b33bea6  CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering.
b33bea6 is described below

commit b33bea6cccbc797cf874884247fac69bf8ef33ae
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Nov 1 09:35:13 2021 +0100

    CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering.
---
 .../camel/impl/engine/CamelInternalProcessor.java       | 10 ++++++----
 .../org/apache/camel/impl/engine/MDCUnitOfWork.java     | 17 ++++++++++++++++-
 .../org/apache/camel/processor/MDCClearingTest.java     |  3 ---
 ...t.java => MDCResetMidRouteProducerTemplateTest.java} |  4 +++-
 .../java/org/apache/camel/processor/MDCResetTest.java   | 11 +----------
 .../java/org/apache/camel/support/UnitOfWorkHelper.java |  6 ++++++
 6 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 25931fc..97ba072 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -382,8 +382,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         } else {
             final UnitOfWork uow = exchange.getUnitOfWork();
 
-            // do uow before processing and if a value is returned the the uow wants to be processed after
-            // was well in the same thread
+            // optimize to only do before uow processing if really needed
             AsyncCallback async = afterTask;
             boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
             if (beforeAndAfter) {
@@ -403,8 +402,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
 
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
-                // execute any after processor work (in current thread, not in the callback)
-                uow.afterProcess(processor, exchange, afterTask, sync);
+                // use the same callback as with beforeProcess
+                final CamelInternalTask afterCallback = afterTask;
+                reactiveExecutor.schedule(() -> {
+                    uow.afterProcess(processor, exchange, afterCallback, sync);
+                });
             }
 
             if (LOG.isTraceEnabled()) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index db7d5d1..dd76a31 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -24,6 +24,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
+import org.apache.camel.Service;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.PatternHelper;
@@ -34,7 +35,7 @@ import org.slf4j.MDC;
 /**
  * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
  */
-public class MDCUnitOfWork extends DefaultUnitOfWork {
+public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
 
     private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class);
 
@@ -147,6 +148,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
         if (stepId == null) {
             MDC.remove(MDC_STEP_ID);
         }
+
+        // clear to avoid leaking MDC state to the current thread when
+        // the exchange continues to be routed asynchronously
+        clear();
     }
 
     /**
@@ -208,6 +213,16 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
     }
 
     @Override
+    public void start() {
+        // noop
+    }
+
+    @Override
+    public void stop() {
+        clear();
+    }
+
+    @Override
     public String toString() {
         return "MDCUnitOfWork";
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
index acf37f0..9869156 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java
@@ -26,7 +26,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +56,6 @@ public class MDCClearingTest extends ContextTestSupport {
     }
 
     @Test
-    @Disabled
     public void shouldPropagateAndClearMdcInAsyncRoute() {
         // given
         MDC.remove(CAMEL_BREADCRUMB_ID);
@@ -70,7 +68,6 @@ public class MDCClearingTest extends ContextTestSupport {
     }
 
     @Test
-    @Disabled
     public void shouldPropagateAndClearMdcInMixedRoute() {
         // given
         MDC.remove(CAMEL_BREADCRUMB_ID);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
similarity index 96%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
index 0f8939d..8038b71 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java
@@ -21,6 +21,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.MDC;
 
@@ -30,7 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during
  * routing.
  */
-public class MDCResetTest extends ContextTestSupport {
+@Disabled
+public class MDCResetMidRouteProducerTemplateTest extends ContextTestSupport {
 
     @Test
     public void testMDC() throws Exception {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
index 0f8939d..ee2cc2a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java
@@ -27,8 +27,7 @@ import org.slf4j.MDC;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
- * Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during
- * routing.
+ * Tests that MDC works as a stack remembering old values when routing between routes.
  */
 public class MDCResetTest extends ContextTestSupport {
 
@@ -68,14 +67,6 @@ public class MDCResetTest extends ContextTestSupport {
                     }
                 }).to("log:foo").to("direct:b").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        String body = exchange.getIn().getBody(String.class);
-                        // use a producer template to send to b, instead of in
-                        // the route DSL
-                        body = template.requestBody("direct:b", body, String.class);
-                        exchange.getMessage().setBody(body);
-                    }
-                }).process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
                         assertEquals("route-a", MDC.get("camel.routeId"));
                         assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId"));
                     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index 47309a4..8dafdcc 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -22,10 +22,12 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
+import org.apache.camel.Service;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationRouteAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.spi.annotations.EagerClassloaded;
+import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +59,14 @@ public final class UnitOfWorkHelper {
         // unit of work is done
         try {
             uow.done(exchange);
+            if (uow instanceof Service) {
+                ServiceHelper.stopService(uow);
+            }
         } catch (Throwable e) {
             LOG.warn("Exception occurred during done UnitOfWork for Exchange: {}. This exception will be ignored.",
                     exchange, e);
         }
+
     }
 
     public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {