You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/05/06 19:38:28 UTC
svn commit: r653848 - in
/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base:
./ threads/
Author: veithen
Date: Tue May 6 10:38:25 2008
New Revision: 653848
URL: http://svn.apache.org/viewvc?rev=653848&view=rev
Log:
SYNAPSE-285:
* Code & Javadoc cleanup in NativeWorkerPool.
* Added a shutdown method to WorkerPool and implemented it in NativeWorkerPool.
* Removed BackportWorkerPool as the code in this class never worked and we assume that we are running on Java 1.5 or above anyway. Updated WorkerPoolFactory to reflect that change.
* Shut down the worker pool in AbstractTransportListener#destroy.
Removed:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/BackportWorkerPool.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java Tue May 6 10:38:25 2008
@@ -122,6 +122,11 @@
} finally {
state = BaseConstants.STOPPED;
}
+ try {
+ workerPool.shutdown(10000);
+ } catch (InterruptedException ex) {
+ log.warn("Thread interrupted while waiting for worker pool to shut down");
+ }
}
public void stop() throws AxisFault {
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/NativeWorkerPool.java Tue May 6 10:38:25 2008
@@ -26,16 +26,14 @@
import java.util.concurrent.atomic.AtomicInteger;
/**
- * Utility class to support the backport util.concurrent in JDK 1.4 and the
- * native concurrent package in JDK 1.5 or later
+ * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
*/
public class NativeWorkerPool implements WorkerPool {
private static final Log log = LogFactory.getLog(NativeWorkerPool.class);
- private java.util.concurrent.Executor nativeExecutor = null;
- private Executor executor = null;
- private LinkedBlockingQueue blockingQueue = null;
+ private final ThreadPoolExecutor executor;
+ private final LinkedBlockingQueue<Runnable> blockingQueue;
public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName, String threadGroupId) {
@@ -44,7 +42,8 @@
log.debug("Using native util.concurrent package..");
}
blockingQueue =
- (queueLength == -1 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queueLength));
+ (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+ : new LinkedBlockingQueue<Runnable>(queueLength));
executor = new ThreadPoolExecutor(
core, max, keepAlive,
TimeUnit.SECONDS,
@@ -57,12 +56,17 @@
}
public int getActiveCount() {
- return ((ThreadPoolExecutor) executor).getActiveCount();
+ return executor.getActiveCount();
}
public int getQueueSize() {
return blockingQueue.size();
}
+
+ public void shutdown(int timeout) throws InterruptedException {
+ executor.shutdown();
+ executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ }
/**
* This is a simple ThreadFactory implementation using java.util.concurrent
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPool.java Tue May 6 10:38:25 2008
@@ -23,4 +23,18 @@
public void execute(Runnable task);
public int getActiveCount();
public int getQueueSize();
+
+ /**
+ * Destroy the worker pool. The pool will immediately stop
+ * accepting new tasks. All previously submitted tasks will
+ * be executed. The method blocks until all tasks have
+ * completed execution, or the timeout occurs, or the current
+ * thread is interrupted, whichever happens first.
+ *
+ * @param timeout the timeout value in milliseconds
+ * @throws InterruptedException if the current thread was
+ * interrupted while waiting for pending tasks to
+ * finish execution
+ */
+ public void shutdown(int timeout) throws InterruptedException;
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java?rev=653848&r1=653847&r2=653848&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/threads/WorkerPoolFactory.java Tue May 6 10:38:25 2008
@@ -20,19 +20,15 @@
package org.apache.synapse.transport.base.threads;
/**
- * Dynamically select util.concurrent implemenation
+ * Worker pool factory.
+ * For the moment this always creates {@link NativeWorkerPool} instances since
+ * we assume that we are running on Java 1.5 or above.
*/
public class WorkerPoolFactory {
public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName, String threadGroupId) {
- try {
- Class.forName("java.util.concurrent.ThreadPoolExecutor");
return new NativeWorkerPool(
core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
- } catch (ClassNotFoundException e) {
- return new BackportWorkerPool(
- core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
- }
}
}