You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/04/05 23:57:44 UTC

svn commit: r762167 - /mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Author: elecharny
Date: Sun Apr  5 21:57:44 2009
New Revision: 762167

URL: http://svn.apache.org/viewvc?rev=762167&view=rev
Log:
o Added Javadoc for all the constructors
o Replaced the non-thread-safe circular queue with a ConcurrentLinkedQueue
o Added some constant definitions for clarity's sake
o Removed the inner SessionBuffer class
o Renamed the BUFFER constants to TASKS_QUEUE
o Added comments in the important parts of the code, for clarity
o Renamed queueHandler to eventQueueHandler
o Removed the maximumPoolSize and corePoolSize fields, as they are already present in the parent class
o Added @inheritDoc tags
o Some other minor cleaning

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=762167&r1=762166&r2=762167&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Sun Apr  5 21:57:44 2009
@@ -25,6 +25,7 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -38,7 +39,6 @@
 import org.apache.mina.core.session.DummySession;
 import org.apache.mina.core.session.IoEvent;
 import org.apache.mina.core.session.IoSession;
-import org.apache.mina.util.CircularQueue;
 
 /**
  * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
@@ -51,48 +51,117 @@
  * @org.apache.xbean.XBean
  */
 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
-
+    /** A default value for the initial pool size */
+    private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
+    
+    /** A default value for the maximum pool size */
+    private static final int DEFAULT_MAX_THREAD_POOL = 16;
+    
+    /** A default value for the KeepAlive delay */
+    private static final int DEFAULT_KEEP_ALIVE = 30;
+    
     private static final IoSession EXIT_SIGNAL = new DummySession();
 
-    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
+    /** A key stored into the session's attribute for the event tasks being queued */ 
+    private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
+    
     private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
 
     private final Set<Worker> workers = new HashSet<Worker>();
 
-    private volatile int corePoolSize;
-    private volatile int maximumPoolSize;
     private volatile int largestPoolSize;
     private final AtomicInteger idleWorkers = new AtomicInteger();
 
     private long completedTaskCount;
     private volatile boolean shutdown;
 
-    private final IoEventQueueHandler queueHandler;
+    private final IoEventQueueHandler eventQueueHandler;
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - minimum pool size is 0
+     * - maximum pool size is 16
+     * - keepAlive set to 30 seconds
+     * - A default ThreadFactory
+     * - All events are accepted
+     */
     public OrderedThreadPoolExecutor() {
-        this(16);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
+            DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
     }
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - minimum pool size is 0
+     * - keepAlive set to 30 seconds
+     * - A default ThreadFactory
+     * - All events are accepted
+     * 
+     * @param maximumPoolSize The maximum pool size
+     */
     public OrderedThreadPoolExecutor(int maximumPoolSize) {
-        this(0, maximumPoolSize);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
+            Executors.defaultThreadFactory(), null);
     }
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - keepAlive set to 30 seconds
+     * - A default ThreadFactory
+     * - All events are accepted
+     *
+     * @param corePoolSize The initial pool size
+     * @param maximumPoolSize The maximum pool size
+     */
     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
-        this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
+        this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
+            Executors.defaultThreadFactory(), null);
     }
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - A default ThreadFactory
+     * - All events are accepted
+     * 
+     * @param corePoolSize The initial pool size
+     * @param maximumPoolSize The maximum pool size
+     * @param keepAliveTime Default duration for a thread
+     * @param unit Time unit used for the keepAlive value
+     */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
+            Executors.defaultThreadFactory(), null);
     }
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - A default ThreadFactory
+     * 
+     * @param corePoolSize The initial pool size
+     * @param maximumPoolSize The maximum pool size
+     * @param keepAliveTime Default duration for a thread
+     * @param unit Time unit used for the keepAlive value
+     * @param queueHandler The queue used to store events
+     */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             IoEventQueueHandler queueHandler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
+            Executors.defaultThreadFactory(), queueHandler);
     }
 
