You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/18 09:39:13 UTC

(camel) branch main updated: CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d41b0b5fc0 CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)
1d41b0b5fc0 is described below

commit 1d41b0b5fc03ac7bd285b3be63c06cf54ea297fc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Dec 18 10:39:06 2023 +0100

    CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)
    
    * CAMEL-17721: camel-core - MDC custom keys should preserve existing value during routing, so users can alter its value.
    
    CAMEL-20246: WireTap should not create correlated exchange copy
---
 .../apache/camel/impl/engine/MDCUnitOfWork.java    | 42 ++++++++++++-
 .../apache/camel/processor/WireTapProcessor.java   |  2 +
 .../{MDCSplitTest.java => MDCCustomKeysTest.java}  | 71 ++++++++++++++++------
 .../org/apache/camel/processor/MDCSplitTest.java   |  1 +
 .../{MDCSplitTest.java => WireTapMDCTest.java}     | 64 +++++++++++--------
 .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc     | 10 +++
 6 files changed, 147 insertions(+), 43 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index e7c6c748849..d2cdffc6cb4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -211,9 +211,42 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
         }
     }
 
+    /**
+     * Clear custom MDC values based on the configured MDC pattern
+     */
+    protected void clearCustom(Exchange exchange) {
+        // clear custom patterns
+        if (pattern != null) {
+
+            // only clear if the UoW is the parent UoW (split, multicast and other EIPs create child exchanges with their own UoW)
+            if (exchange != null) {
+                String cid = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                if (cid != null && !cid.equals(exchange.getExchangeId())) {
+                    return;
+                }
+            }
+
+            Map<String, String> mdc = MDC.getCopyOfContextMap();
+            if (mdc != null) {
+                if ("*".equals(pattern)) {
+                    MDC.clear();
+                } else {
+                    final String[] patterns = pattern.split(",");
+                    mdc.forEach((k, v) -> {
+                        if (PatternHelper.matchPatterns(k, patterns)) {
+                            MDC.remove(k);
+                        }
+                    });
+                }
+            }
+        }
+    }
+
     @Override
     public void done(Exchange exchange) {
         super.done(exchange);
+        // clear custom first
+        clearCustom(exchange);
         clear();
     }
 
@@ -227,6 +260,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
     @Override
     public void reset() {
         super.reset();
+        // clear custom first
+        clearCustom(null);
         clear();
     }
 
