You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/14 10:20:11 UTC

svn commit: r1034962 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/util/concurrent/

Author: davsclaus
Date: Sun Nov 14 09:20:10 2010
New Revision: 1034962

URL: http://svn.apache.org/viewvc?rev=1034962&view=rev
Log:
CAMEL-3337: Fixed creating synchronous thread pool.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010
@@ -254,6 +254,16 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ExecutorService newSynchronousThreadPool(Object source, String name) {
+        ExecutorService answer = ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name);
+        onThreadPoolCreated(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new synchronous thread pool for source: " + source + " with name: " + name + ". -> " + answer);
+        }
+        return answer;
+    }
+
     public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
         ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
         onThreadPoolCreated(answer);
@@ -265,6 +275,17 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, maxQueueSize);
+        onThreadPoolCreated(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
+                    + ", maxPoolSize=" + maxPoolSize + ", maxQueueSize=" + maxQueueSize + "] -> " + answer);
+        }
+        return answer;
+    }
+
     public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime,
                                          TimeUnit timeUnit, int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
                                          boolean daemon) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010
@@ -194,6 +194,15 @@ public interface ExecutorServiceStrategy
     ExecutorService newSingleThreadExecutor(Object source, String name);
 
     /**
+     * Creates a new synchronous thread pool, which executes the task in the caller thread (no task queue).
+     *
+     * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newSynchronousThreadPool(Object source, String name);
+
+    /**
      * Creates a new custom thread pool.
      * <p/>
      * Will by default use 60 seconds for keep alive time for idle threads.
@@ -209,6 +218,21 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new custom thread pool.
+     * <p/>
+     * Will by default use 60 seconds for keep alive time for idle threads.
+     * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as rejection handler
+     *
+     * @param source        the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name          name which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @param maxQueueSize  the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+     * @return the created thread pool
+     */
+    ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize);
+
+    /**
+     * Creates a new custom thread pool.
      *
      * @param source                     the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name                       name which is appended to the thread name

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sun Nov 14 09:20:10 2010
@@ -112,7 +112,9 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new fixed thread pool
+     * Creates a new fixed thread pool.
+     * <p/>
+     * Beware that the task queue is unbounded.
      *
      * @param poolSize the fixed pool size
      * @param pattern  pattern of the thread name
@@ -169,6 +171,20 @@ public final class ExecutorServiceHelper
     }
 
     /**
+     * Creates a new synchronous thread pool which executes the task in the caller thread (no task queue).
+     * <p/>
+     * Uses a {@link java.util.concurrent.SynchronousQueue} as the task queue.
+     *
+     * @param pattern      pattern of the thread name
+     * @param name         ${name} in the pattern name
+     * @return the created pool
+     */
+    public static ExecutorService newSynchronousThreadPool(final String pattern, final String name) {
+        return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60,
+                TimeUnit.SECONDS, 0, new ThreadPoolExecutor.CallerRunsPolicy(), true);
+    }
+
+    /**
      * Creates a new custom thread pool using 60 seconds as keep alive and with an unbounded queue.
      *
      * @param pattern      pattern of the thread name
@@ -183,6 +199,21 @@ public final class ExecutorServiceHelper
     }
 
     /**
+     * Creates a new custom thread pool using 60 seconds as keep alive and with a bounded queue.
+     *
+     * @param pattern      pattern of the thread name
+     * @param name         ${name} in the pattern name
+     * @param corePoolSize the core size
+     * @param maxPoolSize  the maximum pool size
+     * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+        return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60,
+                TimeUnit.SECONDS, maxQueueSize, new ThreadPoolExecutor.CallerRunsPolicy(), true);
+    }
+
+    /**
      * Creates a new custom thread pool
      *
      * @param pattern                  pattern of the thread name
@@ -209,8 +240,11 @@ public final class ExecutorServiceHelper
 
         BlockingQueue<Runnable> queue;
         if (corePoolSize == 0 && maxQueueSize <= 0) {
-            // use a synchronous so we can act like the cached thread pool
+            // use a synchronous queue
             queue = new SynchronousQueue<Runnable>();
+            // and force 1 as pool size to be able to create the thread pool by the JDK
+            corePoolSize = 1;
+            maxPoolSize = 1;
         } else if (maxQueueSize <= 0) {
             // unbounded task queue
             queue = new LinkedBlockingQueue<Runnable>();

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034962&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java Sun Nov 14 09:20:10 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class ExecutorServiceHelperTest extends TestCase {
+
+    public void testGetThreadName() {
+        String name = ExecutorServiceHelper.getThreadName("Camel Thread ${counter} - ${name}", "foo");
+
+        assertTrue(name.startsWith("Camel Thread"));
+        assertTrue(name.endsWith("foo"));
+    }
+
+    public void testNewScheduledThreadPool() {
+        ScheduledExecutorService pool = ExecutorServiceHelper.newScheduledThreadPool(1, "MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool2() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1, 20);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool3() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1,
+                30, TimeUnit.SECONDS, 20, null, true);
+        assertNotNull(pool);
+    }
+
+    public void testNewCachedThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newCachedThreadPool("MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewFixedThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newFixedThreadPool(1, "MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewSynchronousThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo");
+        assertNotNull(pool);
+    }
+
+    public void testNewSingleThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newSingleThreadExecutor("MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+}