You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/04/09 13:39:43 UTC

svn commit: r646280 - in /mina/trunk/core/src/main/java/org/apache/mina/filter/executor: OrderedThreadPoolExecutor.java UnorderedThreadPoolExecutor.java

Author: trustin
Date: Wed Apr  9 04:39:42 2008
New Revision: 646280

URL: http://svn.apache.org/viewvc?rev=646280&view=rev
Log:
Additional comments or exception throwing code in empty blocks

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=646280&r1=646279&r2=646280&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Wed Apr  9 04:39:42 2008
@@ -56,78 +56,82 @@
         public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
             return true;
         }
-        public void offered(ThreadPoolExecutor executor, IoEvent event) {}
-        public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+        public void offered(ThreadPoolExecutor executor, IoEvent event) {
+            // NOOP
+        }
+        public void polled(ThreadPoolExecutor executor, IoEvent event) {
+            // NOOP
+        }
     };
 
     private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
     private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
-    
+
     private final Set<Worker> workers = new HashSet<Worker>();
-    
+
     private volatile int corePoolSize;
     private volatile int maximumPoolSize;
     private volatile int largestPoolSize;
     private final AtomicInteger idleWorkers = new AtomicInteger();
-    
+
     private long completedTaskCount;
     private volatile boolean shutdown;
-    
+
     private final IoEventQueueHandler queueHandler;
-    
+
     public OrderedThreadPoolExecutor() {
         this(16);
     }
-    
+
     public OrderedThreadPoolExecutor(int maximumPoolSize) {
         this(0, maximumPoolSize);
     }
-    
+
     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
         this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
     }
-    
+
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
     }
-    
+
     public OrderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             IoEventQueueHandler queueHandler) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
     }
 
     public OrderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
     }
 
     public OrderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
         super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
         if (corePoolSize < 0) {
             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
         }
-        
+
         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
         }
-        
+
         if (queueHandler == null) {
             queueHandler = NOOP_QUEUE_HANDLER;
         }
-        
+
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
         this.queueHandler = queueHandler;
     }
-    
+
     public IoEventQueueHandler getQueueHandler() {
         return queueHandler;
     }
@@ -148,13 +152,13 @@
             idleWorkers.incrementAndGet();
             thread.start();
             workers.add(worker);
-            
+
             if (workers.size() > largestPoolSize) {
                 largestPoolSize = workers.size();
             }
         }
     }
-    
+
     private void addWorkerIfNecessary() {
         if (idleWorkers.get() == 0) {
             synchronized (workers) {
@@ -164,7 +168,7 @@
             }
         }
     }
-    
+
     private void removeWorker() {
         synchronized (workers) {
             if (workers.size() <= corePoolSize) {
@@ -173,12 +177,12 @@
             waitingSessions.offer(EXIT_SIGNAL);
         }
     }
-    
+
     @Override
     public int getMaximumPoolSize() {
         return maximumPoolSize;
     }
-    
+
     @Override
     public void setMaximumPoolSize(int maximumPoolSize) {
         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
@@ -195,20 +199,20 @@
             }
         }
     }
-    
+
     @Override
     public boolean awaitTermination(long timeout, TimeUnit unit)
             throws InterruptedException {
-        
+
         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
-        
+
         synchronized (workers) {
             while (!isTerminated()) {
                 long waitTime = deadline - System.currentTimeMillis();
                 if (waitTime <= 0) {
                     break;
                 }
-                
+
                 workers.wait(waitTime);
             }
         }
@@ -225,7 +229,7 @@
         if (!shutdown) {
             return false;
         }
-        
+
         synchronized (workers) {
             return workers.isEmpty();
         }
@@ -236,7 +240,7 @@
         if (shutdown) {
             return;
         }
-        
+
         shutdown = true;
 
         synchronized (workers) {
@@ -249,7 +253,7 @@
     @Override
     public List<Runnable> shutdownNow() {
         shutdown();
-        
+
         List<Runnable> answer = new ArrayList<Runnable>();
         IoSession session;
         while ((session = waitingSessions.poll()) != null) {
@@ -258,7 +262,7 @@
                 Thread.yield(); // Let others take the signal.
                 continue;
             }
-            
+
             SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
             synchronized (buf.queue) {
                 for (Runnable task: buf.queue) {
@@ -268,7 +272,7 @@
                 buf.queue.clear();
             }
         }
-        
+
         return answer;
     }
 
@@ -279,7 +283,7 @@
         }
 
         checkTaskType(task);
-        
+
         IoEvent e = (IoEvent) task;
         IoSession s = e.getSession();
         SessionBuffer buf = getSessionBuffer(s);
@@ -295,26 +299,26 @@
                 } else {
                     offerSession = false;
                 }
-            }            
+            }
         } else {
             offerSession = false;
         }
