You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/22 13:53:07 UTC

[camel] branch master updated: CAMEL-16378: Camel MDC - Not propagating custom keys when using Split with Parallel Processing

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a382da5  CAMEL-16378: Camel MDC - Not propagating custom keys when using Split with Parallel Processing
a382da5 is described below

commit a382da52dd308947e9fd465fe827767c2a655d85
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 22 14:50:22 2021 +0100

    CAMEL-16378: Camel MDC - Not propagating custom keys when using Split with Parallel Processing
---
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  11 +-
 .../apache/camel/processor/MulticastProcessor.java |  56 +++++++-
 .../processor/MDCSplitParallelProcessingTest.java  | 143 +++++++++++++++++++++
 .../org/apache/camel/processor/MDCSplitTest.java   | 140 ++++++++++++++++++++
 .../org/apache/camel/support/PatternHelper.java    |  17 +++
 5 files changed, 356 insertions(+), 11 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index 8075040..a8e1683 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -206,15 +206,6 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
         return "MDCUnitOfWork";
     }
 
-    private static boolean matchPatterns(String value, String[] patterns) {
-        for (String pattern : patterns) {
-            if (PatternHelper.matchPattern(value, pattern)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     /**
      * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when the asynchronous routing engine is being used.
      */
@@ -247,7 +238,7 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
                     } else {
                         final String[] patterns = pattern.split(",");
                         mdc.forEach((k, v) -> {
-                            if (matchPatterns(k, patterns)) {
+                            if (PatternHelper.matchPatterns(k, patterns)) {
                                 custom.put(k, v);
                             }
                         });
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 265bf2f..71d00ab 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -64,6 +64,7 @@ import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
@@ -72,6 +73,7 @@ import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -310,12 +312,44 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     protected void schedule(Runnable runnable) {
         if (isParallelProcessing()) {
-            executorService.submit(() -> reactiveExecutor.schedule(runnable));
+            Runnable task = prepareParallelTask(runnable);
+            executorService.submit(() -> reactiveExecutor.schedule(task));
         } else {
             reactiveExecutor.schedule(runnable);
         }
     }
 
+    private Runnable prepareParallelTask(Runnable runnable) {
+        Runnable answer = runnable;
+
+        // if MDC is enabled we need to propagate the information
+        // to the sub task which is executed on another thread from the thread pool
+        if (camelContext.isUseMDCLogging()) {
+            String pattern = camelContext.getMDCLoggingKeysPattern();
+            Map<String, String> mdc = MDC.getCopyOfContextMap();
+            if (mdc != null && !mdc.isEmpty()) {
+                answer = () -> {
+                    try {
+                        if (pattern == null || "*".equals(pattern)) {
+                            mdc.forEach(MDC::put);
+                        } else {
+                            final String[] patterns = pattern.split(",");
+                            mdc.forEach((k, v) -> {
+                                if (PatternHelper.matchPatterns(k, patterns)) {
+                                    MDC.put(k, v);
+                                }
+                            });
+                        }
+                    } finally {
+                        runnable.run();
+                    }
+                };
+            }
+        }
+
+        return answer;
+    }
+
     protected abstract class MulticastTask implements Runnable {
 
         final Exchange original;
@@ -329,6 +363,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AtomicInteger nbAggregated = new AtomicInteger();
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
+        final Map<String, String> mdc;
 
         MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             this.original = original;
@@ -341,6 +376,14 @@ public class MulticastProcessor extends AsyncProcessorSupport
             if (timeout > 0) {
                 schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
             }
+            // if MDC is enabled we must make a copy in this constructor when the task
+            // is created by the caller thread, and then propagate back when run is called
+            // which can happen from another thread
+            if (isParallelProcessing() && original.getContext().isUseMDCLogging()) {
+                this.mdc = MDC.getCopyOfContextMap();
+            } else {
+                this.mdc = null;
+            }
         }
 
         @Override
@@ -348,6 +391,13 @@ public class MulticastProcessor extends AsyncProcessorSupport
             return "MulticastTask";
         }
 
+        @Override
+        public void run() {
+            if (this.mdc != null) {
+                this.mdc.forEach(MDC::put);
+            }
+        }
+
         protected void aggregate() {
             Lock lock = this.lock;
             if (lock.tryLock()) {
@@ -415,6 +465,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
         @Override
         public void run() {
+            super.run();
+
             try {
                 if (done.get()) {
                     return;
@@ -509,6 +561,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
         @Override
         public void run() {
+            super.run();
+
             boolean next = true;
             while (next) {
                 try {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java
new file mode 100644
index 0000000..8a988ac
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitParallelProcessingTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCSplitParallelProcessingTest extends ContextTestSupport {
+
+    @Test
+    public void testMdcPreservedAfterAsyncEndpoint() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A,B");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable MDC and breadcrumb
+                context.setUseMDCLogging(true);
+                context.setUseBreadcrumb(true);
+                context.setMDCLoggingKeysPattern("custom*,my*");
+
+                MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+                from("direct:a").routeId("route-async")
+                        .process(e -> {
+                            // custom is propagated
+                            MDC.put("custom.hello", "World");
+                            // foo is not propagated
+                            MDC.put("foo", "Bar");
+                            // myKey is propagated
+                            MDC.put("myKey", "Baz");
+                        })
+                        .process(checker)
+                        .to("log:foo")
+                        .split(body().tokenize(",")).parallelProcessing()
+                        .process(checker)
+                        .end()
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+    /**
+     * Stores values from the first invocation to compare them with the second invocation later.
+     */
+    private static class MdcCheckerProcessor implements Processor {
+
+        private String routeId = "route-async";
+        private String exchangeId;
+        private String messageId;
+        private String breadcrumbId;
+        private String contextId;
+        private Long threadId;
+        private String foo;
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // custom is propagated as its pattern matches
+            assertEquals("World", MDC.get("custom.hello"));
+            assertEquals("Baz", MDC.get("myKey"));
+
+            if (foo != null) {
+                // foo is not propagated
+                assertNotEquals(foo, MDC.get("foo"));
+            } else {
+                foo = MDC.get("foo");
+            }
+
+            if (threadId != null) {
+                Long currId = Thread.currentThread().getId();
+                assertNotEquals(threadId, (Object) currId);
+            } else {
+                threadId = Thread.currentThread().getId();
+            }
+
+            if (routeId != null) {
+                assertEquals(routeId, MDC.get("camel.routeId"));
+            }
+
+            if (exchangeId != null) {
+                assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+            } else {
+                exchangeId = MDC.get("camel.exchangeId");
+                assertTrue(exchangeId != null && exchangeId.length() > 0);
+            }
+
+            if (messageId != null) {
+                assertNotEquals(messageId, MDC.get("camel.messageId"));
+            } else {
+                messageId = MDC.get("camel.messageId");
+                assertTrue(messageId != null && messageId.length() > 0);
+            }
+
+            if (breadcrumbId != null) {
+                assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+            } else {
+                breadcrumbId = MDC.get("camel.breadcrumbId");
+                assertTrue(breadcrumbId != null && breadcrumbId.length() > 0);
+            }
+
+            if (contextId != null) {
+                assertEquals(contextId, MDC.get("camel.contextId"));
+            } else {
+                contextId = MDC.get("camel.contextId");
+                assertTrue(contextId != null && contextId.length() > 0);
+            }
+
+        }
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
new file mode 100644
index 0000000..e325a73
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCSplitTest extends ContextTestSupport {
+
+    @Test
+    public void testMdcPreserved() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A,B");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable MDC and breadcrumb
+                context.setUseMDCLogging(true);
+                context.setUseBreadcrumb(true);
+                context.setMDCLoggingKeysPattern("custom*,my*");
+
+                MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+                from("direct:a").routeId("route-async").process(e -> {
+                    // custom is propagated
+                    MDC.put("custom.hello", "World");
+                    // foo is propagated because we use the same thread
+                    MDC.put("foo", "Bar");
+                    // myKey is propagated
+                    MDC.put("myKey", "Baz");
+                }).process(checker)
+                        .to("log:foo")
+                        .split(body().tokenize(","))
+                        .process(checker)
+                        .end()
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+    /**
+     * Stores values from the first invocation to compare them with the second invocation later.
+     */
+    private static class MdcCheckerProcessor implements Processor {
+
+        private String routeId = "route-async";
+        private String exchangeId;
+        private String messageId;
+        private String breadcrumbId;
+        private String contextId;
+        private Long threadId;
+        private String foo;
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // custom is propagated as its pattern matches
+            assertEquals("World", MDC.get("custom.hello"));
+            assertEquals("Baz", MDC.get("myKey"));
+
+            if (foo != null) {
+                // foo is propagated because it's the same thread
+                assertEquals(foo, MDC.get("foo"));
+            } else {
+                foo = MDC.get("foo");
+            }
+
+            if (threadId != null) {
+                Long currId = Thread.currentThread().getId();
+                assertEquals(threadId, (Object) currId);
+            } else {
+                threadId = Thread.currentThread().getId();
+            }
+
+            if (routeId != null) {
+                assertEquals(routeId, MDC.get("camel.routeId"));
+            }
+
+            if (exchangeId != null) {
+                assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+            } else {
+                exchangeId = MDC.get("camel.exchangeId");
+                assertTrue(exchangeId != null && exchangeId.length() > 0);
+            }
+
+            if (messageId != null) {
+                assertNotEquals(messageId, MDC.get("camel.messageId"));
+            } else {
+                messageId = MDC.get("camel.messageId");
+                assertTrue(messageId != null && messageId.length() > 0);
+            }
+
+            if (breadcrumbId != null) {
+                assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+            } else {
+                breadcrumbId = MDC.get("camel.breadcrumbId");
+                assertTrue(breadcrumbId != null && breadcrumbId.length() > 0);
+            }
+
+            if (contextId != null) {
+                assertEquals(contextId, MDC.get("camel.contextId"));
+            } else {
+                contextId = MDC.get("camel.contextId");
+                assertTrue(contextId != null && contextId.length() > 0);
+            }
+        }
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
index 942bedd..e490f7e 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PatternHelper.java
@@ -65,6 +65,23 @@ public final class PatternHelper {
     }
 
     /**
+     * Matches the name with any of the given patterns (case insensitive).
+     *
+     * @param  name     the name
+     * @param  patterns pattern(s) to match
+     * @return          <tt>true</tt> if match, <tt>false</tt> otherwise.
+     * @see             #matchPattern(String, String)
+     */
+    public static boolean matchPatterns(String name, String[] patterns) {
+        for (String pattern : patterns) {
+            if (PatternHelper.matchPattern(name, pattern)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
      * Matches the name with the given pattern (case insensitive).
      * <p/>
      * The match rules are applied in this order: