You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/28 07:52:13 UTC
[camel] branch camel-3.0.x updated: CAMEL-14442: Fixed camel-scheduler to use same executor service for same scheduler name.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.0.x by this push:
new 46b671e CAMEL-14442: Fixed camel-scheduler to use same executor service for same scheduler name.
46b671e is described below
commit 46b671e8a8cffdc5f42ff19bd29806324c1b3f18
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 28 08:51:24 2020 +0100
CAMEL-14442: Fixed camel-scheduler to use same executor service for same scheduler name.
---
.../component/scheduler/TwoSchedulerTest.java | 13 ++++++++++--
.../camel/support/ScheduledPollConsumer.java | 23 +++++++++++-----------
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/TwoSchedulerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/TwoSchedulerTest.java
index dc8a591..44e9520 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/TwoSchedulerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/TwoSchedulerTest.java
@@ -28,15 +28,24 @@ public class TwoSchedulerTest extends ContextTestSupport {
getMockEndpoint("mock:b").expectedMinimumMessageCount(2);
assertMockEndpointsSatisfied();
+
+ // should use same thread as they share the same scheduler
+ String tn1 = getMockEndpoint("mock:a").getReceivedExchanges().get(0).getMessage().getHeader("tn", String.class);
+ String tn2 = getMockEndpoint("mock:b").getReceivedExchanges().get(0).getMessage().getHeader("tn", String.class);
+ assertSame(tn1, tn2);
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- from("scheduler://foo?delay=100").to("mock:a");
+ from("scheduler://foo?delay=100")
+ .setHeader("tn", simple("${threadName}"))
+ .to("mock:a");
- from("scheduler://foo?delay=200").to("mock:b");
+ from("scheduler://foo?delay=200")
+ .setHeader("tn", simple("${threadName}"))
+ .to("mock:b");
}
};
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 77e53ee..17d4194 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -431,17 +431,6 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
log.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint());
}
- if (scheduler == null) {
- DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService);
- scheduler.setDelay(delay);
- scheduler.setInitialDelay(initialDelay);
- scheduler.setTimeUnit(timeUnit);
- scheduler.setUseFixedDelay(useFixedDelay);
- this.scheduler = scheduler;
- }
- scheduler.setCamelContext(getEndpoint().getCamelContext());
- scheduler.onInit(this);
-
// configure scheduler with options from this consumer
if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
// need to use a copy in case the consumer is restarted so we keep the properties
@@ -455,7 +444,6 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
}
}
- ObjectHelper.notNull(scheduler, "scheduler", this);
ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
}
@@ -463,6 +451,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
protected void doStart() throws Exception {
super.doStart();
+ if (scheduler == null) {
+ DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService);
+ scheduler.setDelay(delay);
+ scheduler.setInitialDelay(initialDelay);
+ scheduler.setTimeUnit(timeUnit);
+ scheduler.setUseFixedDelay(useFixedDelay);
+ this.scheduler = scheduler;
+ }
+ scheduler.setCamelContext(getEndpoint().getCamelContext());
+ scheduler.onInit(this);
+
if (scheduler != null) {
scheduler.scheduleTask(this);
ServiceHelper.startService(scheduler);