-        
+
         if (offerSession) {
             waitingSessions.offer(s);
         }
-        
+
         addWorkerIfNecessary();
-        
+
         if (offerEvent) {
             queueHandler.offered(this, e);
         }
     }
-    
+
     private void rejectTask(Runnable task) {
         getRejectedExecutionHandler().rejectedExecution(task, this);
     }
-    
+
     private void checkTaskType(Runnable task) {
         if (!(task instanceof IoEvent)) {
             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
@@ -335,7 +339,7 @@
             for (Worker w: workers) {
                 answer += w.completedTaskCount;
             }
-            
+
             return answer;
         }
     }
@@ -387,14 +391,15 @@
             }
         }
     }
-    
+
     @Override
     public BlockingQueue<Runnable> getQueue() {
         throw new UnsupportedOperationException();
     }
-    
+
     @Override
     public void purge() {
+        // Nothing to purge in this implementation.
     }
 
     @Override
@@ -406,19 +411,19 @@
         if (buffer == null) {
             return false;
         }
-        
+
         boolean removed;
         synchronized (buffer.queue) {
             removed = buffer.queue.remove(task);
         }
-        
+
         if (removed) {
             getQueueHandler().polled(this, e);
         }
-        
+
         return removed;
     }
-    
+
     @Override
     public int getCorePoolSize() {
         return corePoolSize;
@@ -432,7 +437,7 @@
         if (corePoolSize > maximumPoolSize) {
             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
         }
-        
+
         synchronized (workers) {
             if (this.corePoolSize > corePoolSize) {
                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
@@ -454,26 +459,26 @@
         }
         return buffer;
     }
-    
+
     private static class SessionBuffer {
         private final Queue<Runnable> queue = new CircularQueue<Runnable>();
         private boolean processingCompleted = true;
     }
-    
+
     private class Worker implements Runnable {
-        
+
         private volatile long completedTaskCount;
         private Thread thread;
-        
+
         public void run() {
             thread = Thread.currentThread();
 
             try {
                 for (;;) {
                     IoSession session = fetchSession();
-                    
+
                     idleWorkers.decrementAndGet();
-                    
+
                     if (session == null) {
                         synchronized (workers) {
                             if (workers.size() > corePoolSize) {
@@ -483,11 +488,11 @@
                             }
                         }
                     }
-                    
+
                     if (session == EXIT_SIGNAL) {
                         break;
                     }
-                    
+
                     try {
                         if (session != null) {
                             runTasks(getSessionBuffer(session));
@@ -537,7 +542,7 @@
                 Runnable task;
                 synchronized (buf.queue) {
                     task = buf.queue.poll();
-    
+
                     if (task == null) {
                         buf.processingCompleted = true;
                         break;
@@ -559,8 +564,9 @@
                 afterExecute(task, null);
                 completedTaskCount ++;
             } catch (RuntimeException e) {
-                if (!ran)
+                if (!ran) {
                     afterExecute(task, e);
+                }
                 throw e;
             }
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java?rev=646280&r1=646279&r2=646280&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java Wed Apr  9 04:39:42 2008
@@ -47,79 +47,88 @@
  * </ul>
  * If you need to maintain the order of events per session, please use
  * {@link OrderedThreadPoolExecutor}.
- * 
+ *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
 public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final Runnable EXIT_SIGNAL = new Runnable() {
-        public void run() {}
+        public void run() {
+            throw new Error(
+                    "This method shouldn't be called. " +
+                    "Please file a bug report.");
+        }
     };
+
     private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
         public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
             return true;
         }
-        public void offered(ThreadPoolExecutor executor, IoEvent event) {}
-        public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+        public void offered(ThreadPoolExecutor executor, IoEvent event) {
+            // NOOP
+        }
+        public void polled(ThreadPoolExecutor executor, IoEvent event) {
+            // NOOP
+        }
     };
 
     private final Set<Worker> workers = new HashSet<Worker>();
-    
+
     private volatile int corePoolSize;
     private volatile int maximumPoolSize;
     private volatile int largestPoolSize;
     private final AtomicInteger idleWorkers = new AtomicInteger();
-    
+
     private long completedTaskCount;
     private volatile boolean shutdown;
-    
+
     private final IoEventQueueHandler queueHandler;
-    
+
     public UnorderedThreadPoolExecutor() {
         this(16);
     }
-    
+
     public UnorderedThreadPoolExecutor(int maximumPoolSize) {
         this(0, maximumPoolSize);
     }
-    
+
     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
         this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
     }
-    
+
     public UnorderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
     }
-    
+
     public UnorderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             IoEventQueueHandler queueHandler) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
     }
 
     public UnorderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
     }
 
     public UnorderedThreadPoolExecutor(
-            int corePoolSize, int maximumPoolSize, 
+            int corePoolSize, int maximumPoolSize,
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
         if (corePoolSize < 0) {
             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
         }
-        
+
         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
         }
-        
+
         if (queueHandler == null) {
             queueHandler = NOOP_QUEUE_HANDLER;
         }
@@ -128,7 +137,7 @@
         this.maximumPoolSize = maximumPoolSize;
         this.queueHandler = queueHandler;
     }
