You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2010/01/16 04:10:26 UTC
svn commit: r899880 -
/mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
Author: elecharny
Date: Sat Jan 16 03:10:26 2010
New Revision: 899880
URL: http://svn.apache.org/viewvc?rev=899880&view=rev
Log:
o Removed some useless code
o Reorganized some code
o Added logs
o Added missing Javadoc
o Minor refactoring
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java?rev=899880&r1=899879&r2=899880&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java Sat Jan 16 03:10:26 2010
@@ -74,63 +74,93 @@
* @param <T> the type of the {@link IoSession} to be managed by the specified
* {@link IoProcessor}.
*/
-public class SimpleIoProcessorPool<T extends AbstractIoSession> implements
- IoProcessor<T> {
+public class SimpleIoProcessorPool<T extends AbstractIoSession> implements IoProcessor<T> {
+ /** A logger for this class */
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
- private static final int DEFAULT_SIZE = Runtime.getRuntime()
- .availableProcessors() + 1;
+ /** The default pool size, when no size is provided. */
+ private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
- private static final AttributeKey PROCESSOR = new AttributeKey(
- SimpleIoProcessorPool.class, "processor");
-
- private final static Logger LOGGER = LoggerFactory
- .getLogger(SimpleIoProcessorPool.class);
+ /** A key used to store the selected processor in the session's attributes */
+ private static final AttributeKey PROCESSOR = new AttributeKey( SimpleIoProcessorPool.class, "processor");
+ /** The pool table */
private final IoProcessor<T>[] pool;
+ /** An atomic counter used to loop over the pool when selecting a new processor */
private final AtomicInteger processorDistributor = new AtomicInteger();
+ /** The executor which is passed to the IoProcessors when they are created */
private final Executor executor;
+ /** A flag set to true if we had to create an executor */
private final boolean createdExecutor;
+ /** A lock to protect the disposal against concurrent calls */
private final Object disposalLock = new Object();
+ /** A flag set to true if the IoProcessors in the pool are being disposed */
private volatile boolean disposing;
+ /** A flag set to true if all the IoProcessor contained in the pool have been disposed */
private volatile boolean disposed;
+ /**
+ * Creates a new instance of SimpleIoProcessorPool with a default
+ * size of NbCPUs +1.
+ *
+ * @param processorType The type of IoProcessor to use
+ */
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
this(processorType, null, DEFAULT_SIZE);
}
- public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
- int size) {
+ /**
+ * Creates a new instance of SimpleIoProcessorPool with a defined
+ * number of IoProcessors in the pool
+ *
+ * @param processorType The type of IoProcessor to use
+ * @param size The number of IoProcessor in the pool
+ */
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, int size) {
this(processorType, null, size);
}
- public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
- Executor executor) {
+ /**
+ * Creates a new instance of SimpleIoProcessorPool with an executor
+ *
+ * @param processorType The type of IoProcessor to use
+ * @param executor The {@link Executor}
+ */
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor) {
this(processorType, executor, DEFAULT_SIZE);
}
+ /**
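+ * Creates a new instance of SimpleIoProcessorPool with an executor and a
+ * defined number of IoProcessors in the pool
+ *
+ * @param processorType The type of IoProcessor to use
+ * @param executor The {@link Executor}
+ * @param size The number of IoProcessor in the pool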
+ */
@SuppressWarnings("unchecked")
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
Executor executor, int size) {
if (processorType == null) {
throw new NullPointerException("processorType");
}
+
if (size <= 0) {
throw new IllegalArgumentException("size: " + size
+ " (expected: positive integer)");
}
- if (executor == null) {
- this.executor = executor = Executors.newCachedThreadPool();
- this.createdExecutor = true;
+ // Create the executor if none is provided
+ createdExecutor = (executor == null);
+
+ if (createdExecutor) {
+ this.executor = Executors.newCachedThreadPool();
} else {
this.executor = executor;
- this.createdExecutor = false;
}
pool = new IoProcessor[size];
@@ -143,17 +173,15 @@
// We create at least one processor
try {
try {
- processorConstructor = processorType
- .getConstructor(ExecutorService.class);
- pool[0] = processorConstructor.newInstance(executor);
+ processorConstructor = processorType.getConstructor(ExecutorService.class);
+ pool[0] = processorConstructor.newInstance(this.executor);
} catch (NoSuchMethodException e) {
// To the next step...
}
try {
- processorConstructor = processorType
- .getConstructor(Executor.class);
- pool[0] = processorConstructor.newInstance(executor);
+ processorConstructor = processorType.getConstructor(Executor.class);
+ pool[0] = processorConstructor.newInstance(this.executor);
} catch (NoSuchMethodException e) {
// To the next step...
}
@@ -165,32 +193,32 @@
} catch (NoSuchMethodException e) {
// To the next step...
}
- } catch (RuntimeException e) {
- throw e;
+ } catch (RuntimeException re) {
+ LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
+ throw re;
} catch (Exception e) {
- throw new RuntimeIoException(
- "Failed to create a new instance of "
- + processorType.getName(), e);
+ String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
+ LOGGER.error(msg, e);
+ throw new RuntimeIoException(msg , e);
}
if (processorConstructor == null) {
// Raise an exception if no proper constructor is found.
- throw new IllegalArgumentException(String
- .valueOf(processorType)
- + " must have a public constructor "
- + "with one "
- + ExecutorService.class.getSimpleName()
- + " parameter, "
- + "a public constructor with one "
- + Executor.class.getSimpleName()
- + " parameter or a public default constructor.");
+ String msg = String.valueOf(processorType)
+ + " must have a public constructor with one "
+ + ExecutorService.class.getSimpleName()
+ + " parameter, a public constructor with one "
+ + Executor.class.getSimpleName()
+ + " parameter or a public default constructor.";
+ LOGGER.error(msg);
+ throw new IllegalArgumentException(msg);
}
// Constructor found now use it for all subsequent instantiations
for (int i = 1; i < pool.length; i++) {
try {
if (usesExecutorArg) {
- pool[i] = processorConstructor.newInstance(executor);
+ pool[i] = processorConstructor.newInstance(this.executor);
} else {
pool[i] = processorConstructor.newInstance();
}
@@ -198,6 +226,7 @@
// Won't happen because it has been done previously
}
}
+
success = true;
} finally {
if (!success) {
@@ -206,30 +235,51 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
public final void add(T session) {
getProcessor(session).add(session);
}
+ /**
+ * {@inheritDoc}
+ */
public final void flush(T session) {
getProcessor(session).flush(session);
}
+ /**
+ * {@inheritDoc}
+ */
public final void remove(T session) {
getProcessor(session).remove(session);
}
+ /**
+ * {@inheritDoc}
+ */
public final void updateTrafficControl(T session) {
getProcessor(session).updateTrafficControl(session);
}
+ /**
+ * {@inheritDoc}
+ */
public boolean isDisposed() {
return disposed;
}
+ /**
+ * {@inheritDoc}
+ */
public boolean isDisposing() {
return disposing;
}
+ /**
+ * {@inheritDoc}
+ */
public final void dispose() {
if (disposed) {
return;
@@ -238,8 +288,11 @@
synchronized (disposalLock) {
if (!disposing) {
disposing = true;
+
+ // Loop on all the IoProcessor and release them
for (int i = pool.length - 1; i >= 0; i--) {
- if (pool[i] == null || pool[i].isDisposing()) {
+ if ((pool[i] == null) || pool[i].isDisposing()) {
+ // Already done
continue;
}
@@ -263,27 +316,31 @@
disposed = true;
}
+ /**
+ * Find the processor associated with a session. If it hasn't been stored in
+ * the session's attributes, pick a new processor and store it.
+ */
@SuppressWarnings("unchecked")
private IoProcessor<T> getProcessor(T session) {
- IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
- if (p == null) {
- p = nextProcessor();
- IoProcessor<T> oldp = (IoProcessor<T>) session
- .setAttributeIfAbsent(PROCESSOR, p);
- if (oldp != null) {
- p = oldp;
- }
+ IoProcessor<T> processor = (IoProcessor<T>) session.getAttribute(PROCESSOR);
+
+ if (processor == null) {
+ processor = nextProcessor();
+ session.setAttributeIfAbsent(PROCESSOR, processor);
}
- return p;
+ return processor;
}
+ /**
+ * Get a new Processor in the pool, using a round-robin algorithm.
+ */
private IoProcessor<T> nextProcessor() {
if (disposed) {
throw new IllegalStateException(
"A disposed processor cannot be accessed.");
}
- return pool[Math.abs(processorDistributor.getAndIncrement())
- % pool.length];
+
+ return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}
}