You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/11 14:24:31 UTC
(camel) branch camel-3.21.x updated: CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
new 93413fcc5cd CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
93413fcc5cd is described below
commit 93413fcc5cdfab9cb1632212c4203038c4bb9da7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Dec 11 14:38:45 2023 +0100
CAMEL-20214: camel-core: Multicast/Splitter EIP using timeout should cancel task if completed before timeout, so task is not taking up space in thread pool. (#12399)
---
.../apache/camel/processor/MulticastProcessor.java | 61 ++++++++------------
.../processor/SplitTimeoutCancelTaskTest.java | 65 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 37 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 8ce05283e28..94fec20d1c3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,8 +53,6 @@ import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
-import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
-import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ErrorHandlerAware;
@@ -269,21 +268,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
processorExchangeFactory.setId(id);
processorExchangeFactory.setRouteId(routeId);
}
-
- // eager load classes
- Object dummy = new MulticastReactiveTask();
- LOG.trace("Loaded {}", dummy.getClass().getName());
- Object dummy2 = new MulticastTransactedTask();
- LOG.trace("Loaded {}", dummy2.getClass().getName());
- Object dummy3 = new UseOriginalAggregationStrategy();
- LOG.trace("Loaded {}", dummy3.getClass().getName());
- if (isShareUnitOfWork()) {
- Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null);
- LOG.trace("Loaded {}", dummy4.getClass().getName());
- }
- Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
- LOG.trace("Loaded {}", dummy5.getClass().getName());
-
ServiceHelper.buildService(processorExchangeFactory);
}
@@ -418,16 +402,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
final AtomicBoolean allSent = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean();
final Map<String, String> mdc;
-
- private MulticastTask() {
- // used for eager classloading
- this.original = null;
- this.pairs = null;
- this.callback = null;
- this.iterator = null;
- this.mdc = null;
- this.completion = null;
- }
+ final ScheduledFuture<?> timeoutTask;
MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity) {
this.original = original;
@@ -435,7 +410,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
this.callback = callback;
this.iterator = pairs.iterator();
if (timeout > 0) {
- schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
+ timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
+ } else {
+ timeoutTask = null;
}
// if MDC is enabled we must make a copy in this constructor when the task
// is created by the caller thread, and then propagate back when run is called
@@ -506,15 +483,30 @@ public class MulticastProcessor extends AsyncProcessorSupport
} catch (Throwable e) {
original.setException(e);
// and do the done work
- doDone(null, false);
+ doTimeoutDone(null, false);
} finally {
lock.unlock();
}
}
}
+ protected void doTimeoutDone(Exchange exchange, boolean forceExhaust) {
+ if (done.compareAndSet(false, true)) {
+ MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+ }
+ }
+
protected void doDone(Exchange exchange, boolean forceExhaust) {
if (done.compareAndSet(false, true)) {
+ // cancel the pending timeout task if we are done normally (the timeout path calls doTimeoutDone instead, which skips the cancel)
+ if (timeoutTask != null) {
+ try {
+ timeoutTask.cancel(true);
+ } catch (Exception e) {
+ // ignore
+ LOG.debug("Cancel timeout task caused an exception. This exception is ignored.", e);
+ }
+ }
MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
}
}
@@ -525,9 +517,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
*/
protected class MulticastReactiveTask extends MulticastTask {
- private MulticastReactiveTask() {
- }
-
public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
super(original, pairs, callback, size);
@@ -626,9 +615,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
*/
protected class MulticastTransactedTask extends MulticastTask {
- private MulticastTransactedTask() {
- }
-
public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
super(original, pairs, callback, size);
@@ -738,9 +724,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
}
}
- protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
+ protected ScheduledFuture<?> schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
- ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
+ return ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
} else {
executor.execute(() -> {
try {
@@ -751,6 +737,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
runnable.run();
});
}
+ return null;
}
protected StopWatch beforeSend(ProcessorExchangePair pair) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java
new file mode 100644
index 00000000000..449528bdaa2
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitTimeoutCancelTaskTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.junit.jupiter.api.Test;
+
+public class SplitTimeoutCancelTaskTest extends ContextTestSupport {
+
+ String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
+ String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";
+
+ @Test
+ public void testSplitterTimeoutShouldNotExhaustThreadPool() throws Exception {
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
+ mockEndpoint.expectedMessageCount(4);
+
+ template.sendBody("direct:start", payload1);
+ template.sendBody("direct:start", payload2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
+ myThreadPoolProfile.setMaxPoolSize(20);
+ myThreadPoolProfile.setPoolSize(10);
+ myThreadPoolProfile.setMaxQueueSize(1);
+
+ getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
+
+ from("direct:start")
+ .split()
+ .xpath("//items/item")
+ .parallelProcessing(true)
+ .streaming(true)
+ .stopOnException(true)
+ .timeout("30000")
+ .executorService("testProfile")
+ .to("mock:split");
+ }
+ };
+ }
+}