-    
+
     public IoEventQueueHandler getQueueHandler() {
         return queueHandler;
     }
@@ -149,13 +158,13 @@
             idleWorkers.incrementAndGet();
             thread.start();
             workers.add(worker);
-            
+
             if (workers.size() > largestPoolSize) {
                 largestPoolSize = workers.size();
             }
         }
     }
-    
+
     private void addWorkerIfNecessary() {
         if (idleWorkers.get() == 0) {
             synchronized (workers) {
@@ -165,7 +174,7 @@
             }
         }
     }
-    
+
     private void removeWorker() {
         synchronized (workers) {
             if (workers.size() <= corePoolSize) {
@@ -174,12 +183,12 @@
             getQueue().offer(EXIT_SIGNAL);
         }
     }
-    
+
     @Override
     public int getMaximumPoolSize() {
         return maximumPoolSize;
     }
-    
+
     @Override
     public void setMaximumPoolSize(int maximumPoolSize) {
         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
@@ -196,20 +205,20 @@
             }
         }
     }
-    
+
     @Override
     public boolean awaitTermination(long timeout, TimeUnit unit)
             throws InterruptedException {
-        
+
         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
-        
+
         synchronized (workers) {
             while (!isTerminated()) {
                 long waitTime = deadline - System.currentTimeMillis();
                 if (waitTime <= 0) {
                     break;
                 }
-                
+
                 workers.wait(waitTime);
             }
         }
@@ -226,7 +235,7 @@
         if (!shutdown) {
             return false;
         }
-        
+
         synchronized (workers) {
             return workers.isEmpty();
         }
@@ -237,7 +246,7 @@
         if (shutdown) {
             return;
         }
-        
+
         shutdown = true;
 
         synchronized (workers) {
@@ -250,7 +259,7 @@
     @Override
     public List<Runnable> shutdownNow() {
         shutdown();
-        
+
         List<Runnable> answer = new ArrayList<Runnable>();
         Runnable task;
         while ((task = getQueue().poll()) != null) {
@@ -259,11 +268,11 @@
                 Thread.yield(); // Let others take the signal.
                 continue;
             }
-            
+
             getQueueHandler().polled(this, (IoEvent) task);
             answer.add(task);
         }
-        
+
         return answer;
     }
 
@@ -274,24 +283,24 @@
         }
 
         checkTaskType(task);
-        
+
         IoEvent e = (IoEvent) task;
         boolean offeredEvent = queueHandler.accept(this, e);
         if (offeredEvent) {
             getQueue().offer(e);
         }
-        
+
         addWorkerIfNecessary();
-        
+
         if (offeredEvent) {
             queueHandler.offered(this, e);
         }
     }
-    
+
     private void rejectTask(Runnable task) {
         getRejectedExecutionHandler().rejectedExecution(task, this);
     }
-    
+
     private void checkTaskType(Runnable task) {
         if (!(task instanceof IoEvent)) {
             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
@@ -312,7 +321,7 @@
             for (Worker w: workers) {
                 answer += w.completedTaskCount;
             }
-            
+
             return answer;
         }
     }
@@ -364,9 +373,10 @@
             }
         }
     }
-    
+
     @Override
     public void purge() {
+        // Nothing to purge in this implementation.
     }
 
     @Override
@@ -391,7 +401,7 @@
         if (corePoolSize > maximumPoolSize) {
             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
         }
-                
+
         synchronized (workers) {
             if (this.corePoolSize > corePoolSize) {
                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
@@ -401,21 +411,21 @@
             this.corePoolSize = corePoolSize;
         }
     }
-    
+
     private class Worker implements Runnable {
-        
+
         private volatile long completedTaskCount;
         private Thread thread;
-        
+
         public void run() {
             thread = Thread.currentThread();
-            
+
             try {
                 for (;;) {
                     Runnable task = fetchTask();
-                    
+
                     idleWorkers.decrementAndGet();
-                    
+
                     if (task == null) {
                         synchronized (workers) {
                             if (workers.size() > corePoolSize) {
@@ -425,11 +435,11 @@
                             }
                         }
                     }
-                    
+
                     if (task == EXIT_SIGNAL) {
                         break;
                     }
-                    
+
                     try {
                         if (task != null) {
                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
@@ -484,8 +494,9 @@
                 afterExecute(task, null);
                 completedTaskCount ++;
             } catch (RuntimeException e) {
-                if (!ran)
+                if (!ran) {
                     afterExecute(task, e);
+                }
                 throw e;
             }
         }