You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/05/06 19:38:28 UTC

svn commit: r653848 - in /synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base: ./ threads/

Author: veithen
Date: Tue May  6 10:38:25 2008
New Revision: 653848

URL: http://svn.apache.org/viewvc?rev=653848&view=rev
Log:
SYNAPSE-285:
* Code & Javadoc cleanup in NativeWorkerPool.
* Added a shutdown method to WorkerPool and implemented it in NativeWorkerPool.
* Removed BackportWorkerPool as the code in this class never worked and we assume that we are running on Java 1.5 or above anyway. Updated WorkerPoolFactory to reflect that change.
* Shut down the worker pool in AbstractTransportListener#destroy.

Removed:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/BackportWorkerPool.java
Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java Tue May  6 10:38:25 2008
@@ -122,6 +122,11 @@
         } finally {
             state = BaseConstants.STOPPED;
         }
+        try {
+            workerPool.shutdown(10000);
+        } catch (InterruptedException ex) {
+            log.warn("Thread interrupted while waiting for worker pool to shut down");
+        }
     }
 
     public void stop() throws AxisFault {

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java Tue May  6 10:38:25 2008
@@ -26,16 +26,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Utility class to support the backport util.concurrent in JDK 1.4 and the
- * native concurrent package in JDK 1.5 or later
+ * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
  */
 public class NativeWorkerPool implements WorkerPool {
 
     private static final Log log = LogFactory.getLog(NativeWorkerPool.class);
 
-    private java.util.concurrent.Executor nativeExecutor = null;
-    private Executor executor = null;
-    private LinkedBlockingQueue blockingQueue = null;
+    private final ThreadPoolExecutor executor;
+    private final LinkedBlockingQueue<Runnable> blockingQueue;
 
     public NativeWorkerPool(int core, int max, int keepAlive,
         int queueLength, String threadGroupName, String threadGroupId) {
@@ -44,7 +42,8 @@
             log.debug("Using native util.concurrent package..");
         }
         blockingQueue =
-            (queueLength == -1 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queueLength));
+            (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+                               : new LinkedBlockingQueue<Runnable>(queueLength));
         executor = new ThreadPoolExecutor(
             core, max, keepAlive,
             TimeUnit.SECONDS,
@@ -57,12 +56,17 @@
     }
 
     public int getActiveCount() {
-        return ((ThreadPoolExecutor) executor).getActiveCount();
+        return executor.getActiveCount();
     }
 
     public int getQueueSize() {
         return blockingQueue.size();
     }
+    
+    public void shutdown(int timeout) throws InterruptedException {
+        executor.shutdown();
+        executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+    }
 
     /**
      * This is a simple ThreadFactory implementation using java.util.concurrent

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java Tue May  6 10:38:25 2008
@@ -23,4 +23,18 @@
     public void execute(Runnable task);
     public int getActiveCount();
     public int getQueueSize();
+    
+    /**
+     * Destroy the worker pool. The pool will immediately stop
+     * accepting new tasks. All previously submitted tasks will
+     * be executed. The method blocks until all tasks have
+     * completed execution, or the timeout occurs, or the current
+     * thread is interrupted, whichever happens first.
+     * 
+     * @param timeout the timeout value in milliseconds
+     * @throws InterruptedException if the current thread was
+     *         interrupted while waiting for pending tasks to
+     *         finish execution
+     */
+    public void shutdown(int timeout) throws InterruptedException;
 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java Tue May  6 10:38:25 2008
@@ -20,19 +20,15 @@
 package org.apache.synapse.transport.base.threads;
 
 /**
- * Dynamically select util.concurrent implemenation
+ * Worker pool factory.
+ * For the moment this always creates {@link NativeWorkerPool} instances since
+ * we assume that we are running on Java 1.5 or above.
  */
 public class WorkerPoolFactory {
 
     public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
         int queueLength, String threadGroupName, String threadGroupId) {
-        try {
-            Class.forName("java.util.concurrent.ThreadPoolExecutor");
             return new NativeWorkerPool(
                 core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
-        } catch (ClassNotFoundException e) {
-            return new BackportWorkerPool(
-                core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
-        }
     }
 }