You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/05/14 01:13:48 UTC

svn commit: r774589 - /mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Author: elecharny
Date: Wed May 13 23:13:48 2009
New Revision: 774589

URL: http://svn.apache.org/viewvc?rev=774589&view=rev
Log:
o Fix for DIRMINA-687
o Removed the @version tag
o Added a LOGGER, but commented it as it breaks the MDCFilterTest for some unknown reason ...
o Minor refactoring
o Javadoc addition

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=774589&r1=774588&r2=774589&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Wed May 13 23:13:48 2009
@@ -39,6 +39,8 @@
 import org.apache.mina.core.session.DummySession;
 import org.apache.mina.core.session.IoEvent;
 import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
@@ -47,10 +49,12 @@
  * {@link UnorderedThreadPoolExecutor}.
 
  * @author The Apache MINA Project (dev@mina.apache.org)
- * @version $Rev$, $Date$
  * @org.apache.xbean.XBean
  */
 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
+    /** A logger for this class (commented out as it breaks the MDCFilterTest) */
+    //static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
+
     /** A default value for the initial pool size */
     private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
     
@@ -65,6 +69,7 @@
     /** A key stored into the session's attribute for the event tasks being queued */ 
     private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
     
+    /** A queue used to store the available sessions */
     private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
 
     private final Set<Worker> workers = new HashSet<Worker>();
@@ -142,14 +147,14 @@
      * @param maximumPoolSize The maximum pool size
      * @param keepAliveTime Default duration for a thread
      * @param unit Time unit used for the keepAlive value
-     * @param queueHandler The queue used to store events
+     * @param eventQueueHandler The queue used to store events
      */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
-            IoEventQueueHandler queueHandler) {
+            IoEventQueueHandler eventQueueHandler) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
-            Executors.defaultThreadFactory(), queueHandler);
+            Executors.defaultThreadFactory(), eventQueueHandler);
     }
 
     /**
@@ -177,12 +182,12 @@
      * @param keepAliveTime Default duration for a thread
      * @param unit Time unit used for the keepAlive value
      * @param threadFactory The factory used to create threads
-     * @param queueHandler The queue used to store events
+     * @param eventQueueHandler The queue used to store events
      */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
+            ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
         // We have to initialize the pool with default values (0 and 1) in order to
         // handle the exception in a better way. We can't add a try {} catch() {}
         // around the super() call.
@@ -202,11 +207,35 @@
         super.setMaximumPoolSize( maximumPoolSize );
         
         // The queueHandler might be null.
-        this.eventQueueHandler = queueHandler;
+        if (eventQueueHandler == null) {
+            this.eventQueueHandler = IoEventQueueHandler.NOOP;
+        } else {
+            this.eventQueueHandler = eventQueueHandler;
+        }
     }
     
 
     /**
+     * Get the session's tasks queue.
+     */
+    private SessionTasksQueue getSessionTasksQueue(IoSession session) {
+        SessionTasksQueue existing = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
+
+        if (existing != null) {
+            // Fast path: the session already carries its tasks queue
+            return existing;
+        }
+
+        // Try to publish a fresh queue; a non-null result is the queue that
+        // was already stored on the session, and it takes precedence
+        SessionTasksQueue created = new SessionTasksQueue();
+        Object previous = session.setAttributeIfAbsent(TASKS_QUEUE, created);
+
+        return (previous == null) ? created : (SessionTasksQueue) previous;
+    }
+
+    
+    /**
      * @return The associated queue handler. 
      */
     public IoEventQueueHandler getQueueHandler() {
@@ -232,7 +261,7 @@
             }
 
             // Create a new worker, and add it to the thread pool
-            Worker worker = new Worker();
+            Worker worker = new Worker( workers.size());
             Thread thread = getThreadFactory().newThread(worker);
             
             // As we have added a new thread, it's considered as idle.
@@ -377,21 +406,43 @@
                 continue;
             }
 
-            Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+            SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
             
-            synchronized (tasksQueue) {
+            synchronized (sessionTasksQueue.tasksQueue) {
                 
-                for (Runnable task: tasksQueue) {
+                for (Runnable task: sessionTasksQueue.tasksQueue) {
                     getQueueHandler().polled(this, (IoEvent) task);
                     answer.add(task);
                 }
                 
-                tasksQueue.clear();
+                sessionTasksQueue.tasksQueue.clear();
             }
         }
 
         return answer;
     }
