You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/11 14:24:12 UTC

(camel) branch camel-3.x updated: CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new a7b02f1a567 CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
a7b02f1a567 is described below

commit a7b02f1a567b3bb52b9e6f463adcd4587503f895
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Dec 11 14:38:45 2023 +0100

    CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
---
 .../apache/camel/processor/MulticastProcessor.java | 61 ++++++++------------
 .../processor/SplitTimeoutCancelTaskTest.java      | 65 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 8ce05283e28..94fec20d1c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -52,8 +53,6 @@ import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
-import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
-import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ErrorHandlerAware;
@@ -269,21 +268,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
             processorExchangeFactory.setId(id);
             processorExchangeFactory.setRouteId(routeId);
         }
-
-        // eager load classes
-        Object dummy = new MulticastReactiveTask();
-        LOG.trace("Loaded {}", dummy.getClass().getName());
-        Object dummy2 = new MulticastTransactedTask();
-        LOG.trace("Loaded {}", dummy2.getClass().getName());
-        Object dummy3 = new UseOriginalAggregationStrategy();
-        LOG.trace("Loaded {}", dummy3.getClass().getName());
-        if (isShareUnitOfWork()) {
-            Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null);
-            LOG.trace("Loaded {}", dummy4.getClass().getName());
-        }
-        Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
-        LOG.trace("Loaded {}", dummy5.getClass().getName());
-
         ServiceHelper.buildService(processorExchangeFactory);
     }
 
@@ -418,16 +402,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
         final Map<String, String> mdc;
-
-        private MulticastTask() {
-            // used for eager classloading
-            this.original = null;
-            this.pairs = null;
-            this.callback = null;
-            this.iterator = null;
-            this.mdc = null;
-            this.completion = null;
-        }
+        final ScheduledFuture<?> timeoutTask;
 
         MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity) {
             this.original = original;
@@ -435,7 +410,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
             this.callback = callback;
             this.iterator = pairs.iterator();
             if (timeout > 0) {
-                schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
+                timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
+            } else {
+                timeoutTask = null;
             }
             // if MDC is enabled we must make a copy in this constructor when the task
             // is created by the caller thread, and then propagate back when run is called
@@ -506,15 +483,30 @@ public class MulticastProcessor extends AsyncProcessorSupport
                 } catch (Throwable e) {
                     original.setException(e);
                     // and do the done work
-                    doDone(null, false);
+                    doTimeoutDone(null, false);
                 } finally {
                     lock.unlock();
                 }
             }
         }
 
+        protected void doTimeoutDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            }
+        }
+
         protected void doDone(Exchange exchange, boolean forceExhaust) {
             if (done.compareAndSet(false, true)) {
+                // cancel timeout if we are done normally (we cannot cancel if called via onTimeout)
+                if (timeoutTask != null) {
+                    try {
+                        timeoutTask.cancel(true);
+                    } catch (Exception e) {
+                        // ignore
+                        LOG.debug("Cancel timeout task caused an exception. This exception is ignored.", e);
+                    }
+                }
                 MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
             }
         }
@@ -525,9 +517,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
      */
     protected class MulticastReactiveTask extends MulticastTask {
 
-        private MulticastReactiveTask() {
-        }
-
         public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
                                      int size) {
             super(original, pairs, callback, size);
@@ -626,9 +615,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
      */
     protected class MulticastTransactedTask extends MulticastTask {
 
-        private MulticastTransactedTask() {
-        }
-
         public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
                                        int size) {
             super(original, pairs, callback, size);
@@ -738,9 +724,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
         }
     }
 
-    protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
+    protected ScheduledFuture<?> schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
         if (executor instanceof ScheduledExecutorService) {
-            ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
+            return ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
         } else {
             executor.execute(() -> {
                 try {
@@ -751,6 +737,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
                 runnable.run();
             });
         }
+        return null;
     }
 
     protected StopWatch beforeSend(ProcessorExchangePair pair) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java
new file mode 100644
index 00000000000..449528bdaa2
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.junit.jupiter.api.Test;
+
+public class SplitTimeoutCancelTaskTest extends ContextTestSupport {
+
+    String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
+    String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";
+
+    @Test
+    public void testSplitterTimeoutShouldNotExhaustThreadPool() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
+        mockEndpoint.expectedMessageCount(4);
+
+        template.sendBody("direct:start", payload1);
+        template.sendBody("direct:start", payload2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
+                myThreadPoolProfile.setMaxPoolSize(20);
+                myThreadPoolProfile.setPoolSize(10);
+                myThreadPoolProfile.setMaxQueueSize(1);
+
+                getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
+
+                from("direct:start")
+                        .split()
+                        .xpath("//items/item")
+                        .parallelProcessing(true)
+                        .streaming(true)
+                        .stopOnException(true)
+                        .timeout("30000")
+                        .executorService("testProfile")
+                        .to("mock:split");
+            }
+        };
+    }
+}