You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/22 13:53:07 UTC
[camel] branch master updated: CAMEL-16378: Camel MDC - Not
propagating custom keys when using Split with Parallel Processing
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new a382da5 CAMEL-16378: Camel MDC - Not propagating custom keys when using Split with Parallel Processing
a382da5 is described below
commit a382da52dd308947e9fd465fe827767c2a655d85
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 22 14:50:22 2021 +0100
CAMEL-16378: Camel MDC - Not propagating custom keys when using Split with Parallel Processing
---
.../apache/camel/impl/engine/MDCUnitOfWork.java | 11 +-
.../apache/camel/processor/MulticastProcessor.java | 56 +++++++-
.../processor/MDCSplitParallelProcessingTest.java | 143 +++++++++++++++++++++
.../org/apache/camel/processor/MDCSplitTest.java | 140 ++++++++++++++++++++
.../org/apache/camel/support/PatternHelper.java | 17 +++
5 files changed, 356 insertions(+), 11 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 8075040..a8e1683 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -206,15 +206,6 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
return "MDCUnitOfWork";
}
- private static boolean matchPatterns(String value, String[] patterns) {
- for (String pattern : patterns) {
- if (PatternHelper.matchPattern(value, pattern)) {
- return true;
- }
- }
- return false;
- }
-
/**
* {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when the asynchronous routing engine is being used.
*/
@@ -247,7 +238,7 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
} else {
final String[] patterns = pattern.split(",");
mdc.forEach((k, v) -> {
- if (matchPatterns(k, patterns)) {
+ if (PatternHelper.matchPatterns(k, patterns)) {
custom.put(k, v);
}
});
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 265bf2f..71d00ab 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -64,6 +64,7 @@ import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
@@ -72,6 +73,7 @@ import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AsyncCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -310,12 +312,44 @@ public class MulticastProcessor extends AsyncProcessorSupport
protected void schedule(Runnable runnable) {
if (isParallelProcessing()) {
- executorService.submit(() -> reactiveExecutor.schedule(runnable));
+ Runnable task = prepareParallelTask(runnable);
+ executorService.submit(() -> reactiveExecutor.schedule(task));
} else {
reactiveExecutor.schedule(runnable);
}
}
+ private Runnable prepareParallelTask(Runnable runnable) {
+ Runnable answer = runnable;
+
+ // if MDC is enabled we need to propagate the information
+ // to the sub task which is executed on another thread from the thread pool
+ if (camelContext.isUseMDCLogging()) {
+ String pattern = camelContext.getMDCLoggingKeysPattern();
+ Map<String, String> mdc = MDC.getCopyOfContextMap();
+ if (mdc != null && !mdc.isEmpty()) {
+ answer = () -> {
+ try {
+ if (pattern == null || "*".equals(pattern)) {
+ mdc.forEach(MDC::put);
+ } else {
+ final String[] patterns = pattern.split(",");
+ mdc.forEach((k, v) -> {
+ if (PatternHelper.matchPatterns(k, patterns)) {
+ MDC.put(k, v);
+ }
+ });
+ }
+ } finally {
+ runnable.run();
+ }
+ };
+ }
+ }
+
+ return answer;
+ }
+
protected abstract class MulticastTask implements Runnable {
final Exchange original;
@@ -329,6 +363,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
final AtomicInteger nbAggregated = new AtomicInteger();
final AtomicBoolean allSent = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean();
+ final Map<String, String> mdc;
MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
this.original = original;
@@ -341,6 +376,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
if (timeout > 0) {
schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
}
+ // if MDC is enabled we must make a copy in this constructor when the task
+ // is created by the caller thread, and then propagate back when run is called
+ // which can happen from another thread
+ if (isParallelProcessing() && original.getContext().isUseMDCLogging()) {
+ this.mdc = MDC.getCopyOfContextMap();
+ } else {
+ this.mdc = null;
+ }
}
@Override
@@ -348,6 +391,13 @@ public class MulticastProcessor extends AsyncProcessorSupport
return "MulticastTask";
}
+ @Override
+ public void run() {
+ if (this.mdc != null) {
+ this.mdc.forEach(MDC::put);
+ }
+ }
+
protected void aggregate() {
Lock lock = this.lock;
if (lock.tryLock()) {
@@ -415,6 +465,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
@Override
public void run() {
+ super.run();
+
try {
if (done.get()) {
return;
@@ -509,6 +561,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
@Override
public void run() {
+ super.run();
+
boolean next = true;
while (next) {
try {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java
new file mode 100644
index 0000000..8a988ac
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCSplitParallelProcessingTest extends ContextTestSupport {
+
+ @Test
+ public void testMdcPreservedAfterAsyncEndpoint() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:a", "A,B");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // enable MDC and breadcrumb
+ context.setUseMDCLogging(true);
+ context.setUseBreadcrumb(true);
+ context.setMDCLoggingKeysPattern("custom*,my*");
+
+ MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+ from("direct:a").routeId("route-async")
+ .process(e -> {
+ // custom is propagated
+ MDC.put("custom.hello", "World");
+ // foo is not propagated
+ MDC.put("foo", "Bar");
+ // myKey is propagated
+ MDC.put("myKey", "Baz");
+ })
+ .process(checker)
+ .to("log:foo")
+ .split(body().tokenize(",")).parallelProcessing()
+ .process(checker)
+ .end()
+ .to("mock:end");
+
+ }
+ };
+ }
+
+ /**
+ * Stores values from the first invocation to compare them with the second invocation later.
+ */
+ private static class MdcCheckerProcessor implements Processor {
+
+ private String routeId = "route-async";
+ private String exchangeId;
+ private String messageId;
+ private String breadcrumbId;
+ private String contextId;
+ private Long threadId;
+ private String foo;
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // custom is propagated as its pattern matches
+ assertEquals("World", MDC.get("custom.hello"));
+ assertEquals("Baz", MDC.get("myKey"));
+
+ if (foo != null) {
+ // foo is not propagated
+ assertNotEquals(foo, MDC.get("foo"));
+ } else {
+ foo = MDC.get("foo");
+ }
+
+ if (threadId != null) {
+ Long currId = Thread.currentThread().getId();
+ assertNotEquals(threadId, (Object) currId);
+ } else {
+ threadId = Thread.currentThread().getId();
+ }
+
+ if (routeId != null) {
+ assertEquals(routeId, MDC.get("camel.routeId"));
+ }
+
+ if (exchangeId != null) {
+ assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+ } else {
+ exchangeId = MDC.get("camel.exchangeId");
+ assertTrue(exchangeId != null && exchangeId.length() > 0);
+ }
+
+ if (messageId != null) {
+ assertNotEquals(messageId, MDC.get("camel.messageId"));
+ } else {
+ messageId = MDC.get("camel.messageId");
+ assertTrue(messageId != null && messageId.length() > 0);
+ }
+
+ if (breadcrumbId != null) {
+ assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+ } else {
+ breadcrumbId = MDC.get("camel.breadcrumbId");
+ assertTrue(breadcrumbId != null && breadcrumbId.length() > 0);
+ }
+
+ if (contextId != null) {
+ assertEquals(contextId, MDC.get("camel.contextId"));
+ } else {
+ contextId = MDC.get("camel.contextId");
+ assertTrue(contextId != null && contextId.length() > 0);
+ }
+
+ }
+ }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
new file mode 100644
index 0000000..e325a73
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCSplitTest extends ContextTestSupport {
+
+ @Test
+ public void testMdcPreserved() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:a", "A,B");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // enable MDC and breadcrumb
+ context.setUseMDCLogging(true);
+ context.setUseBreadcrumb(true);
+ context.setMDCLoggingKeysPattern("custom*,my*");
+
+ MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+ from("direct:a").routeId("route-async").process(e -> {
+ // custom is propagated
+ MDC.put("custom.hello", "World");
+ // foo is propagated due we use the same thread
+ MDC.put("foo", "Bar");
+ // myKey is propagated
+ MDC.put("myKey", "Baz");
+ }).process(checker)
+ .to("log:foo")
+ .split(body().tokenize(","))
+ .process(checker)
+ .end()
+ .to("mock:end");
+
+ }
+ };
+ }
+
+ /**
+ * Stores values from the first invocation to compare them with the second invocation later.
+ */
+ private static class MdcCheckerProcessor implements Processor {
+
+ private String routeId = "route-async";
+ private String exchangeId;
+ private String messageId;
+ private String breadcrumbId;
+ private String contextId;
+ private Long threadId;
+ private String foo;
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // custom is propagated as its pattern matches
+ assertEquals("World", MDC.get("custom.hello"));
+ assertEquals("Baz", MDC.get("myKey"));
+
+ if (foo != null) {
+ // foo propagated because its the same thread
+ assertEquals(foo, MDC.get("foo"));
+ } else {
+ foo = MDC.get("foo");
+ }
+
+ if (threadId != null) {
+ Long currId = Thread.currentThread().getId();
+ assertEquals(threadId, (Object) currId);
+ } else {
+ threadId = Thread.currentThread().getId();
+ }
+
+ if (routeId != null) {
+ assertEquals(routeId, MDC.get("camel.routeId"));
+ }
+
+ if (exchangeId != null) {
+ assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+ } else {
+ exchangeId = MDC.get("camel.exchangeId");
+ assertTrue(exchangeId != null && exchangeId.length() > 0);
+ }
+
+ if (messageId != null) {
+ assertNotEquals(messageId, MDC.get("camel.messageId"));
+ } else {
+ messageId = MDC.get("camel.messageId");
+ assertTrue(messageId != null && messageId.length() > 0);
+ }
+
+ if (breadcrumbId != null) {
+ assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+ } else {
+ breadcrumbId = MDC.get("camel.breadcrumbId");
+ assertTrue(breadcrumbId != null && breadcrumbId.length() > 0);
+ }
+
+ if (contextId != null) {
+ assertEquals(contextId, MDC.get("camel.contextId"));
+ } else {
+ contextId = MDC.get("camel.contextId");
+ assertTrue(contextId != null && contextId.length() > 0);
+ }
+ }
+ }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
index 942bedd..e490f7e 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
@@ -65,6 +65,23 @@ public final class PatternHelper {
}
/**
+ * Matches the name with the given patterns (case insensitive).
+ *
+ * @param name the name
+ * @param patterns pattern(s) to match
+ * @return <tt>true</tt> if match, <tt>false</tt> otherwise.
+ * @see #matchPattern(String, String)
+ */
+ public static boolean matchPatterns(String name, String[] patterns) {
+ for (String pattern : patterns) {
+ if (PatternHelper.matchPattern(name, pattern)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Matches the name with the given pattern (case insensitive).
* <p/>
* The match rules are applied in this order: