You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/18 09:39:13 UTC
(camel) branch main updated: CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1d41b0b5fc0 CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)
1d41b0b5fc0 is described below
commit 1d41b0b5fc03ac7bd285b3be63c06cf54ea297fc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Dec 18 10:39:06 2023 +0100
CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465)
* CAMEL-17721: camel-core - MDC custom keys should preserve existing value during routing, so users can alter its value.
CAMEL-20246: WireTap should not create correlated exchange copy
---
.../apache/camel/impl/engine/MDCUnitOfWork.java | 42 ++++++++++++-
.../apache/camel/processor/WireTapProcessor.java | 2 +
.../{MDCSplitTest.java => MDCCustomKeysTest.java} | 71 ++++++++++++++++------
.../org/apache/camel/processor/MDCSplitTest.java | 1 +
.../{MDCSplitTest.java => WireTapMDCTest.java} | 64 +++++++++++--------
.../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc | 10 +++
6 files changed, 147 insertions(+), 43 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index e7c6c748849..d2cdffc6cb4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -211,9 +211,42 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
}
}
+ /**
+ * Clear custom MDC values based on the configured MDC pattern
+ */
+ protected void clearCustom(Exchange exchange) {
+ // clear custom patterns
+ if (pattern != null) {
+
+ // only clear if the UoW is the parent UoW (split, multicast and other EIPs create child exchanges with their own UoW)
+ if (exchange != null) {
+ String cid = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ if (cid != null && !cid.equals(exchange.getExchangeId())) {
+ return;
+ }
+ }
+
+ Map<String, String> mdc = MDC.getCopyOfContextMap();
+ if (mdc != null) {
+ if ("*".equals(pattern)) {
+ MDC.clear();
+ } else {
+ final String[] patterns = pattern.split(",");
+ mdc.forEach((k, v) -> {
+ if (PatternHelper.matchPatterns(k, patterns)) {
+ MDC.remove(k);
+ }
+ });
+ }
+ }
+ }
+ }
+
@Override
public void done(Exchange exchange) {
super.done(exchange);
+ // clear custom first
+ clearCustom(exchange);
clear();
}
@@ -227,6 +260,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
@Override
public void reset() {
super.reset();
+ // clear custom first
+ clearCustom(null);
clear();
}
@@ -309,7 +344,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service {
MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId);
}
if (custom != null) {
- custom.forEach(MDC::put);
+ // keep existing custom value to not override
+ custom.forEach((k, v) -> {
+ if (MDC.get(k) == null) {
+ MDC.put(k, v);
+ }
+ });
}
}
// need to setup the routeId finally
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index c025f169672..755724ad85c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -250,6 +250,8 @@ public class WireTapProcessor extends AsyncProcessorSupport
private Exchange configureCopyExchange(Exchange exchange) {
// must use a copy as we dont want it to cause side effects of the original exchange
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
+ // should not be correlated, but we needed to copy without handover
+ copy.removeProperty(ExchangePropertyKey.CORRELATION_ID);
// set MEP to InOnly as this wire tap is a fire and forget
copy.setPattern(ExchangePattern.InOnly);
// move OUT to IN if needed
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
similarity index 72%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
index 897eafb1438..8ca7df20ef3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java
@@ -26,16 +26,39 @@ import org.slf4j.MDC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class MDCSplitTest extends ContextTestSupport {
+public class MDCCustomKeysTest extends ContextTestSupport {
+
+ private MdcCheckerProcessor checker1 = new MdcCheckerProcessor("N/A");
+ private MdcCheckerProcessor checker2 = new MdcCheckerProcessor("World");
@Test
public void testMdcPreserved() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedBodiesReceived("A");
+
+ checker1.reset();
+ checker2.reset();
+ template.sendBody("direct:a", "A");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testMdcPreservedTwo() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
- mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("A", "B");
+
+ checker1.reset();
+ checker2.reset();
+ template.sendBody("direct:a", "A");
- template.sendBody("direct:a", "A,B");
+ checker1.reset();
+ checker2.reset();
+ template.sendBody("direct:a", "B");
assertMockEndpointsSatisfied();
}
@@ -50,22 +73,25 @@ public class MDCSplitTest extends ContextTestSupport {
context.setUseBreadcrumb(true);
context.setMDCLoggingKeysPattern("custom*,my*");
- MdcCheckerProcessor checker = new MdcCheckerProcessor();
+ from("direct:a").process(e -> {
+
+ // custom should be empty
+ String hello = MDC.get("custom.hello");
+ assertNull(hello);
- from("direct:a").routeId("route-async").process(e -> {
// custom is propagated
- MDC.put("custom.hello", "World");
+ MDC.put("custom.hello", "N/A");
// foo is propagated due we use the same thread
MDC.put("foo", "Bar");
// myKey is propagated
MDC.put("myKey", "Baz");
- }).process(checker)
+ }).process(checker1)
.to("log:foo")
- .split(body().tokenize(","))
- .process(checker)
- .end()
+ .process(e -> {
+ MDC.put("custom.hello", "World");
+ })
+ .process(checker2)
.to("mock:end");
-
}
};
}
@@ -75,7 +101,6 @@ public class MDCSplitTest extends ContextTestSupport {
*/
private static class MdcCheckerProcessor implements Processor {
- private String routeId = "route-async";
private String exchangeId;
private String messageId;
private String breadcrumbId;
@@ -83,10 +108,25 @@ public class MDCSplitTest extends ContextTestSupport {
private Long threadId;
private String foo;
+ private final String expected;
+
+ public MdcCheckerProcessor(String expected) {
+ this.expected = expected;
+ }
+
+ public void reset() {
+ exchangeId = null;
+ messageId = null;
+ breadcrumbId = null;
+ contextId = null;
+ threadId = null;
+ foo = null;
+ }
+
@Override
public void process(Exchange exchange) throws Exception {
// custom is propagated as its pattern matches
- assertEquals("World", MDC.get("custom.hello"));
+ assertEquals(expected, MDC.get("custom.hello"));
assertEquals("Baz", MDC.get("myKey"));
if (foo != null) {
@@ -102,11 +142,6 @@ public class MDCSplitTest extends ContextTestSupport {
} else {
threadId = Thread.currentThread().getId();
}
-
- if (routeId != null) {
- assertEquals(routeId, MDC.get("camel.routeId"));
- }
-
if (exchangeId != null) {
assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
} else {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
index 897eafb1438..a78d59dd2b4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
@@ -62,6 +62,7 @@ public class MDCSplitTest extends ContextTestSupport {
}).process(checker)
.to("log:foo")
.split(body().tokenize(","))
+ .to("log:line")
.process(checker)
.end()
.to("mock:end");
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
similarity index 69%
copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
index 897eafb1438..ccc020cee44 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java
@@ -25,17 +25,16 @@ import org.junit.jupiter.api.Test;
import org.slf4j.MDC;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class MDCSplitTest extends ContextTestSupport {
+public class WireTapMDCTest extends ContextTestSupport {
@Test
public void testMdcPreserved() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
- mock.expectedMessageCount(1);
+ mock.expectedMessageCount(2);
- template.sendBody("direct:a", "A,B");
+ template.sendBody("seda:a", "A");
assertMockEndpointsSatisfied();
}
@@ -50,22 +49,30 @@ public class MDCSplitTest extends ContextTestSupport {
context.setUseBreadcrumb(true);
context.setMDCLoggingKeysPattern("custom*,my*");
- MdcCheckerProcessor checker = new MdcCheckerProcessor();
-
- from("direct:a").routeId("route-async").process(e -> {
- // custom is propagated
- MDC.put("custom.hello", "World");
- // foo is propagated due we use the same thread
- MDC.put("foo", "Bar");
- // myKey is propagated
- MDC.put("myKey", "Baz");
- }).process(checker)
- .to("log:foo")
- .split(body().tokenize(","))
+ MdcCheckerProcessor checker = new MdcCheckerProcessor("route-a", "World", "MyValue");
+ MdcCheckerProcessor checker2 = new MdcCheckerProcessor("route-b", "Moon", "MyValue2");
+
+ from("seda:a").routeId("route-a")
+ .process(e -> {
+ MDC.put("custom.hello", "World");
+ MDC.put("foo", "Bar");
+ MDC.put("myKey", "MyValue");
+ })
+ .process(checker)
+ .to("log:a")
+ .wireTap("direct:b")
.process(checker)
- .end()
.to("mock:end");
+ from("direct:b").routeId("route-b")
+ .process(e -> {
+ MDC.put("custom.hello", "Moon");
+ MDC.put("foo", "Bar2");
+ MDC.put("myKey", "MyValue2");
+ })
+ .process(checker2)
+ .to("log:b")
+ .to("mock:end");
}
};
}
@@ -75,7 +82,6 @@ public class MDCSplitTest extends ContextTestSupport {
*/
private static class MdcCheckerProcessor implements Processor {
- private String routeId = "route-async";
private String exchangeId;
private String messageId;
private String breadcrumbId;
@@ -83,11 +89,21 @@ public class MDCSplitTest extends ContextTestSupport {
private Long threadId;
private String foo;
+ private String expected1;
+ private String expected2;
+ private String expected3;
+
+ public MdcCheckerProcessor(String expected1, String expected2, String expected3) {
+ this.expected1 = expected1;
+ this.expected2 = expected2;
+ this.expected3 = expected3;
+ }
+
@Override
public void process(Exchange exchange) throws Exception {
// custom is propagated as its pattern matches
- assertEquals("World", MDC.get("custom.hello"));
- assertEquals("Baz", MDC.get("myKey"));
+ assertEquals(expected2, MDC.get("custom.hello"));
+ assertEquals(expected3, MDC.get("myKey"));
if (foo != null) {
// foo propagated because its the same thread
@@ -103,19 +119,19 @@ public class MDCSplitTest extends ContextTestSupport {
threadId = Thread.currentThread().getId();
}
- if (routeId != null) {
- assertEquals(routeId, MDC.get("camel.routeId"));
+ if (expected1 != null) {
+ assertEquals(expected1, MDC.get("camel.routeId"));
}
if (exchangeId != null) {
- assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+ assertEquals(exchangeId, MDC.get("camel.exchangeId"));
} else {
exchangeId = MDC.get("camel.exchangeId");
assertTrue(exchangeId != null && exchangeId.length() > 0);
}
if (messageId != null) {
- assertNotEquals(messageId, MDC.get("camel.messageId"));
+ assertEquals(messageId, MDC.get("camel.messageId"));
} else {
messageId = MDC.get("camel.messageId");
assertTrue(messageId != null && messageId.length() > 0);
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
index bfc0737228a..be28d246969 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
@@ -15,6 +15,16 @@ The method `getCreated` is now deprecated. Access to the time-related informatio
The `lookup` method in `org.apache.camel.component.properties.PropertiesLookup` now has a 2nd parameter for the default value.
+==== WireTap EIP
+
+The copied exchange is no longer having exchange property CORRELATION_ID set that links to the original exchange.
+The reason is that this link should only be for EIPs with sub exchanges such as Splitter and Multicast.
+
+==== MDC logging
+
+When using custom MDC keys (need to configure `MDCLoggingKeysPattern`) then these custom keys are cleared at the end of routing.
+Also, custom keys is allowed to be changed during routing, using the `MDC.set(myKey, ...)` Java API.
+
=== camel-main
The route controller configuration has been moved from general main to its own group.