You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/31 13:00:14 UTC

[2/2] camel git commit: CAMEL-9037: When using defaultTaskExecutorType, the thread pool should shut down when the DMLC is stopping.

CAMEL-9037: When using defaultTaskExecutorType, the thread pool should shut down when the DMLC is stopping.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0dac50e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0dac50e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0dac50e

Branch: refs/heads/camel-2.15.x
Commit: a0dac50e7fcbb2abdb23271d918b3656e356266f
Parents: 18d7bfc
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 31 13:07:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 31 13:07:28 2015 +0200

----------------------------------------------------------------------
 .../jms/DefaultJmsMessageListenerContainer.java | 36 ++++++++++++++------
 .../component/jms/DefaultTaskExecutorType.java  | 13 +++----
 .../camel/component/jms/JmsConfiguration.java   |  1 +
 3 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index ba3282e..63e5382 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -36,6 +36,7 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
 
     private final JmsEndpoint endpoint;
     private final boolean allowQuickStop;
+    private volatile TaskExecutor taskExecutor;
 
     public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) {
         this(endpoint, true);
@@ -96,23 +97,28 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
         String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
         String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName();
 
+        TaskExecutor answer;
+
         if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) {
-            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
-            answer.setBeanName(beanName);
-            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
-            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
+            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+            executor.setBeanName(beanName);
+            executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+            executor.setCorePoolSize(endpoint.getConcurrentConsumers());
             // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
             // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by
             // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as
             // defined by maxConcurrentConsumers).
-            answer.setQueueCapacity(0);
-            answer.initialize();
-            return answer;
+            executor.setQueueCapacity(0);
+            executor.initialize();
+            answer = executor;
         } else {
-            SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
-            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
-            return answer;
+            SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName);
+            executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+            answer = executor;
         }
+
+        taskExecutor = answer;
+        return answer;
     }
 
     @Override
@@ -122,6 +128,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
                     + " and sharedConnectionEnabled: " + sharedConnectionEnabled());
         }
         super.stop();
+
+        if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+            executor.destroy();
+        }
     }
 
     @Override
@@ -131,6 +142,11 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
                     + " and sharedConnectionEnabled: " + sharedConnectionEnabled());
         }
         super.destroy();
+
+        if (taskExecutor instanceof ThreadPoolTaskExecutor) {
+            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+            executor.destroy();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
index aa6166a..1022531 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
@@ -23,6 +23,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 /**
  * Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer} should use.
+ *
  * @since 2.10.3
  */
 public enum DefaultTaskExecutorType {
@@ -30,13 +31,13 @@ public enum DefaultTaskExecutorType {
     /**
      * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming messages.
      * It will be configured with these attributes:
-     * <p />
+     * <p/>
      * <li>
-     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
-     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool,
-     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
-     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by
-     * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
+     *   <ul>{@code corePoolSize} = concurrentConsumers</ul>
+     *   <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing the thread pool,
+     *       see Javadoc of {@link ThreadPoolExecutor}.</ul>
+     *   <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency should be limited by
+     *       the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
      * </li>
      */
     ThreadPool,

http://git-wip-us.apache.org/repos/asf/camel/blob/a0dac50e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 8c6c5fe..7c5e151 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -199,6 +199,7 @@ public class JmsConfiguration implements Cloneable {
     private MessageListenerContainerFactory messageListenerContainerFactory;
     @UriParam
     private boolean includeSentJMSMessageID;
+    @UriParam
     private DefaultTaskExecutorType defaultTaskExecutorType;
     @UriParam
     private boolean includeAllJMSXProperties;