You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/04/05 23:57:44 UTC
svn commit: r762167 -
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Author: elecharny
Date: Sun Apr 5 21:57:44 2009
New Revision: 762167
URL: http://svn.apache.org/viewvc?rev=762167&view=rev
Log:
o Added Javadoc for all the constructors
o Replaced the non-thread-safe circular queue with a ConcurrentLinkedQueue
o Added some constant definitions for clarity's sake
o Removed the inner SessionBuffer class
o Renamed the BUFFER constants to TASKS_QUEUE
o Added comments in the important parts of the code, for clarity
o Renamed queueHandler to eventQueueHandler
o Removed the maximumPoolSize and corePoolSize fields, as they are already present in the parent class
o Added @inheritDoc tags
o Some other minor cleaning
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=762167&r1=762166&r2=762167&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Sun Apr 5 21:57:44 2009
@@ -25,6 +25,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
@@ -38,7 +39,6 @@
import org.apache.mina.core.session.DummySession;
import org.apache.mina.core.session.IoEvent;
import org.apache.mina.core.session.IoSession;
-import org.apache.mina.util.CircularQueue;
/**
* A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
@@ -51,48 +51,117 @@
* @org.apache.xbean.XBean
*/
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
-
+ /** A default value for the initial pool size */
+ private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
+
+ /** A default value for the maximum pool size */
+ private static final int DEFAULT_MAX_THREAD_POOL = 16;
+
+ /** A default value for the KeepAlive delay */
+ private static final int DEFAULT_KEEP_ALIVE = 30;
+
private static final IoSession EXIT_SIGNAL = new DummySession();
- private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
+ /** A key stored into the session's attribute for the event tasks being queued */
+ private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
+
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
private final Set<Worker> workers = new HashSet<Worker>();
- private volatile int corePoolSize;
- private volatile int maximumPoolSize;
private volatile int largestPoolSize;
private final AtomicInteger idleWorkers = new AtomicInteger();
private long completedTaskCount;
private volatile boolean shutdown;
- private final IoEventQueueHandler queueHandler;
+ private final IoEventQueueHandler eventQueueHandler;
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - minimum pool size is 0
+ * - maximum pool size is 16
+ * - keepAlive set to 30 seconds
+ * - A default ThreadFactory
+ * - All events are accepted
+ */
public OrderedThreadPoolExecutor() {
- this(16);
+ this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL,
+ DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
}
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - minimum pool size is 0
+ * - keepAlive set to 30 seconds
+ * - A default ThreadFactory
+ * - All events are accepted
+ *
+ * @param maximumPoolSize The maximum pool size
+ */
public OrderedThreadPoolExecutor(int maximumPoolSize) {
- this(0, maximumPoolSize);
+ this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+ Executors.defaultThreadFactory(), null);
}
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - keepAlive set to 30 seconds
+ * - A default ThreadFactory
+ * - All events are accepted
+ *
+ * @param corePoolSize The initial pool size
+ * @param maximumPoolSize The maximum pool size
+ */
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
- this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
+ this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+ Executors.defaultThreadFactory(), null);
}
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - A default ThreadFactory
+ * - All events are accepted
+ *
+ * @param corePoolSize The initial pool size
+ * @param maximumPoolSize The maximum pool size
+ * @param keepAliveTime Default duration for a thread
+ * @param unit Time unit used for the keepAlive value
+ */
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ Executors.defaultThreadFactory(), null);
}
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - A default ThreadFactory
+ *
+ * @param corePoolSize The initial pool size
+ * @param maximumPoolSize The maximum pool size
+ * @param keepAliveTime Default duration for a thread
+ * @param unit Time unit used for the keepAlive value
+ * @param queueHandler The handler that accepts or rejects events and is notified when they are offered and polled
+ */
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
IoEventQueueHandler queueHandler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ Executors.defaultThreadFactory(), queueHandler);
}
+ /**
+ * Creates a default ThreadPool, with default values :
+ * - All events are accepted
+ *
+ * @param corePoolSize The initial pool size
+ * @param maximumPoolSize The maximum pool size
+ * @param keepAliveTime Default duration for a thread
+ * @param unit Time unit used for the keepAlive value
+ * @param threadFactory The factory used to create threads
+ */
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@@ -100,46 +169,76 @@
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
}
+ /**
+ * Creates a new instance of an OrderedThreadPoolExecutor.
+ *
+ * @param corePoolSize The initial pool size
+ * @param maximumPoolSize The maximum pool size
+ * @param keepAliveTime Default duration for a thread
+ * @param unit Time unit used for the keepAlive value
+ * @param threadFactory The factory used to create threads
+ * @param queueHandler The handler that accepts or rejects events and is notified when they are offered and polled (may be null)
+ */
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
- super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
- if (corePoolSize < 0) {
+ // We have to initialize the pool with default values (0 and 1) in order to
+ // handle the exception in a better way. We can't add a try {} catch() {}
+ // around the super() call.
+ super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
+ new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
+
+ if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
- if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+ if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
}
- if (queueHandler == null) {
- queueHandler = IoEventQueueHandler.NOOP;
- }
-
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.queueHandler = queueHandler;
- }
-
+ // Now, we can setup the pool sizes
+ super.setCorePoolSize( corePoolSize );
+ super.setMaximumPoolSize( maximumPoolSize );
+
+ // The queueHandler might be null.
+ this.eventQueueHandler = queueHandler;
+ }
+
+
+ /**
+ * @return The associated queue handler.
+ */
public IoEventQueueHandler getQueueHandler() {
- return queueHandler;
+ return eventQueueHandler;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
// Ignore the request. It must always be AbortPolicy.
}
+ /**
+ * Add a new thread to execute a task, if needed and possible.
+ * It depends on the current pool size. If it's full, we do nothing.
+ */
private void addWorker() {
synchronized (workers) {
- if (workers.size() >= maximumPoolSize) {
+ if (workers.size() >= super.getMaximumPoolSize()) {
return;
}
+ // Create a new worker, and add it to the thread pool
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+
+ // As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
+
+ // Now, we can start it.
thread.start();
workers.add(worker);
@@ -149,10 +248,13 @@
}
}
+ /**
+ * Add a new Worker only if there is no idle worker.
+ */
private void addWorkerIfNecessary() {
if (idleWorkers.get() == 0) {
synchronized (workers) {
- if (workers.isEmpty() || idleWorkers.get() == 0) {
+ if (workers.isEmpty() || (idleWorkers.get() == 0)) {
addWorker();
}
}
@@ -161,27 +263,33 @@
private void removeWorker() {
synchronized (workers) {
- if (workers.size() <= corePoolSize) {
+ if (workers.size() <= super.getCorePoolSize()) {
return;
}
waitingSessions.offer(EXIT_SIGNAL);
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int getMaximumPoolSize() {
- return maximumPoolSize;
+ return super.getMaximumPoolSize();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void setMaximumPoolSize(int maximumPoolSize) {
- if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
+ if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
throw new IllegalArgumentException("maximumPoolSize: "
+ maximumPoolSize);
}
synchronized (workers) {
- this.maximumPoolSize = maximumPoolSize;
+ super.setMaximumPoolSize( maximumPoolSize );
int difference = workers.size() - maximumPoolSize;
while (difference > 0) {
removeWorker();
@@ -190,6 +298,9 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
@@ -209,11 +320,17 @@
return isTerminated();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean isShutdown() {
return shutdown;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean isTerminated() {
if (!shutdown) {
@@ -225,6 +342,9 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void shutdown() {
if (shutdown) {
@@ -240,12 +360,16 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<Runnable> shutdownNow() {
shutdown();
List<Runnable> answer = new ArrayList<Runnable>();
IoSession session;
+
while ((session = waitingSessions.poll()) != null) {
if (session == EXIT_SIGNAL) {
waitingSessions.offer(EXIT_SIGNAL);
@@ -253,55 +377,71 @@
continue;
}
- SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
- synchronized (buf.queue) {
- for (Runnable task: buf.queue) {
+ Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+
+ synchronized (tasksQueue) {
+
+ for (Runnable task: tasksQueue) {
getQueueHandler().polled(this, (IoEvent) task);
answer.add(task);
}
- buf.queue.clear();
+
+ tasksQueue.clear();
}
}
return answer;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void execute(Runnable task) {
if (shutdown) {
rejectTask(task);
}
+ // Check that it's an IoEvent task
checkTaskType(task);
- IoEvent e = (IoEvent) task;
- IoSession s = e.getSession();
- SessionBuffer buf = getSessionBuffer(s);
- Queue<Runnable> queue = buf.queue;
+ IoEvent event = (IoEvent) task;
+ IoSession session = event.getSession();
+
+ // Get the session's queue of events
+ Queue<Runnable> tasksQueue = getTasksQueue(session);
boolean offerSession;
- boolean offerEvent = queueHandler.accept(this, e);
+ boolean offerEvent = true;
+
+ // propose the new event to the event queue handler. If we
+ // use a throttle queue handler, the message may be rejected
+ // if the maximum size has been reached.
+ if (eventQueueHandler != null) {
+ offerEvent = eventQueueHandler.accept(this, event);
+ }
+
if (offerEvent) {
- synchronized (queue) {
- queue.offer(e);
- if (buf.processingCompleted) {
- buf.processingCompleted = false;
- offerSession = true;
- } else {
- offerSession = false;
- }
+ // Ok, the message has been accepted
+ synchronized (tasksQueue) {
+ offerSession = tasksQueue.isEmpty();
+
+ // Inject the event into the executor taskQueue
+ tasksQueue.offer(event);
}
} else {
offerSession = false;
}
if (offerSession) {
- waitingSessions.offer(s);
+ waitingSessions.offer(session);
}
addWorkerIfNecessary();
if (offerEvent) {
- queueHandler.offered(this, e);
+ if (eventQueueHandler != null) {
+ eventQueueHandler.offered(this, event);
+ }
}
}
@@ -315,6 +455,9 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int getActiveCount() {
synchronized (workers) {
@@ -322,6 +465,9 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public long getCompletedTaskCount() {
synchronized (workers) {
@@ -334,11 +480,17 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int getLargestPoolSize() {
return largestPoolSize;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int getPoolSize() {
synchronized (workers) {
@@ -346,11 +498,17 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public long getTaskCount() {
return getCompletedTaskCount();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean isTerminating() {
synchronized (workers) {
@@ -358,11 +516,14 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int prestartAllCoreThreads() {
int answer = 0;
synchronized (workers) {
- for (int i = corePoolSize - workers.size() ; i > 0; i --) {
+ for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
addWorker();
answer ++;
}
@@ -370,10 +531,13 @@
return answer;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean prestartCoreThread() {
synchronized (workers) {
- if (workers.size() < corePoolSize) {
+ if (workers.size() < super.getCorePoolSize()) {
addWorker();
return true;
} else {
@@ -382,77 +546,92 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public BlockingQueue<Runnable> getQueue() {
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void purge() {
// Nothing to purge in this implementation.
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean remove(Runnable task) {
checkTaskType(task);
- IoEvent e = (IoEvent) task;
- IoSession s = e.getSession();
- SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
- if (buffer == null) {
+ IoEvent event = (IoEvent) task;
+ IoSession session = event.getSession();
+ Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);
+
+ if (tasksQueue == null) {
return false;
}
boolean removed;
- synchronized (buffer.queue) {
- removed = buffer.queue.remove(task);
+
+ synchronized (tasksQueue) {
+ removed = tasksQueue.remove(task);
}
if (removed) {
- getQueueHandler().polled(this, e);
+ getQueueHandler().polled(this, event);
}
return removed;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int getCorePoolSize() {
- return corePoolSize;
+ return super.getCorePoolSize();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
- if (corePoolSize > maximumPoolSize) {
+ if (corePoolSize > super.getMaximumPoolSize()) {
throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
}
synchronized (workers) {
- if (this.corePoolSize > corePoolSize) {
- for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
+ if (super.getCorePoolSize()> corePoolSize) {
+ for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
removeWorker();
}
}
- this.corePoolSize = corePoolSize;
+ super.setCorePoolSize(corePoolSize);
}
}
- private SessionBuffer getSessionBuffer(IoSession session) {
- SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER);
- if (buffer == null) {
- buffer = new SessionBuffer();
- SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer);
- if (oldBuffer != null) {
- buffer = oldBuffer;
+ private Queue<Runnable> getTasksQueue(IoSession session) {
+ Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+
+ if (tasksQueue == null) {
+ tasksQueue = new ConcurrentLinkedQueue<Runnable>();
+ Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);
+
+ if (oldTasksQueue != null) {
+ tasksQueue = oldTasksQueue;
}
}
- return buffer;
- }
-
- private static class SessionBuffer {
- private final Queue<Runnable> queue = new CircularQueue<Runnable>();
- private boolean processingCompleted = true;
+
+ return tasksQueue;
}
private class Worker implements Runnable {
@@ -471,7 +650,7 @@
if (session == null) {
synchronized (workers) {
- if (workers.size() > corePoolSize) {
+ if (workers.size() > getCorePoolSize()) {
// Remove now to prevent duplicate exit.
workers.remove(this);
break;
@@ -485,7 +664,7 @@
try {
if (session != null) {
- runTasks(getSessionBuffer(session));
+ runTasks(getTasksQueue(session));
}
} finally {
idleWorkers.incrementAndGet();
@@ -527,19 +706,25 @@
return session;
}
- private void runTasks(SessionBuffer buf) {
+ private void runTasks(Queue<Runnable> tasksQueue) {
for (;;) {
Runnable task;
- synchronized (buf.queue) {
- task = buf.queue.poll();
+
+ synchronized (tasksQueue) {
+ if ( tasksQueue.isEmpty()) {
+ break;
+ }
+
+ task = tasksQueue.poll();
if (task == null) {
- buf.processingCompleted = true;
break;
}
}
- queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
+ if (eventQueueHandler != null) {
+ eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
+ }
runTask(task);
}