+    /**
+     * Creates a default ThreadPool, with default values :
+     * - A default ThreadFactory
+     * 
+     * @param corePoolSize The initial pool size
+     * @param maximumPoolSize The maximum pool size
+     * @param keepAliveTime Default duration for a thread
+     * @param unit Time unit used for the keepAlive value
+     * @param threadFactory The factory used to create threads
+     */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
@@ -100,46 +169,76 @@
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
     }
 
+    /**
+     * Creates a new instance of a OrderedThreadPoolExecutor.
+     * 
+     * @param corePoolSize The initial pool size
+     * @param maximumPoolSize The maximum pool size
+     * @param keepAliveTime Default duration for a thread
+     * @param unit Time unit used for the keepAlive value
+     * @param threadFactory The factory used to create threads
+     * @param queueHandler The queue used to store events
+     */
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
-        super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
-        if (corePoolSize < 0) {
+        // We have to initialize the pool with default values (0 and 1) in order to
+        // handle the exception in a better way. We can't add a try {} catch() {}
+        // around the super() call.
+        super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, 
+            new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
+
+        if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
         }
 
-        if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+        if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
         }
 
-        if (queueHandler == null) {
-            queueHandler = IoEventQueueHandler.NOOP;
-        }
-
-        this.corePoolSize = corePoolSize;
-        this.maximumPoolSize = maximumPoolSize;
-        this.queueHandler = queueHandler;
-    }
-
+        // Now, we can setup the pool sizes
+        super.setCorePoolSize( corePoolSize );
+        super.setMaximumPoolSize( maximumPoolSize );
+        
+        // The queueHandler might be null.
+        this.eventQueueHandler = queueHandler;
+    }
+    
+
+    /**
+     * @return The associated queue handler. 
+     */
     public IoEventQueueHandler getQueueHandler() {
-        return queueHandler;
+        return eventQueueHandler;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
         // Ignore the request.  It must always be AbortPolicy.
     }
 
