You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/05/14 01:13:48 UTC
svn commit: r774589 -
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Author: elecharny
Date: Wed May 13 23:13:48 2009
New Revision: 774589
URL: http://svn.apache.org/viewvc?rev=774589&view=rev
Log:
o Fix for DIRMINA-687
o Removed the @version tag
o Added a LOGGER, but commented it out, as it breaks the MDCFilterTest for some unknown reason ...
o Minor refactoring
o Javadoc addition
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=774589&r1=774588&r2=774589&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Wed May 13 23:13:48 2009
@@ -39,6 +39,8 @@
import org.apache.mina.core.session.DummySession;
import org.apache.mina.core.session.IoEvent;
import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
@@ -47,10 +49,12 @@
* {@link UnorderedThreadPoolExecutor}.
* @author The Apache MINA Project (dev@mina.apache.org)
- * @version $Rev$, $Date$
* @org.apache.xbean.XBean
*/
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
+ /** A logger for this class (commented out, as it breaks the MDCFilter tests) */
+ //static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
+
/** A default value for the initial pool size */
private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
@@ -65,6 +69,7 @@
/** A key stored into the session's attribute for the event tasks being queued */
private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
+ /** A queue used to store the available sessions */
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
private final Set<Worker> workers = new HashSet<Worker>();
@@ -142,14 +147,14 @@
* @param maximumPoolSize The maximum pool size
* @param keepAliveTime Default duration for a thread
* @param unit Time unit used for the keepAlive value
- * @param queueHandler The queue used to store events
+ * @param eventQueueHandler The queue used to store events
*/
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- IoEventQueueHandler queueHandler) {
+ IoEventQueueHandler eventQueueHandler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit,
- Executors.defaultThreadFactory(), queueHandler);
+ Executors.defaultThreadFactory(), eventQueueHandler);
}
/**
@@ -177,12 +182,12 @@
* @param keepAliveTime Default duration for a thread
* @param unit Time unit used for the keepAlive value
* @param threadFactory The factory used to create threads
- * @param queueHandler The queue used to store events
+ * @param eventQueueHandler The queue used to store events
*/
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
+ ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
// We have to initialize the pool with default values (0 and 1) in order to
// handle the exception in a better way. We can't add a try {} catch() {}
// around the super() call.
@@ -202,11 +207,35 @@
super.setMaximumPoolSize( maximumPoolSize );
// The queueHandler might be null.
- this.eventQueueHandler = queueHandler;
+ if (eventQueueHandler == null) {
+ this.eventQueueHandler = IoEventQueueHandler.NOOP;
+ } else {
+ this.eventQueueHandler = eventQueueHandler;
+ }
}
/**
+ * Get the session's tasks queue.
+ */
+ private SessionTasksQueue getSessionTasksQueue(IoSession session) {
+ SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
+
+ if (queue == null) {
+ queue = new SessionTasksQueue();
+ SessionTasksQueue oldQueue =
+ (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
+
+ if (oldQueue != null) {
+ queue = oldQueue;
+ }
+ }
+
+ return queue;
+ }
+
+
+ /**
* @return The associated queue handler.
*/
public IoEventQueueHandler getQueueHandler() {
@@ -232,7 +261,7 @@
}
// Create a new worker, and add it to the thread pool
- Worker worker = new Worker();
+ Worker worker = new Worker( workers.size());
Thread thread = getThreadFactory().newThread(worker);
// As we have added a new thread, it's considered as idle.
@@ -377,21 +406,43 @@
continue;
}
- Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+ SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
- synchronized (tasksQueue) {
+ synchronized (sessionTasksQueue.tasksQueue) {
- for (Runnable task: tasksQueue) {
+ for (Runnable task: sessionTasksQueue.tasksQueue) {
getQueueHandler().polled(this, (IoEvent) task);
answer.add(task);
}
- tasksQueue.clear();
+ sessionTasksQueue.tasksQueue.clear();
}
}
return answer;
}
+
+
+ /**
+ * A helper method used to print the list of events being queued.
+ */
+ private void print( Queue<Runnable> queue, IoEvent event) {
+ StringBuilder sb = new StringBuilder();
+ sb.append( "Adding event " ).append( event.getType() ).append( " to session " ).append(event.getSession().getId() );
+ boolean first = true;
+ sb.append( "\nQueue : [" );
+ for (Runnable elem:queue) {
+ if ( first ) {
+ first = false;
+ } else {
+ sb.append( ", " );
+ }
+
+ sb.append(((IoEvent)elem).getType()).append(", ");
+ }
+ sb.append( "]\n" );
+ //LOGGER.debug( sb.toString() );
+ }
/**
* {@inheritDoc}
@@ -406,42 +457,55 @@
checkTaskType(task);
IoEvent event = (IoEvent) task;
+
+ // Get the associated session
IoSession session = event.getSession();
// Get the session's queue of events
- Queue<Runnable> tasksQueue = getTasksQueue(session);
- boolean offerSession;
- boolean offerEvent = true;
+ SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
+ Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+ boolean offerSession;
+
// propose the new event to the event queue handler. If we
// use a throttle queue handler, the message may be rejected
// if the maximum size has been reached.
- if (eventQueueHandler != null) {
- offerEvent = eventQueueHandler.accept(this, event);
- }
+ boolean offerEvent = eventQueueHandler.accept(this, event);
if (offerEvent) {
// Ok, the message has been accepted
synchronized (tasksQueue) {
- offerSession = tasksQueue.isEmpty();
-
// Inject the event into the executor taskQueue
tasksQueue.offer(event);
+
+ if (sessionTasksQueue.processingCompleted) {
+ sessionTasksQueue.processingCompleted = false;
+ offerSession = true;
+ } else {
+ offerSession = false;
+ }
+
+ /*
+ if (LOGGER.isDebugEnabled()) {
+ print(tasksQueue, event);
+ }
+ */
}
} else {
offerSession = false;
}
if (offerSession) {
+ // The session's tasks queue was flagged as fully processed, so no
+ // worker is currently handling it: add the session to the
+ // waitingSessions queue.
waitingSessions.offer(session);
}
addWorkerIfNecessary();
if (offerEvent) {
- if (eventQueueHandler != null) {
- eventQueueHandler.offered(this, event);
- }
+ eventQueueHandler.offered(this, event);
}
}
@@ -570,9 +634,10 @@
checkTaskType(task);
IoEvent event = (IoEvent) task;
IoSession session = event.getSession();
- Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);
+ SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)session.getAttribute( TASKS_QUEUE );
+ Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
- if (tasksQueue == null) {
+ if (sessionTasksQueue == null) {
return false;
}
@@ -638,6 +703,11 @@
private volatile long completedTaskCount;
private Thread thread;
+ private int id;
+
+ public Worker( int id ) {
+ this.id = id;
+ }
public void run() {
thread = Thread.currentThread();
@@ -664,7 +734,7 @@
try {
if (session != null) {
- runTasks(getTasksQueue(session));
+ runTasks(getSessionTasksQueue(session));
}
} finally {
idleWorkers.incrementAndGet();
@@ -706,25 +776,21 @@
return session;
}
- private void runTasks(Queue<Runnable> tasksQueue) {
+ private void runTasks(SessionTasksQueue sessionTasksQueue) {
for (;;) {
Runnable task;
+ Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
synchronized (tasksQueue) {
- if ( tasksQueue.isEmpty()) {
- break;
- }
-
task = tasksQueue.poll();
-
+
if (task == null) {
+ sessionTasksQueue.processingCompleted = true;
break;
}
}
- if (eventQueueHandler != null) {
- eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
- }
+ eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
runTask(task);
}
@@ -746,4 +812,17 @@
}
}
}
+
+
+ /**
+ * A class used to store the ordered list of events to be processed by the
+ * session, and the current task state.
+ */
+ private class SessionTasksQueue {
+ /** A queue of ordered events waiting to be processed */
+ private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
+
+ /** Set to true once every queued task has been polled; reset to false when a new event is queued */
+ private boolean processingCompleted = true;
+ }
}