+    
+    
+    /**
+     * A helper method building a debug dump of the event being added and of the queued events.
+     */
+    private void print( Queue<Runnable> queue, IoEvent event) {
+        StringBuilder sb = new StringBuilder();
+        sb.append( "Adding event " ).append( event.getType() ).append( " to session " ).append(event.getSession().getId() );
+        boolean first = true;
+        sb.append( "\nQueue : [" );
+        for (Runnable elem:queue) {
+            if ( first ) {
+                first = false;
+            } else {
+                sb.append( ", " );
+            }
+            // The separator is emitted above: append the event type only, with no trailing ", "
+            sb.append(((IoEvent)elem).getType());
+        }
+        sb.append( "]\n" );
+        //LOGGER.debug( sb.toString() );
+    }
 
     /**
      * {@inheritDoc}
@@ -406,42 +457,55 @@
         checkTaskType(task);
 
         IoEvent event = (IoEvent) task;
+        
+        // Get the associated session
         IoSession session = event.getSession();
         
         // Get the session's queue of events
-        Queue<Runnable> tasksQueue = getTasksQueue(session);
-        boolean offerSession;
-        boolean offerEvent = true;
+        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
+        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
         
+        boolean offerSession;
+
         // propose the new event to the event queue handler. If we
         // use a throttle queue handler, the message may be rejected
         // if the maximum size has been reached.
-        if (eventQueueHandler != null) {
-            offerEvent = eventQueueHandler.accept(this, event);
-        }
+        boolean offerEvent = eventQueueHandler.accept(this, event);
         
         if (offerEvent) {
             // Ok, the message has been accepted
             synchronized (tasksQueue) {
-                offerSession = tasksQueue.isEmpty();
-
                 // Inject the event into the executor taskQueue
                 tasksQueue.offer(event);
+                
+                if (sessionTasksQueue.processingCompleted) {
+                    sessionTasksQueue.processingCompleted = false;
+                    offerSession = true;
+                } else {
+                    offerSession = false;
+                }
+
+                /*
+                if (LOGGER.isDebugEnabled()) {
+                    print(tasksQueue, event);
+                }
+                */
             }
         } else {
             offerSession = false;
         }
 
         if (offerSession) {
+            // No worker is currently draining this session's tasks queue
+            // (processing was completed), so offer the session to the queue
+            // of waiting sessions for a worker to pick it up.
             waitingSessions.offer(session);
         }
 
         addWorkerIfNecessary();
 
         if (offerEvent) {
-            if (eventQueueHandler != null) {
-                eventQueueHandler.offered(this, event);
-            }
+            eventQueueHandler.offered(this, event);
         }
     }
 
@@ -570,9 +634,10 @@
         checkTaskType(task);
         IoEvent event = (IoEvent) task;
         IoSession session = event.getSession();
-        Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);
+        SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)session.getAttribute( TASKS_QUEUE );
+        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
         
-        if (tasksQueue == null) {
+        if (sessionTasksQueue == null) {
             return false;
         }
 
@@ -638,6 +703,11 @@
 
         private volatile long completedTaskCount;
         private Thread thread;
+        private int id;
+        
+        public Worker( int id ) {
+            this.id = id; // keep the identifier supplied by the code creating this worker
+        }
 
         public void run() {
             thread = Thread.currentThread();
@@ -664,7 +734,7 @@
 
                     try {
                         if (session != null) {
-                            runTasks(getTasksQueue(session));
+                            runTasks(getSessionTasksQueue(session));
                         }
                     } finally {
                         idleWorkers.incrementAndGet();
@@ -706,25 +776,21 @@
             return session;
         }
 
-        private void runTasks(Queue<Runnable> tasksQueue) {
+        private void runTasks(SessionTasksQueue sessionTasksQueue) {
             for (;;) {
                 Runnable task;
+                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
                 
                 synchronized (tasksQueue) {
-                    if ( tasksQueue.isEmpty()) {
-                        break;
-                    }
-
                     task = tasksQueue.poll();
-
+                    
                     if (task == null) {
+                        sessionTasksQueue.processingCompleted = true;
                         break;
                     }
                 }
 
-                if (eventQueueHandler != null) {
-                    eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
-                }
+                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
 
                 runTask(task);
             }
@@ -746,4 +812,17 @@
             }
         }
     }
+    
+    
+    /**
+     * A holder for the ordered events waiting to be processed for one session, and
+     * the processing state. Static, as it never uses the enclosing executor instance.
+     */
+    private static class SessionTasksQueue {
+        /**  A queue of ordered events waiting to be processed */ 
+        private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
+        
+        /** True once the queue has been drained: the next event must then re-offer the session */
+        private boolean processingCompleted = true;
+    }
 }