You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/04/09 13:39:43 UTC
svn commit: r646280 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor:
OrderedThreadPoolExecutor.java UnorderedThreadPoolExecutor.java
Author: trustin
Date: Wed Apr 9 04:39:42 2008
New Revision: 646280
URL: http://svn.apache.org/viewvc?rev=646280&view=rev
Log:
Additional comments or exception-throwing code in empty blocks
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=646280&r1=646279&r2=646280&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Wed Apr 9 04:39:42 2008
@@ -56,78 +56,82 @@
public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
return true;
}
- public void offered(ThreadPoolExecutor executor, IoEvent event) {}
- public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+ public void offered(ThreadPoolExecutor executor, IoEvent event) {
+ // NOOP
+ }
+ public void polled(ThreadPoolExecutor executor, IoEvent event) {
+ // NOOP
+ }
};
private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
-
+
private final Set<Worker> workers = new HashSet<Worker>();
-
+
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile int largestPoolSize;
private final AtomicInteger idleWorkers = new AtomicInteger();
-
+
private long completedTaskCount;
private volatile boolean shutdown;
-
+
private final IoEventQueueHandler queueHandler;
-
+
public OrderedThreadPoolExecutor() {
this(16);
}
-
+
public OrderedThreadPoolExecutor(int maximumPoolSize) {
this(0, maximumPoolSize);
}
-
+
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
}
-
+
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
}
-
+
public OrderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
IoEventQueueHandler queueHandler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
}
public OrderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
}
public OrderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
-
+
if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
}
-
+
if (queueHandler == null) {
queueHandler = NOOP_QUEUE_HANDLER;
}
-
+
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queueHandler = queueHandler;
}
-
+
public IoEventQueueHandler getQueueHandler() {
return queueHandler;
}
@@ -148,13 +152,13 @@
idleWorkers.incrementAndGet();
thread.start();
workers.add(worker);
-
+
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
}
}
}
-
+
private void addWorkerIfNecessary() {
if (idleWorkers.get() == 0) {
synchronized (workers) {
@@ -164,7 +168,7 @@
}
}
}
-
+
private void removeWorker() {
synchronized (workers) {
if (workers.size() <= corePoolSize) {
@@ -173,12 +177,12 @@
waitingSessions.offer(EXIT_SIGNAL);
}
}
-
+
@Override
public int getMaximumPoolSize() {
return maximumPoolSize;
}
-
+
@Override
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
@@ -195,20 +199,20 @@
}
}
}
-
+
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
-
+
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
-
+
synchronized (workers) {
while (!isTerminated()) {
long waitTime = deadline - System.currentTimeMillis();
if (waitTime <= 0) {
break;
}
-
+
workers.wait(waitTime);
}
}
@@ -225,7 +229,7 @@
if (!shutdown) {
return false;
}
-
+
synchronized (workers) {
return workers.isEmpty();
}
@@ -236,7 +240,7 @@
if (shutdown) {
return;
}
-
+
shutdown = true;
synchronized (workers) {
@@ -249,7 +253,7 @@
@Override
public List<Runnable> shutdownNow() {
shutdown();
-
+
List<Runnable> answer = new ArrayList<Runnable>();
IoSession session;
while ((session = waitingSessions.poll()) != null) {
@@ -258,7 +262,7 @@
Thread.yield(); // Let others take the signal.
continue;
}
-
+
SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
synchronized (buf.queue) {
for (Runnable task: buf.queue) {
@@ -268,7 +272,7 @@
buf.queue.clear();
}
}
-
+
return answer;
}
@@ -279,7 +283,7 @@
}
checkTaskType(task);
-
+
IoEvent e = (IoEvent) task;
IoSession s = e.getSession();
SessionBuffer buf = getSessionBuffer(s);
@@ -295,26 +299,26 @@
} else {
offerSession = false;
}
- }
+ }
} else {
offerSession = false;
}
-
+
if (offerSession) {
waitingSessions.offer(s);
}
-
+
addWorkerIfNecessary();
-
+
if (offerEvent) {
queueHandler.offered(this, e);
}
}
-
+
private void rejectTask(Runnable task) {
getRejectedExecutionHandler().rejectedExecution(task, this);
}
-
+
private void checkTaskType(Runnable task) {
if (!(task instanceof IoEvent)) {
throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
@@ -335,7 +339,7 @@
for (Worker w: workers) {
answer += w.completedTaskCount;
}
-
+
return answer;
}
}
@@ -387,14 +391,15 @@
}
}
}
-
+
@Override
public BlockingQueue<Runnable> getQueue() {
throw new UnsupportedOperationException();
}
-
+
@Override
public void purge() {
+ // Nothing to purge in this implementation.
}
@Override
@@ -406,19 +411,19 @@
if (buffer == null) {
return false;
}
-
+
boolean removed;
synchronized (buffer.queue) {
removed = buffer.queue.remove(task);
}
-
+
if (removed) {
getQueueHandler().polled(this, e);
}
-
+
return removed;
}
-
+
@Override
public int getCorePoolSize() {
return corePoolSize;
@@ -432,7 +437,7 @@
if (corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
}
-
+
synchronized (workers) {
if (this.corePoolSize > corePoolSize) {
for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
@@ -454,26 +459,26 @@
}
return buffer;
}
-
+
private static class SessionBuffer {
private final Queue<Runnable> queue = new CircularQueue<Runnable>();
private boolean processingCompleted = true;
}
-
+
private class Worker implements Runnable {
-
+
private volatile long completedTaskCount;
private Thread thread;
-
+
public void run() {
thread = Thread.currentThread();
try {
for (;;) {
IoSession session = fetchSession();
-
+
idleWorkers.decrementAndGet();
-
+
if (session == null) {
synchronized (workers) {
if (workers.size() > corePoolSize) {
@@ -483,11 +488,11 @@
}
}
}
-
+
if (session == EXIT_SIGNAL) {
break;
}
-
+
try {
if (session != null) {
runTasks(getSessionBuffer(session));
@@ -537,7 +542,7 @@
Runnable task;
synchronized (buf.queue) {
task = buf.queue.poll();
-
+
if (task == null) {
buf.processingCompleted = true;
break;
@@ -559,8 +564,9 @@
afterExecute(task, null);
completedTaskCount ++;
} catch (RuntimeException e) {
- if (!ran)
+ if (!ran) {
afterExecute(task, e);
+ }
throw e;
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java?rev=646280&r1=646279&r2=646280&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java Wed Apr 9 04:39:42 2008
@@ -47,79 +47,88 @@
* </ul>
* If you need to maintain the order of events per session, please use
* {@link OrderedThreadPoolExecutor}.
- *
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
private static final Runnable EXIT_SIGNAL = new Runnable() {
- public void run() {}
+ public void run() {
+ throw new Error(
+ "This method shouldn't be called. " +
+ "Please file a bug report.");
+ }
};
+
private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
return true;
}
- public void offered(ThreadPoolExecutor executor, IoEvent event) {}
- public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+ public void offered(ThreadPoolExecutor executor, IoEvent event) {
+ // NOOP
+ }
+ public void polled(ThreadPoolExecutor executor, IoEvent event) {
+ // NOOP
+ }
};
private final Set<Worker> workers = new HashSet<Worker>();
-
+
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile int largestPoolSize;
private final AtomicInteger idleWorkers = new AtomicInteger();
-
+
private long completedTaskCount;
private volatile boolean shutdown;
-
+
private final IoEventQueueHandler queueHandler;
-
+
public UnorderedThreadPoolExecutor() {
this(16);
}
-
+
public UnorderedThreadPoolExecutor(int maximumPoolSize) {
this(0, maximumPoolSize);
}
-
+
public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
}
-
+
public UnorderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
}
-
+
public UnorderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
IoEventQueueHandler queueHandler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
}
public UnorderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
}
public UnorderedThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
+ int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
-
+
if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
}
-
+
if (queueHandler == null) {
queueHandler = NOOP_QUEUE_HANDLER;
}
@@ -128,7 +137,7 @@
this.maximumPoolSize = maximumPoolSize;
this.queueHandler = queueHandler;
}
-
+
public IoEventQueueHandler getQueueHandler() {
return queueHandler;
}
@@ -149,13 +158,13 @@
idleWorkers.incrementAndGet();
thread.start();
workers.add(worker);
-
+
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
}
}
}
-
+
private void addWorkerIfNecessary() {
if (idleWorkers.get() == 0) {
synchronized (workers) {
@@ -165,7 +174,7 @@
}
}
}
-
+
private void removeWorker() {
synchronized (workers) {
if (workers.size() <= corePoolSize) {
@@ -174,12 +183,12 @@
getQueue().offer(EXIT_SIGNAL);
}
}
-
+
@Override
public int getMaximumPoolSize() {
return maximumPoolSize;
}
-
+
@Override
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
@@ -196,20 +205,20 @@
}
}
}
-
+
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
-
+
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
-
+
synchronized (workers) {
while (!isTerminated()) {
long waitTime = deadline - System.currentTimeMillis();
if (waitTime <= 0) {
break;
}
-
+
workers.wait(waitTime);
}
}
@@ -226,7 +235,7 @@
if (!shutdown) {
return false;
}
-
+
synchronized (workers) {
return workers.isEmpty();
}
@@ -237,7 +246,7 @@
if (shutdown) {
return;
}
-
+
shutdown = true;
synchronized (workers) {
@@ -250,7 +259,7 @@
@Override
public List<Runnable> shutdownNow() {
shutdown();
-
+
List<Runnable> answer = new ArrayList<Runnable>();
Runnable task;
while ((task = getQueue().poll()) != null) {
@@ -259,11 +268,11 @@
Thread.yield(); // Let others take the signal.
continue;
}
-
+
getQueueHandler().polled(this, (IoEvent) task);
answer.add(task);
}
-
+
return answer;
}
@@ -274,24 +283,24 @@
}
checkTaskType(task);
-
+
IoEvent e = (IoEvent) task;
boolean offeredEvent = queueHandler.accept(this, e);
if (offeredEvent) {
getQueue().offer(e);
}
-
+
addWorkerIfNecessary();
-
+
if (offeredEvent) {
queueHandler.offered(this, e);
}
}
-
+
private void rejectTask(Runnable task) {
getRejectedExecutionHandler().rejectedExecution(task, this);
}
-
+
private void checkTaskType(Runnable task) {
if (!(task instanceof IoEvent)) {
throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
@@ -312,7 +321,7 @@
for (Worker w: workers) {
answer += w.completedTaskCount;
}
-
+
return answer;
}
}
@@ -364,9 +373,10 @@
}
}
}
-
+
@Override
public void purge() {
+ // Nothing to purge in this implementation.
}
@Override
@@ -391,7 +401,7 @@
if (corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
}
-
+
synchronized (workers) {
if (this.corePoolSize > corePoolSize) {
for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
@@ -401,21 +411,21 @@
this.corePoolSize = corePoolSize;
}
}
-
+
private class Worker implements Runnable {
-
+
private volatile long completedTaskCount;
private Thread thread;
-
+
public void run() {
thread = Thread.currentThread();
-
+
try {
for (;;) {
Runnable task = fetchTask();
-
+
idleWorkers.decrementAndGet();
-
+
if (task == null) {
synchronized (workers) {
if (workers.size() > corePoolSize) {
@@ -425,11 +435,11 @@
}
}
}
-
+
if (task == EXIT_SIGNAL) {
break;
}
-
+
try {
if (task != null) {
queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
@@ -484,8 +494,9 @@
afterExecute(task, null);
completedTaskCount ++;
} catch (RuntimeException e) {
- if (!ran)
+ if (!ran) {
afterExecute(task, e);
+ }
throw e;
}
}