You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/14 10:20:11 UTC
svn commit: r1034962 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/
main/java/org/apache/camel/util/concurrent/
test/java/org/apache/camel/util/concurrent/
Author: davsclaus
Date: Sun Nov 14 09:20:10 2010
New Revision: 1034962
URL: http://svn.apache.org/viewvc?rev=1034962&view=rev
Log:
CAMEL-3337: Fixed creating synchronous thread pool.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010
@@ -254,6 +254,16 @@ public class DefaultExecutorServiceStrat
return answer;
}
+ public ExecutorService newSynchronousThreadPool(Object source, String name) {
+ ExecutorService answer = ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name);
+ onThreadPoolCreated(answer);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new synchronous thread pool for source: " + source + " with name: " + name + ". -> " + answer);
+ }
+ return answer;
+ }
+
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
onThreadPoolCreated(answer);
@@ -265,6 +275,17 @@ public class DefaultExecutorServiceStrat
return answer;
}
+ public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+ ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, maxQueueSize);
+ onThreadPoolCreated(answer);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
+ + ", maxPoolSize=" + maxPoolSize + ", maxQueueSize=" + maxQueueSize + "] -> " + answer);
+ }
+ return answer;
+ }
+
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit timeUnit, int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
boolean daemon) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010
@@ -194,6 +194,15 @@ public interface ExecutorServiceStrategy
ExecutorService newSingleThreadExecutor(Object source, String name);
/**
+ * Creates a new synchronous thread pool, which executes the task in the caller thread (no task queue).
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ExecutorService newSynchronousThreadPool(Object source, String name);
+
+ /**
* Creates a new custom thread pool.
* <p/>
* Will by default use 60 seconds for keep alive time for idle threads.
@@ -209,6 +218,21 @@ public interface ExecutorServiceStrategy
/**
* Creates a new custom thread pool.
+ * <p/>
+ * Will by default use 60 seconds for keep alive time for idle threads.
+ * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as rejection handler
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param corePoolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_INT</tt> or <tt>-1</tt> to indicate unbounded
+ * @return the created thread pool
+ */
+ ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize);
+
+ /**
+ * Creates a new custom thread pool.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sun Nov 14 09:20:10 2010
@@ -112,7 +112,9 @@ public final class ExecutorServiceHelper
}
/**
- * Creates a new fixed thread pool
+ * Creates a new fixed thread pool.
+ * <p/>
+ * Beware that the task queue is unbounded
*
* @param poolSize the fixed pool size
* @param pattern pattern of the thread name
@@ -169,6 +171,20 @@ public final class ExecutorServiceHelper
}
/**
+ * Creates a new synchronous thread pool which executes the task in the caller thread (no task queue)
+ * <p/>
+ * Uses a {@link java.util.concurrent.SynchronousQueue} queue as task queue.
+ *
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @return the created pool
+ */
+ public static ExecutorService newSynchronousThreadPool(final String pattern, final String name) {
+ return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60,
+ TimeUnit.SECONDS, 0, new ThreadPoolExecutor.CallerRunsPolicy(), true);
+ }
+
+ /**
* Creates a new custom thread pool using 60 seconds as keep alive and with an unbounded queue.
*
* @param pattern pattern of the thread name
@@ -183,6 +199,21 @@ public final class ExecutorServiceHelper
}
/**
+ * Creates a new custom thread pool using 60 seconds as keep alive and with bounded queue.
+ *
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param corePoolSize the core size
+ * @param maxPoolSize the maximum pool size
+ * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+ * @return the created pool
+ */
+ public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+ return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60,
+ TimeUnit.SECONDS, maxQueueSize, new ThreadPoolExecutor.CallerRunsPolicy(), true);
+ }
+
+ /**
* Creates a new custom thread pool
*
* @param pattern pattern of the thread name
@@ -209,8 +240,11 @@ public final class ExecutorServiceHelper
BlockingQueue<Runnable> queue;
if (corePoolSize == 0 && maxQueueSize <= 0) {
- // use a synchronous so we can act like the cached thread pool
+ // use a synchronous queue
queue = new SynchronousQueue<Runnable>();
+ // and force 1 as pool size to be able to create the thread pool by the JDK
+ corePoolSize = 1;
+ maxPoolSize = 1;
} else if (maxQueueSize <= 0) {
// unbounded task queue
queue = new LinkedBlockingQueue<Runnable>();
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034962&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java Sun Nov 14 09:20:10 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class ExecutorServiceHelperTest extends TestCase {
+
+ public void testGetThreadName() {
+ String name = ExecutorServiceHelper.getThreadName("Camel Thread ${counter} - ${name}", "foo");
+
+ assertTrue(name.startsWith("Camel Thread"));
+ assertTrue(name.endsWith("foo"));
+ }
+
+ public void testNewScheduledThreadPool() {
+ ScheduledExecutorService pool = ExecutorServiceHelper.newScheduledThreadPool(1, "MyPool ${name}", "foo", true);
+ assertNotNull(pool);
+ }
+
+ public void testNewThreadPool() {
+ ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1);
+ assertNotNull(pool);
+ }
+
+ public void testNewThreadPool2() {
+ ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1, 20);
+ assertNotNull(pool);
+ }
+
+ public void testNewThreadPool3() {
+ ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1,
+ 30, TimeUnit.SECONDS, 20, null, true);
+ assertNotNull(pool);
+ }
+
+ public void testNewCachedThreadPool() {
+ ExecutorService pool = ExecutorServiceHelper.newCachedThreadPool("MyPool ${name}", "foo", true);
+ assertNotNull(pool);
+ }
+
+ public void testNewFixedThreadPool() {
+ ExecutorService pool = ExecutorServiceHelper.newFixedThreadPool(1, "MyPool ${name}", "foo", true);
+ assertNotNull(pool);
+ }
+
+ public void testNewSynchronousThreadPool() {
+ ExecutorService pool = ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo");
+ assertNotNull(pool);
+ }
+
+ public void testNewSingleThreadPool() {
+ ExecutorService pool = ExecutorServiceHelper.newSingleThreadExecutor("MyPool ${name}", "foo", true);
+ assertNotNull(pool);
+ }
+
+}