@@ -309,7 +344,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
                         MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId);
                     }
                     if (custom != null) {
-                        custom.forEach(MDC::put);
+                        // keep existing custom value to not override
+                        custom.forEach((k, v) -> {
+                            if (MDC.get(k) == null) {
+                                MDC.put(k, v);
+                            }
+                        });
                     }
                 }
                 // need to setup the routeId finally
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index c025f169672..755724ad85c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -250,6 +250,8 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private Exchange configureCopyExchange(Exchange exchange) {
         // must use a copy as we dont want it to cause side effects of the original exchange
         Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+        // should not be correlated, but we needed to copy without handover
+        copy.removeProperty(ExchangePropertyKey.CORRELATION_ID);
         // set MEP to InOnly as this wire tap is a fire and forget
         copy.setPattern(ExchangePattern.InOnly);
         // move OUT to IN if needed
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
similarity index 72%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
index 897eafb1438..8ca7df20ef3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
@@ -26,16 +26,39 @@ import org.slf4j.MDC;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class MDCSplitTest extends ContextTestSupport {
+public class MDCCustomKeysTest extends ContextTestSupport {
+
+    private MdcCheckerProcessor checker1 = new MdcCheckerProcessor("N/A");
+    private MdcCheckerProcessor checker2 = new MdcCheckerProcessor("World");
 
     @Test
     public void testMdcPreserved() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedBodiesReceived("A");
+
+        checker1.reset();
+        checker2.reset();
+        template.sendBody("direct:a", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testMdcPreservedTwo() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:end");
-        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("A", "B");
+
+        checker1.reset();
+        checker2.reset();
+        template.sendBody("direct:a", "A");
 
-        template.sendBody("direct:a", "A,B");
+        checker1.reset();
+        checker2.reset();
+        template.sendBody("direct:a", "B");
 
         assertMockEndpointsSatisfied();
     }
@@ -50,22 +73,25 @@ public class MDCSplitTest extends ContextTestSupport {
                 context.setUseBreadcrumb(true);
                 context.setMDCLoggingKeysPattern("custom*,my*");
 
-                MdcCheckerProcessor checker = new MdcCheckerProcessor();
+                from("direct:a").process(e -> {
+
+                    // custom should be empty
+                    String hello = MDC.get("custom.hello");
+                    assertNull(hello);
 
-                from("direct:a").routeId("route-async").process(e -> {
                     // custom is propagated
-                    MDC.put("custom.hello", "World");
+                    MDC.put("custom.hello", "N/A");
                     // foo is propagated due we use the same thread
                     MDC.put("foo", "Bar");
                     // myKey is propagated
                     MDC.put("myKey", "Baz");
-                }).process(checker)
+                }).process(checker1)
                         .to("log:foo")
-                        .split(body().tokenize(","))
-                        .process(checker)
-                        .end()
+                        .process(e -> {
+                            MDC.put("custom.hello", "World");
+                        })
+                        .process(checker2)
                         .to("mock:end");
-
             }
         };
     }
@@ -75,7 +101,6 @@ public class MDCSplitTest extends ContextTestSupport {
      */
     private static class MdcCheckerProcessor implements Processor {
 
-        private String routeId = "route-async";
         private String exchangeId;
         private String messageId;
         private String breadcrumbId;
@@ -83,10 +108,25 @@ public class MDCSplitTest extends ContextTestSupport {
         private Long threadId;
         private String foo;
 
+        private final String expected;
+
+        public MdcCheckerProcessor(String expected) {
+            this.expected = expected;
+        }
+
+        public void reset() {
+            exchangeId = null;
+            messageId = null;
+            breadcrumbId = null;
+            contextId = null;
+            threadId = null;
+            foo = null;
+        }
+
         @Override
         public void process(Exchange exchange) throws Exception {
             // custom is propagated as its pattern matches
-            assertEquals("World", MDC.get("custom.hello"));
+            assertEquals(expected, MDC.get("custom.hello"));
             assertEquals("Baz", MDC.get("myKey"));
 
             if (foo != null) {
@@ -102,11 +142,6 @@ public class MDCSplitTest extends ContextTestSupport {
             } else {
                 threadId = Thread.currentThread().getId();
             }
-
-            if (routeId != null) {
-                assertEquals(routeId, MDC.get("camel.routeId"));
-            }
-
             if (exchangeId != null) {
                 assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
             } else {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
index 897eafb1438..a78d59dd2b4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
@@ -62,6 +62,7 @@ public class MDCSplitTest extends ContextTestSupport {
                 }).process(checker)
                         .to("log:foo")
                         .split(body().tokenize(","))
+                        .to("log:line")
                         .process(checker)
                         .end()
                         .to("mock:end");
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
similarity index 69%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
index 897eafb1438..ccc020cee44 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
@@ -25,17 +25,16 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.MDC;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class MDCSplitTest extends ContextTestSupport {
+public class WireTapMDCTest extends ContextTestSupport {
 
     @Test
     public void testMdcPreserved() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:end");
-        mock.expectedMessageCount(1);
+        mock.expectedMessageCount(2);
 
-        template.sendBody("direct:a", "A,B");
+        template.sendBody("seda:a", "A");
 
         assertMockEndpointsSatisfied();
     }
@@ -50,22 +49,30 @@ public class MDCSplitTest extends ContextTestSupport {
                 context.setUseBreadcrumb(true);
                 context.setMDCLoggingKeysPattern("custom*,my*");
 
-                MdcCheckerProcessor checker = new MdcCheckerProcessor();
-
-                from("direct:a").routeId("route-async").process(e -> {
-                    // custom is propagated
-                    MDC.put("custom.hello", "World");
-                    // foo is propagated due we use the same thread
-                    MDC.put("foo", "Bar");
-                    // myKey is propagated
-                    MDC.put("myKey", "Baz");
-                }).process(checker)
-                        .to("log:foo")
-                        .split(body().tokenize(","))
+                MdcCheckerProcessor checker = new MdcCheckerProcessor("route-a", "World", "MyValue");
+                MdcCheckerProcessor checker2 = new MdcCheckerProcessor("route-b", "Moon", "MyValue2");
+
+                from("seda:a").routeId("route-a")
+                        .process(e -> {
+                            MDC.put("custom.hello", "World");
+                            MDC.put("foo", "Bar");
+                            MDC.put("myKey", "MyValue");
+                        })
+                        .process(checker)
+                        .to("log:a")
+                        .wireTap("direct:b")
                         .process(checker)
-                        .end()
                         .to("mock:end");
 
+                from("direct:b").routeId("route-b")
+                        .process(e -> {
+                            MDC.put("custom.hello", "Moon");
+                            MDC.put("foo", "Bar2");
+                            MDC.put("myKey", "MyValue2");
+                        })
+                        .process(checker2)
+                        .to("log:b")
+                        .to("mock:end");
             }
         };
     }
@@ -75,7 +82,6 @@ public class MDCSplitTest extends ContextTestSupport {
      */
     private static class MdcCheckerProcessor implements Processor {
 
-        private String routeId = "route-async";
         private String exchangeId;
         private String messageId;
         private String breadcrumbId;
@@ -83,11 +89,21 @@ public class MDCSplitTest extends ContextTestSupport {
         private Long threadId;
         private String foo;
 
+        private String expected1;
+        private String expected2;
+        private String expected3;
+
+        public MdcCheckerProcessor(String expected1, String expected2, String expected3) {
+            this.expected1 = expected1;
+            this.expected2 = expected2;
+            this.expected3 = expected3;
+        }
+
         @Override
         public void process(Exchange exchange) throws Exception {
             // custom is propagated as its pattern matches
-            assertEquals("World", MDC.get("custom.hello"));
-            assertEquals("Baz", MDC.get("myKey"));
+            assertEquals(expected2, MDC.get("custom.hello"));
+            assertEquals(expected3, MDC.get("myKey"));
 
             if (foo != null) {
                 // foo propagated because its the same thread
@@ -103,19 +119,19 @@ public class MDCSplitTest extends ContextTestSupport {
                 threadId = Thread.currentThread().getId();
             }
 
-            if (routeId != null) {
-                assertEquals(routeId, MDC.get("camel.routeId"));
+            if (expected1 != null) {
+                assertEquals(expected1, MDC.get("camel.routeId"));
             }
 
             if (exchangeId != null) {
-                assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+                assertEquals(exchangeId, MDC.get("camel.exchangeId"));
             } else {
                 exchangeId = MDC.get("camel.exchangeId");
                 assertTrue(exchangeId != null && exchangeId.length() > 0);
             }
 
             if (messageId != null) {
-                assertNotEquals(messageId, MDC.get("camel.messageId"));
+                assertEquals(messageId, MDC.get("camel.messageId"));
             } else {
                 messageId = MDC.get("camel.messageId");
                 assertTrue(messageId != null && messageId.length() > 0);
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
index bfc0737228a..be28d246969 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
@@ -15,6 +15,16 @@ The method `getCreated` is now deprecated. Access to the time-related informatio
 
 The `lookup` method in `org.apache.camel.component.properties.PropertiesLookup` now has a 2nd parameter for the default value.
 
+==== WireTap EIP
+
+The copied exchange is no longer having exchange property CORRELATION_ID set that links to the original exchange.
+The reason is that this link should only be for EIPs with sub exchanges such as Splitter and Multicast.
+
+==== MDC logging
+
+When using custom MDC keys (need to configure `MDCLoggingKeysPattern`) then these custom keys are cleared at the end of routing.
+Also, custom keys is allowed to be changed during routing, using the `MDC.set(myKey, ...)` Java API.
+
 === camel-main
 
 The route controller configuration has been moved from general main to its own group.