+    /**
+     * Add a new thread to execute a task, if needed and possible.
+     * It depends on the current pool size. If it's full, we do nothing.
+     */
     private void addWorker() {
         synchronized (workers) {
-            if (workers.size() >= maximumPoolSize) {
+            if (workers.size() >= super.getMaximumPoolSize()) {
                 return;
             }
 
+            // Create a new worker, and add it to the thread pool
             Worker worker = new Worker();
             Thread thread = getThreadFactory().newThread(worker);
+            
+            // As we have added a new thread, it's considered as idle.
             idleWorkers.incrementAndGet();
+            
+            // Now, we can start it.
             thread.start();
             workers.add(worker);
 
@@ -149,10 +248,13 @@
         }
     }
 
+    /**
+     * Add a new Worker only if there are no idle workers.
+     */
     private void addWorkerIfNecessary() {
         if (idleWorkers.get() == 0) {
             synchronized (workers) {
-                if (workers.isEmpty() || idleWorkers.get() == 0) {
+                if (workers.isEmpty() || (idleWorkers.get() == 0)) {
                     addWorker();
                 }
             }
@@ -161,27 +263,33 @@
 
     private void removeWorker() {
         synchronized (workers) {
-            if (workers.size() <= corePoolSize) {
+            if (workers.size() <= super.getCorePoolSize()) {
                 return;
             }
             waitingSessions.offer(EXIT_SIGNAL);
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int getMaximumPoolSize() {
-        return maximumPoolSize;
+        return super.getMaximumPoolSize();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void setMaximumPoolSize(int maximumPoolSize) {
-        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
+        if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
             throw new IllegalArgumentException("maximumPoolSize: "
                     + maximumPoolSize);
         }
 
         synchronized (workers) {
-            this.maximumPoolSize = maximumPoolSize;
+            super.setMaximumPoolSize( maximumPoolSize );
             int difference = workers.size() - maximumPoolSize;
             while (difference > 0) {
                 removeWorker();
@@ -190,6 +298,9 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean awaitTermination(long timeout, TimeUnit unit)
             throws InterruptedException {
@@ -209,11 +320,17 @@
         return isTerminated();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean isShutdown() {
         return shutdown;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean isTerminated() {
         if (!shutdown) {
@@ -225,6 +342,9 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void shutdown() {
         if (shutdown) {
@@ -240,12 +360,16 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public List<Runnable> shutdownNow() {
         shutdown();
 
         List<Runnable> answer = new ArrayList<Runnable>();
         IoSession session;
+        
         while ((session = waitingSessions.poll()) != null) {
             if (session == EXIT_SIGNAL) {
                 waitingSessions.offer(EXIT_SIGNAL);
@@ -253,55 +377,71 @@
                 continue;
             }
 
-            SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
-            synchronized (buf.queue) {
-                for (Runnable task: buf.queue) {
+            Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+            
+            synchronized (tasksQueue) {
+                
+                for (Runnable task: tasksQueue) {
                     getQueueHandler().polled(this, (IoEvent) task);
                     answer.add(task);
                 }
-                buf.queue.clear();
+                
+                tasksQueue.clear();
             }
         }
 
         return answer;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void execute(Runnable task) {
         if (shutdown) {
             rejectTask(task);
         }
 
+        // Check that it's a IoEvent task
         checkTaskType(task);
 
-        IoEvent e = (IoEvent) task;
-        IoSession s = e.getSession();
-        SessionBuffer buf = getSessionBuffer(s);
-        Queue<Runnable> queue = buf.queue;
+        IoEvent event = (IoEvent) task;
+        IoSession session = event.getSession();
+        
+        // Get the session's queue of events
+        Queue<Runnable> tasksQueue = getTasksQueue(session);
         boolean offerSession;
-        boolean offerEvent = queueHandler.accept(this, e);
+        boolean offerEvent = true;
+        
+        // propose the new event to the event queue handler. If we
+        // use a throttle queue handler, the message may be rejected
+        // if the maximum size has been reached.
+        if (eventQueueHandler != null) {
+            offerEvent = eventQueueHandler.accept(this, event);
+        }
+        
         if (offerEvent) {
-            synchronized (queue) {
-                queue.offer(e);
-                if (buf.processingCompleted) {
-                    buf.processingCompleted = false;
-                    offerSession = true;
-                } else {
-                    offerSession = false;
-                }
+            // Ok, the message has been accepted
+            synchronized (tasksQueue) {
+                offerSession = tasksQueue.isEmpty();
+
+                // Inject the event into the executor taskQueue
+                tasksQueue.offer(event);
             }
         } else {
             offerSession = false;
         }
 
         if (offerSession) {
-            waitingSessions.offer(s);
+            waitingSessions.offer(session);
         }
 
         addWorkerIfNecessary();
 
         if (offerEvent) {
-            queueHandler.offered(this, e);
+            if (eventQueueHandler != null) {
+                eventQueueHandler.offered(this, event);
+            }
         }
     }
 
@@ -315,6 +455,9 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int getActiveCount() {
         synchronized (workers) {
@@ -322,6 +465,9 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public long getCompletedTaskCount() {
         synchronized (workers) {
@@ -334,11 +480,17 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int getLargestPoolSize() {
         return largestPoolSize;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int getPoolSize() {
         synchronized (workers) {
@@ -346,11 +498,17 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public long getTaskCount() {
         return getCompletedTaskCount();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean isTerminating() {
         synchronized (workers) {
@@ -358,11 +516,14 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int prestartAllCoreThreads() {
         int answer = 0;
         synchronized (workers) {
-            for (int i = corePoolSize - workers.size() ; i > 0; i --) {
+            for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
                 addWorker();
                 answer ++;
             }
@@ -370,10 +531,13 @@
         return answer;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean prestartCoreThread() {
         synchronized (workers) {
-            if (workers.size() < corePoolSize) {
+            if (workers.size() < super.getCorePoolSize()) {
                 addWorker();
                 return true;
             } else {
@@ -382,77 +546,92 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public BlockingQueue<Runnable> getQueue() {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void purge() {
         // Nothing to purge in this implementation.
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean remove(Runnable task) {
         checkTaskType(task);
-        IoEvent e = (IoEvent) task;
-        IoSession s = e.getSession();
-        SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
-        if (buffer == null) {
+        IoEvent event = (IoEvent) task;
+        IoSession session = event.getSession();
+        Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);
+        
+        if (tasksQueue == null) {
             return false;
         }
 
         boolean removed;
-        synchronized (buffer.queue) {
-            removed = buffer.queue.remove(task);
+        
+        synchronized (tasksQueue) {
+            removed = tasksQueue.remove(task);
         }
 
         if (removed) {
-            getQueueHandler().polled(this, e);
+            getQueueHandler().polled(this, event);
         }
 
         return removed;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int getCorePoolSize() {
-        return corePoolSize;
+        return super.getCorePoolSize();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void setCorePoolSize(int corePoolSize) {
         if (corePoolSize < 0) {
             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
         }
-        if (corePoolSize > maximumPoolSize) {
+        if (corePoolSize > super.getMaximumPoolSize()) {
             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
         }
 
         synchronized (workers) {
-            if (this.corePoolSize > corePoolSize) {
-                for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
+            if (super.getCorePoolSize()> corePoolSize) {
+                for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
                     removeWorker();
                 }
             }
-            this.corePoolSize = corePoolSize;
+            super.setCorePoolSize(corePoolSize);
         }
     }
 
-    private SessionBuffer getSessionBuffer(IoSession session) {
-        SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER);
-        if (buffer == null) {
-            buffer = new SessionBuffer();
-            SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer);
-            if (oldBuffer != null) {
-                buffer = oldBuffer;
+    private Queue<Runnable> getTasksQueue(IoSession session) {
+        Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
+        
+        if (tasksQueue == null) {
+            tasksQueue = new ConcurrentLinkedQueue<Runnable>();
+            Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);
+        
+            if (oldTasksQueue != null) {
+                tasksQueue = oldTasksQueue;
             }
         }
-        return buffer;
-    }
-
-    private static class SessionBuffer {
-        private final Queue<Runnable> queue = new CircularQueue<Runnable>();
-        private boolean processingCompleted = true;
+        
+        return tasksQueue;
     }
 
     private class Worker implements Runnable {
@@ -471,7 +650,7 @@
 
                     if (session == null) {
                         synchronized (workers) {
-                            if (workers.size() > corePoolSize) {
+                            if (workers.size() > getCorePoolSize()) {
                                 // Remove now to prevent duplicate exit.
                                 workers.remove(this);
                                 break;
@@ -485,7 +664,7 @@
 
                     try {
                         if (session != null) {
-                            runTasks(getSessionBuffer(session));
+                            runTasks(getTasksQueue(session));
                         }
                     } finally {
                         idleWorkers.incrementAndGet();
@@ -527,19 +706,25 @@
             return session;
         }
 
-        private void runTasks(SessionBuffer buf) {
+        private void runTasks(Queue<Runnable> tasksQueue) {
             for (;;) {
                 Runnable task;
-                synchronized (buf.queue) {
-                    task = buf.queue.poll();
+                
+                synchronized (tasksQueue) {
+                    if ( tasksQueue.isEmpty()) {
+                        break;
+                    }
+
+                    task = tasksQueue.poll();
 
                     if (task == null) {
-                        buf.processingCompleted = true;
                         break;
                     }
                 }
 
-                queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
+                if (eventQueueHandler != null) {
+                    eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
+                }
 
                 runTask(task);
             }