You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/03/09 13:26:41 UTC
svn commit: r751661 -
/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Author: elecharny
Date: Mon Mar 9 12:26:41 2009
New Revision: 751661
URL: http://svn.apache.org/viewvc?rev=751661&view=rev
Log:
o Added some Javadoc and comments
o Renamed the add() method to handleNewSessions()
o Minor refactoring
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=751661&r1=751660&r2=751661&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Mon Mar 9 12:26:41 2009
@@ -162,7 +162,7 @@
synchronized (disposalLock) {
if (!disposing) {
disposing = true;
- startupWorker();
+ startupProcessor();
}
}
@@ -219,7 +219,7 @@
protected abstract Iterator<T> selectedSessions();
/**
- * Get the sate of a session (preparing, open, closed)
+ * Get the state of a session (preparing, open, closed)
* @param session the {@link IoSession} to inspect
* @return the state of the session
*/
@@ -327,7 +327,7 @@
// Adds the session to the newSession queue and starts the worker
newSessions.add(session);
- startupWorker();
+ startupProcessor();
}
/**
@@ -335,7 +335,7 @@
*/
public final void remove(T session) {
scheduleRemove(session);
- startupWorker();
+ startupProcessor();
}
private void scheduleRemove(T session) {
@@ -372,17 +372,28 @@
trafficControllingSessions.add(session);
}
- private void startupWorker() {
+ /**
+ * Starts the inner Processor, asking the executor to pick a thread in its
+ * pool. The Processor is wrapped in a NamePreservingRunnable using threadName.
+ */
+ private void startupProcessor() {
synchronized (lock) {
if (processor == null) {
processor = new Processor();
executor.execute(new NamePreservingRunnable(processor, threadName));
}
}
+
+ // Just stop the select() and start it again, so that the processor
+ // can be activated immediately.
wakeup();
}
- private int add() {
+ /**
+ * Handle newly created sessions
+ * @return The number of new sessions
+ */
+ private int handleNewSessions() {
int addedSessions = 0;
// Loop on the new sessions blocking queue, to count
@@ -391,13 +402,13 @@
T session = newSessions.poll();
if (session == null) {
- // We don't have anymore new sessions
+ // We don't have new sessions
break;
}
if (addNow(session)) {
- // The new session has been added to the
+ // A new session has been created
addedSessions ++;
}
}
@@ -541,12 +552,16 @@
}
}
+ /**
+ * Deal with a session that is ready for read operations, write operations, or both.
+ */
private void process(T session) {
-
+ // Process Reads
if (isReadable(session) && !session.isReadSuspended()) {
read(session);
}
+ // Process writes
if (isWritable(session) && !session.isWriteSuspended()) {
scheduleFlush(session);
}
@@ -624,6 +639,7 @@
for (; ;) {
session.setScheduledForFlush(false);
SessionState state = state(session);
+
switch (state) {
case OPEN:
try {
@@ -858,11 +874,14 @@
for (;;) {
try {
+ // TODO: Why do we use a timeout here ???
int selected = select(1000);
- nSessions += add();
+ nSessions += handleNewSessions();
updateTrafficMask();
+ // Now, if we have had some incoming or outgoing events,
+ // deal with them
if (selected > 0) {
process();
}