You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/31 13:00:14 UTC
[2/2] camel git commit: CAMEL-9037: When using defaultTaskExecutorType then the thread pool should shutdown when the DMLC is stopping.
CAMEL-9037: When using defaultTaskExecutorType then the thread pool should shutdown when the DMLC is stopping.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0dac50e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0dac50e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0dac50e
Branch: refs/heads/camel-2.15.x
Commit: a0dac50e7fcbb2abdb23271d918b3656e356266f
Parents: 18d7bfc
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 31 13:07:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 31 13:07:28 2015 +0200
----------------------------------------------------------------------
.../jms/DefaultJmsMessageListenerContainer.java | 36 ++++++++++++++------
.../component/jms/DefaultTaskExecutorType.java | 13 +++----
.../camel/component/jms/JmsConfiguration.java | 1 +
3 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index ba3282e..63e5382 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -36,6 +36,7 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
private final JmsEndpoint endpoint;
private final boolean allowQuickStop;
+ private volatile TaskExecutor taskExecutor;
public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) {
this(endpoint, true);
@@ -96,23 +97,28 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName();
+ TaskExecutor answer;
+
if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) {
- ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
- answer.setBeanName(beanName);
- answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
- answer.setCorePoolSize(endpoint.getConcurrentConsumers());
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setBeanName(beanName);
+ executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+ executor.setCorePoolSize(endpoint.getConcurrentConsumers());
// Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
// We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by
// maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as
// defined by maxConcurrentConsumers).
- answer.setQueueCapacity(0);
- answer.initialize();
- return answer;
+ executor.setQueueCapacity(0);
+ executor.initialize();
+ answer = executor;
} else {
- SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
- answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
- return answer;
+ SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName);
+ executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+ answer = executor;
}
+
+ taskExecutor = answer;
+ return answer;
}
@Override
@@ -122,6 +128,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
+ " and sharedConnectionEnabled: " + sharedConnectionEnabled());
}
super.stop();
+
+ if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+ ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+ executor.destroy();
+ }
}
@Override
@@ -131,6 +142,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
+ " and sharedConnectionEnabled: " + sharedConnectionEnabled());
}
super.destroy();
+
+ if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+ ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+ executor.destroy();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
index aa6166a..1022531 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
@@ -23,6 +23,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer} should use.
+ *
* @since 2.10.3
*/
public enum DefaultTaskExecutorType {
@@ -30,13 +31,13 @@ public enum DefaultTaskExecutorType {
/**
* Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming messages.
* It will be configured with these attributes:
- * <p />
+ * <p/>
* <li>
- * <ul>{@code corePoolSize} = concurrentConsumers</ul>
- * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool,
- * see Javadoc of {@link ThreadPoolExecutor}.</ul>
- * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by
- * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
+ * <ul>{@code corePoolSize} = concurrentConsumers</ul>
+ * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool,
+ * see Javadoc of {@link ThreadPoolExecutor}.</ul>
+ * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by
+ * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
* </li>
*/
ThreadPool,
http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 8c6c5fe..7c5e151 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -199,6 +199,7 @@ public class JmsConfiguration implements Cloneable {
private MessageListenerContainerFactory messageListenerContainerFactory;
@UriParam
private boolean includeSentJMSMessageID;
+ @UriParam
private DefaultTaskExecutorType defaultTaskExecutorType;
@UriParam
private boolean includeAllJMSXProperties;