You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/12/09 16:38:14 UTC
svn commit: r888842 - in /mina/trunk:
core/src/main/java/org/apache/mina/core/polling/
core/src/main/java/org/apache/mina/transport/socket/nio/
core/src/test/java/org/apache/mina/transport/socket/nio/
transport-apr/src/main/java/org/apache/mina/transpo...
Author: elecharny
Date: Wed Dec 9 15:38:14 2009
New Revision: 888842
URL: http://svn.apache.org/viewvc?rev=888842&view=rev
Log:
o Merged the select-fix branch into the trunk. This should fix (fingers crossed) DIRMINA-678.
o The PollingIoProcessorTest has been disabled (@Ignore), as it was waiting forever with this patch.
NOTE: please test this version. If there is any issue, it will be easy to revert, as this is one single commit.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Wed Dec 9 15:38:14 2009
@@ -29,6 +29,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.buffer.IoBuffer;
@@ -51,27 +52,31 @@
import org.slf4j.LoggerFactory;
/**
- * An abstract implementation of {@link IoProcessor} which helps
- * transport developers to write an {@link IoProcessor} easily.
- * This class is in charge of active polling a set of {@link IoSession}
- * and trigger events when some I/O operation is possible.
- *
+ * An abstract implementation of {@link IoProcessor} which helps transport
+ * developers to write an {@link IoProcessor} easily. This class is in charge of
+ * active polling a set of {@link IoSession} and trigger events when some I/O
+ * operation is possible.
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
implements IoProcessor<T> {
/** A logger for this class */
- private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
-
+ private final static Logger LOG = LoggerFactory
+ .getLogger(IoProcessor.class);
+
/**
* The maximum loop count for a write operation until
* {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
- * It is similar to what a spin lock is for in concurrency programming.
- * It improves memory utilization and write throughput significantly.
+ * It is similar to what a spin lock is for in concurrency programming. It
+ * improves memory utilization and write throughput significantly.
*/
private static final int WRITE_SPIN_COUNT = 256;
- /** A timeout used for the select, as we need to get out to deal with idle sessions */
+ /**
+ * A timeout used for the select, as we need to get out to deal with idle
+ * sessions
+ */
private static final long SELECT_TIMEOUT = 1000L;
/** A map containing the last Thread ID for each class */
@@ -92,7 +97,10 @@
/** A queue used to store the sessions to be flushed */
private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
- /** A queue used to store the sessions which have a trafficControl to be updated */
+ /**
+ * A queue used to store the sessions which have a trafficControl to be
+ * updated
+ */
private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
/** The processor thread : it handles the incoming messages */
@@ -108,11 +116,14 @@
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
+ protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
+
/**
- * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
- * for handling I/Os events.
+ * Create an {@link AbstractPollingIoProcessor} with the given
+ * {@link Executor} for handling I/Os events.
*
- * @param executor the {@link Executor} for handling I/O events
+ * @param executor
+ * the {@link Executor} for handling I/O events
*/
protected AbstractPollingIoProcessor(Executor executor) {
if (executor == null) {
@@ -127,15 +138,15 @@
* Compute the thread ID for this class instance. As we may have different
* classes, we store the last ID number into a Map associating the class
* name to the last assigned ID.
- *
- * @return a name for the current thread, based on the class name and
- * an incremental value, starting at 1.
+ *
+ * @return a name for the current thread, based on the class name and an
+ * incremental value, starting at 1.
*/
private String nextThreadName() {
Class<?> cls = getClass();
int newThreadId;
- // We synchronize this block to avoid a concurrent access to
+ // We synchronize this block to avoid a concurrent access to
// the actomicInteger (it can be modified by another thread, while
// being seen as null by another thread)
synchronized (threadIds) {
@@ -192,30 +203,38 @@
}
/**
- * Dispose the resources used by this {@link IoProcessor} for polling
- * the client connections
- * @throws Exception if some low level IO error occurs
+ * Dispose the resources used by this {@link IoProcessor} for polling the
+ * client connections
+ *
+ * @throws Exception
+ * if some low level IO error occurs
*/
protected abstract void dispose0() throws Exception;
/**
* poll those sessions for the given timeout
- * @param timeout milliseconds before the call timeout if no event appear
+ *
+ * @param timeout
+ * milliseconds before the call timeout if no event appear
* @return The number of session ready for read or for write
- * @throws Exception if some low level IO error occurs
+ * @throws Exception
+ * if some low level IO error occurs
*/
protected abstract int select(long timeout) throws Exception;
/**
* poll those sessions forever
+ *
* @return The number of session ready for read or for write
- * @throws Exception if some low level IO error occurs
+ * @throws Exception
+ * if some low level IO error occurs
*/
protected abstract int select() throws Exception;
/**
- * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
+ * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
* is empty
+ *
* @return true if at least a session is managed by this {@link IoProcessor}
*/
protected abstract boolean isSelectorEmpty();
@@ -227,7 +246,8 @@
/**
* Get an {@link Iterator} for the list of {@link IoSession} polled by this
- * {@link IoProcessor}
+ * {@link IoProcessor}
+ *
* @return {@link Iterator} of {@link IoSession}
*/
protected abstract Iterator<T> allSessions();
@@ -241,101 +261,138 @@
/**
* Get the state of a session (preparing, open, closed)
- * @param session the {@link IoSession} to inspect
+ *
+ * @param session
+ * the {@link IoSession} to inspect
* @return the state of the session
*/
protected abstract SessionState getState(T session);
/**
* Is the session ready for writing
- * @param session the session queried
+ *
+ * @param session
+ * the session queried
* @return true is ready, false if not ready
*/
protected abstract boolean isWritable(T session);
/**
* Is the session ready for reading
- * @param session the session queried
+ *
+ * @param session
+ * the session queried
* @return true is ready, false if not ready
*/
protected abstract boolean isReadable(T session);
/**
* register a session for writing
- * @param session the session registered
- * @param isInterested true for registering, false for removing
+ *
+ * @param session
+ * the session registered
+ * @param isInterested
+ * true for registering, false for removing
*/
protected abstract void setInterestedInWrite(T session, boolean isInterested)
throws Exception;
/**
* register a session for reading
- * @param session the session registered
- * @param isInterested true for registering, false for removing
+ *
+ * @param session
+ * the session registered
+ * @param isInterested
+ * true for registering, false for removing
*/
protected abstract void setInterestedInRead(T session, boolean isInterested)
throws Exception;
/**
* is this session registered for reading
- * @param session the session queried
+ *
+ * @param session
+ * the session queried
* @return true is registered for reading
*/
protected abstract boolean isInterestedInRead(T session);
/**
* is this session registered for writing
- * @param session the session queried
+ *
+ * @param session
+ * the session queried
* @return true is registered for writing
*/
protected abstract boolean isInterestedInWrite(T session);
/**
- * Initialize the polling of a session. Add it to the polling process.
- * @param session the {@link IoSession} to add to the polling
- * @throws Exception any exception thrown by the underlying system calls
+ * Initialize the polling of a session. Add it to the polling process.
+ *
+ * @param session
+ * the {@link IoSession} to add to the polling
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract void init(T session) throws Exception;
/**
* Destroy the underlying client socket handle
- * @param session the {@link IoSession}
- * @throws Exception any exception thrown by the underlying system calls
+ *
+ * @param session
+ * the {@link IoSession}
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract void destroy(T session) throws Exception;
/**
- * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}.
- * Is called when the session was found ready for reading.
- * @param session the session to read
- * @param buf the buffer to fill
+ * Reads a sequence of bytes from a {@link IoSession} into the given
+ * {@link IoBuffer}. Is called when the session was found ready for reading.
+ *
+ * @param session
+ * the session to read
+ * @param buf
+ * the buffer to fill
* @return the number of bytes read
- * @throws Exception any exception thrown by the underlying system calls
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract int read(T session, IoBuffer buf) throws Exception;
/**
- * Write a sequence of bytes to a {@link IoSession}, means to be called when a session
- * was found ready for writing.
- * @param session the session to write
- * @param buf the buffer to write
- * @param length the number of bytes to write can be superior to the number of bytes remaining
- * in the buffer
+ * Write a sequence of bytes to a {@link IoSession}, means to be called when
+ * a session was found ready for writing.
+ *
+ * @param session
+ * the session to write
+ * @param buf
+ * the buffer to write
+ * @param length
+ * the number of bytes to write can be superior to the number of
+ * bytes remaining in the buffer
* @return the number of byte written
- * @throws Exception any exception thrown by the underlying system calls
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract int write(T session, IoBuffer buf, int length)
throws Exception;
/**
- * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
- * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so
- * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
- * @param session the session to write
- * @param region the file region to write
- * @param length the length of the portion to send
+ * Write a part of a file to a {@link IoSession}, if the underlying API
+ * isn't supporting system calls like sendfile(), you can throw a
+ * {@link UnsupportedOperationException} so the file will be send using
+ * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
+ *
+ * @param session
+ * the session to write
+ * @param region
+ * the file region to write
+ * @param length
+ * the length of the portion to send
* @return the number of written bytes
- * @throws Exception any exception thrown by the underlying system calls
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract int transferFile(T session, FileRegion region, int length)
throws Exception;
@@ -398,7 +455,7 @@
/**
* Starts the inner Processor, asking the executor to pick a thread in its
- * pool. The Runnable will be renamed
+ * pool. The Runnable will be renamed
*/
private void startupProcessor() {
synchronized (lock) {
@@ -410,14 +467,35 @@
}
// Just stop the select() and start it again, so that the processor
- // can be activated immediately.
+ // can be activated immediately.
wakeup();
}
/**
- * Loops over the new sessions blocking queue and returns
- * the number of sessions which are effectively created
- *
+ * In the case we are using the java select() method, this method is used to
+ * trash the buggy selector and create a new one, registering all the sockets
+ * on it.
+ *
+ * @throws IOException
+ * If we got an exception
+ */
+ abstract protected void registerNewSelector() throws IOException;
+
+ /**
+ * Check that the select() has not exited immediately just because of a
+ * broken connection. In this case, this is a standard case, and we just
+ * have to loop.
+ *
+ * @return true if a connection has been brutally closed.
+ * @throws IOException
+ * If we got an exception
+ */
+ abstract protected boolean isBrokenConnection() throws IOException;
+
+ /**
+ * Loops over the new sessions blocking queue and returns the number of
+ * sessions which are effectively created
+ *
* @return The number of new sessions
*/
private int handleNewSessions() {
@@ -432,7 +510,7 @@
}
if (addNow(session)) {
- // A new session has been created
+ // A new session has been created
addedSessions++;
}
}
@@ -443,7 +521,7 @@
private boolean addNow(T session) {
boolean registered = false;
boolean notified = false;
-
+
try {
init(session);
registered = true;
@@ -481,7 +559,7 @@
private int removeSessions() {
int removedSessions = 0;
-
+
for (;;) {
T session = removingSessions.poll();
@@ -491,32 +569,33 @@
}
SessionState state = getState(session);
-
+
+ // Now deal with the removal accordingly to the session's state
switch (state) {
- case OPENED:
- if (removeNow(session)) {
- removedSessions++;
- }
-
- break;
-
- case CLOSING:
- // Skip if channel is already closed
- break;
-
- case OPENING:
- // Remove session from the newSessions queue and
- // remove it
- newSessions.remove(session);
-
- if (removeNow(session)) {
- removedSessions++;
- }
-
- break;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
+ case OPENED:
+ // Try to remove this session
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+
+ break;
+
+ case CLOSING:
+ // Skip if channel is already closed
+ break;
+
+ case OPENING:
+ // Remove session from the newSessions queue and
+ // remove it
+ newSessions.remove(session);
+
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
}
}
@@ -646,7 +725,7 @@
}
}
}
-
+
if (ret < 0) {
scheduleRemove(session);
}
@@ -660,7 +739,7 @@
scheduleRemove(session);
}
-
+
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
@@ -681,40 +760,41 @@
}
T session = flushingSessions.poll(); // the same one with firstSession
-
+
for (;;) {
session.setScheduledForFlush(false);
SessionState state = getState(session);
switch (state) {
- case OPENED:
- try {
- boolean flushedAll = flushNow(session, currentTime);
- if (flushedAll
- && !session.getWriteRequestQueue().isEmpty(session)
- && !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (Exception e) {
- scheduleRemove(session);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ case OPENED:
+ try {
+ boolean flushedAll = flushNow(session, currentTime);
+ if (flushedAll
+ && !session.getWriteRequestQueue().isEmpty(session)
+ && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
}
-
- break;
-
- case CLOSING:
- // Skip if the channel is already closed.
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession() is processed)
- scheduleFlush(session);
- return;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
+ } catch (Exception e) {
+ scheduleRemove(session);
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
+ }
+
+ break;
+
+ case CLOSING:
+ // Skip if the channel is already closed.
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession()
+ // is processed)
+ scheduleFlush(session);
+ return;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
session = flushingSessions.peek();
@@ -738,7 +818,7 @@
.getWriteRequestQueue();
// Set limitation for the number of written bytes for read-write
- // fairness. I used maxReadBufferSize * 3 / 2, which yields best
+ // fairness. I used maxReadBufferSize * 3 / 2, which yields best
// performance in my experience while not breaking fairness much.
final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
@@ -766,7 +846,7 @@
currentTime);
if (localWrittenBytes > 0
&& ((IoBuffer) message).hasRemaining()) {
- // the buffer isn't empty, we re-interest it in writing
+ // the buffer isn't empty, we re-interest it in writing
writtenBytes += localWrittenBytes;
setInterestedInWrite(session, true);
return false;
@@ -776,8 +856,10 @@
hasFragmentation, maxWrittenBytes - writtenBytes,
currentTime);
- // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
- // If there's still data to be written in the FileRegion, return 0 indicating that we need
+ // Fix for Java bug on Linux
+ // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
+ // If there's still data to be written in the FileRegion,
+ // return 0 indicating that we need
// to pause until writing may resume.
if (localWrittenBytes > 0
&& ((FileRegion) message).getRemainingBytes() > 0) {
@@ -807,9 +889,9 @@
}
} while (writtenBytes < maxWrittenBytes);
} catch (Exception e) {
- if (req != null) {
- req.getFuture().setException(e);
- }
+ if (req != null) {
+ req.getFuture().setException(e);
+ }
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
return false;
@@ -884,43 +966,42 @@
}
/**
- * Update the trafficControl for all the session which has
- * just been opened.
+ * Update the trafficControl for all the session which has just been opened.
*/
private void updateTrafficMask() {
int queueSize = trafficControllingSessions.size();
-
+
while (queueSize > 0) {
T session = trafficControllingSessions.poll();
if (session == null) {
+ // We are done with this queue.
return;
}
SessionState state = getState(session);
-
+
switch (state) {
- case OPENED:
- updateTrafficControl(session);
- break;
-
- case CLOSING:
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- // We just put back the session at the end of the queue.
- trafficControllingSessions.add(session);
-
- break;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
+ case OPENED:
+ updateTrafficControl(session);
+ break;
+
+ case CLOSING:
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ // We just put back the session at the end of the queue.
+ trafficControllingSessions.add(session);
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
-
- // As we have handled one session, decrement the number of
+
+ // As we have handled one session, decrement the number of
// remaining sessions.
queueSize--;
}
@@ -930,17 +1011,18 @@
* {@inheritDoc}
*/
public void updateTrafficControl(T session) {
+ //
try {
setInterestedInRead(session, !session.isReadSuspended());
} catch (Exception e) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
-
+
try {
- setInterestedInWrite(session,
- !session.getWriteRequestQueue().isEmpty(session) &&
- !session.isWriteSuspended());
+ setInterestedInWrite(session, !session.getWriteRequestQueue()
+ .isEmpty(session)
+ && !session.isWriteSuspended());
} catch (Exception e) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
@@ -958,7 +1040,58 @@
// idle session when we get out of the select every
// second. (note : this is a hack to avoid creating
// a dedicated thread).
+ long t0 = System.currentTimeMillis();
int selected = select(SELECT_TIMEOUT);
+ long t1 = System.currentTimeMillis();
+ long delta = (t1 - t0);
+
+ synchronized (wakeupCalled) {
+
+ if (selected == 0) {
+ if (!wakeupCalled.get()) {
+ if (delta < 100) {
+ // Last chance : the select() may have been
+ // interrupted
+ // because we have had a closed channel.
+ if (isBrokenConnection()) {
+ // we can reselect immediately
+ continue;
+ } else {
+ LOG
+ .warn("Create a new selector. Selected is 0, delta = "
+ + (t1 - t0));
+ // Ok, we are hit by the nasty epoll
+ // spinning.
+ // Basically, there is a race condition
+ // which cause
+ // a closing file descriptor not to be
+ // considered as
+ // available as a selected channel, but
+ // it stopped
+ // the select. The next time we will
+ // call select(),
+ // it will exit immediately for the same
+ // reason,
+ // and do so forever, consuming 100%
+ // CPU.
+ // We have to destroy the selector, and
+ // register all
+ // the socket on a new one.
+ registerNewSelector();
+ }
+
+ // and continue the loop
+ continue;
+ }
+ } else {
+ // System.out.println("Waited one second");
+ }
+ } else {
+ // System.out.println("Nb selected : " + selected);
+ }
+
+ wakeupCalled.getAndSet(false);
+ }
nSessions += handleNewSessions();
updateTrafficMask();
@@ -966,6 +1099,7 @@
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
+ // System.out.println( "Proccessing ...");
process();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Wed Dec 9 15:38:14 2009
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.nio.channels.ByteChannel;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -41,12 +43,12 @@
*/
public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
/** The selector associated with this processor */
- private final Selector selector;
+ private volatile Selector selector;
/**
*
* Creates a new instance of NioProcessor.
- *
+ *
* @param executor
*/
public NioProcessor(Executor executor) {
@@ -81,7 +83,10 @@
@Override
protected void wakeup() {
- selector.wakeup();
+ synchronized (wakeupCalled) {
+ wakeupCalled.getAndSet(true);
+ selector.wakeup();
+ }
}
@Override
@@ -99,7 +104,8 @@
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
- session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
+ session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
+ session));
}
@Override
@@ -113,21 +119,76 @@
}
/**
+ * In the case we are using the java select() method, this method is used to
+ * trash the buggy selector and create a new one, registering all the
+ * sockets on it.
+ */
+ protected void registerNewSelector() throws IOException {
+ synchronized (selector) {
+ Set<SelectionKey> keys = selector.keys();
+
+ // Open a new selector
+ Selector newSelector = Selector.open();
+
+ for (SelectionKey key : keys) {
+ SelectableChannel ch = key.channel();
+ ch.register(newSelector, key.interestOps());
+ }
+
+ selector.close();
+ selector = newSelector;
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Scans every key registered on the current selector and cancels the keys
+     * whose {@link SocketChannel} or {@link DatagramChannel} is not connected
+     * anymore.
+     *
+     * @return true if at least one key has been cancelled because its channel
+     *         was found disconnected
+     */
+    @Override
+    protected boolean isBrokenConnection() throws IOException {
+        // A flag set to true if we find a broken session
+        boolean brokenSession = false;
+
+        synchronized (selector) {
+            // Loop on all the registered keys to see if one of them
+            // has a channel which is not connected anymore
+            for (SelectionKey key : selector.keys()) {
+                SelectableChannel channel = key.channel();
+
+                boolean disconnected = ((channel instanceof DatagramChannel) && !((DatagramChannel) channel)
+                        .isConnected())
+                        || ((channel instanceof SocketChannel) && !((SocketChannel) channel)
+                                .isConnected());
+
+                if (disconnected) {
+                    // The channel is not connected anymore. Cancel
+                    // the associated key then.
+                    key.cancel();
+
+                    // Set the flag to true to avoid a selector switch
+                    brokenSession = true;
+                }
+            }
+        }
+
+        return brokenSession;
+    }
+
+ /**
* {@inheritDoc}
*/
@Override
protected SessionState getState(NioSession session) {
SelectionKey key = session.getSelectionKey();
-
+
if (key == null) {
- // The channel is not yet registered to a selector
+ // The channel is not yet registered to a selector
return SessionState.OPENING;
}
if (key.isValid()) {
- // The session is opened
+ // The session is opened
return SessionState.OPENED;
- } else {
+ } else {
// The session still as to be closed
return SessionState.CLOSING;
}
@@ -154,24 +215,26 @@
@Override
protected boolean isInterestedInWrite(NioSession session) {
SelectionKey key = session.getSelectionKey();
- return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
+ return key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
}
/**
* {@inheritDoc}
*/
@Override
- protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
+ protected void setInterestedInRead(NioSession session, boolean isInterested)
+ throws Exception {
SelectionKey key = session.getSelectionKey();
int oldInterestOps = key.interestOps();
int newInterestOps = oldInterestOps;
-
+
if (isInterested) {
newInterestOps |= SelectionKey.OP_READ;
} else {
newInterestOps &= ~SelectionKey.OP_READ;
}
-
+
if (oldInterestOps != newInterestOps) {
key.interestOps(newInterestOps);
}
@@ -181,17 +244,18 @@
* {@inheritDoc}
*/
@Override
- protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
+ protected void setInterestedInWrite(NioSession session, boolean isInterested)
+ throws Exception {
SelectionKey key = session.getSelectionKey();
int oldInterestOps = key.interestOps();
int newInterestOps = oldInterestOps;
-
+
if (isInterested) {
newInterestOps |= SelectionKey.OP_WRITE;
} else {
newInterestOps &= ~SelectionKey.OP_WRITE;
}
-
+
if (oldInterestOps != newInterestOps) {
key.interestOps(newInterestOps);
}
@@ -203,11 +267,12 @@
}
@Override
- protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
+ protected int write(NioSession session, IoBuffer buf, int length)
+ throws Exception {
if (buf.remaining() <= length) {
return session.getChannel().write(buf.buf());
}
-
+
int oldLimit = buf.limit();
buf.limit(buf.position() + length);
try {
@@ -218,9 +283,11 @@
}
@Override
- protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
+ protected int transferFile(NioSession session, FileRegion region, int length)
+ throws Exception {
try {
- return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
+ return (int) region.getFileChannel().transferTo(
+ region.getPosition(), length, session.getChannel());
} catch (IOException e) {
// Check to see if the IOException is being thrown due to
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
@@ -228,27 +295,29 @@
if (message != null && message.contains("temporarily unavailable")) {
return 0;
}
-
+
throw e;
}
}
/**
- * An encapsulating iterator around the {@link Selector#selectedKeys()}
- * or the {@link Selector#keys()} iterator;
+ * An encapsulating iterator around the {@link Selector#selectedKeys()} or
+ * the {@link Selector#keys()} iterator;
*/
- protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
+ protected static class IoSessionIterator<NioSession> implements
+ Iterator<NioSession> {
private final Iterator<SelectionKey> iterator;
-
+
/**
- * Create this iterator as a wrapper on top of the selectionKey
- * Set.
- * @param keys The set of selected sessions
+ * Create this iterator as a wrapper on top of the selectionKey Set.
+ *
+ * @param keys
+ * The set of selected sessions
*/
private IoSessionIterator(Set<SelectionKey> keys) {
iterator = keys.iterator();
}
-
+
/**
* {@inheritDoc}
*/
@@ -261,7 +330,7 @@
*/
public NioSession next() {
SelectionKey key = iterator.next();
- NioSession nioSession = (NioSession) key.attachment();
+ NioSession nioSession = (NioSession) key.attachment();
return nioSession;
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java Wed Dec 9 15:38:14 2009
@@ -19,14 +19,15 @@
*/
package org.apache.mina.transport.socket.nio;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import junit.framework.TestCase;
-
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.future.ConnectFuture;
@@ -38,14 +39,17 @@
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.SessionState;
import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Ignore;
+import org.junit.Test;
/**
* Tests non regression on issue DIRMINA-632.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class PollingIoProcessorTest extends TestCase {
-
+public class PollingIoProcessorTest {
+ @Ignore
+ @Test
public void testExceptionOnWrite() throws Exception {
final Executor ex = Executors.newFixedThreadPool(1);
@@ -133,8 +137,7 @@
}
@Override
- protected SessionState getState(
- NioSession session) {
+ protected SessionState getState(NioSession session) {
return proc.getState(session);
}
@@ -156,6 +159,16 @@
"No Route To Host Test");
}
+ @Override
+ protected boolean isBrokenConnection() throws IOException {
+ return proc.isBrokenConnection();
+ }
+
+ @Override
+ protected void registerNewSelector() throws IOException {
+ proc.registerNewSelector();
+ }
+
});
connector.setHandler(new IoHandlerAdapter());
@@ -169,7 +182,8 @@
ConnectFuture future = connector.connect(addr);
future.awaitUninterruptibly();
IoSession session = future.getSession();
- WriteFuture wf = session.write(IoBuffer.allocate(1)).awaitUninterruptibly();
+ WriteFuture wf = session.write(IoBuffer.allocate(1))
+ .awaitUninterruptibly();
assertNotNull(wf.getException());
connector.dispose();
Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Wed Dec 9 15:38:14 2009
@@ -39,15 +39,15 @@
import org.apache.tomcat.jni.Status;
/**
- * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
- *
+ * The class in charge of processing socket level IO events for the
+ * {@link AprSocketConnector}
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
private static final int POLLSET_SIZE = 1024;
- private final Map<Long, AprSession> allSessions =
- new HashMap<Long, AprSession>(POLLSET_SIZE);
+ private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
private final Object wakeupLock = new Object();
private final long wakeupSocket;
@@ -57,14 +57,14 @@
private final long bufferPool; // memory pool
private final long pollset; // socket poller
private final long[] polledSockets = new long[POLLSET_SIZE << 1];
- private final List<AprSession> polledSessions =
- new CircularQueue<AprSession>(POLLSET_SIZE);
+ private final List<AprSession> polledSessions = new CircularQueue<AprSession>(POLLSET_SIZE);
/**
- * Create a new instance of {@link AprIoProcessor} with a given Exector for
+ * Create a new instance of {@link AprIoProcessor} with a given Executor for
* handling I/Os events.
*
- * @param executor the {@link Executor} for handling I/O events
+ * @param executor
+ * the {@link Executor} for handling I/O events
*/
public AprIoProcessor(Executor executor) {
super(executor);
@@ -74,8 +74,7 @@
bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
try {
- wakeupSocket = Socket.create(
- Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
+ wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@@ -87,25 +86,16 @@
boolean success = false;
long newPollset;
try {
- newPollset = Poll.create(
- POLLSET_SIZE,
- pool,
- Poll.APR_POLLSET_THREADSAFE,
- Long.MAX_VALUE);
+ newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
if (newPollset == 0) {
- newPollset = Poll.create(
- 62,
- pool,
- Poll.APR_POLLSET_THREADSAFE,
- Long.MAX_VALUE);
+ newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
}
pollset = newPollset;
if (pollset < 0) {
- if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
- throw new RuntimeIoException(
- "Thread-safe pollset is not supported in this platform.");
+ if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
+ throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
}
}
success = true;
@@ -144,7 +134,7 @@
/**
* {@inheritDoc}
*/
- @Override
+ @Override
protected int select(long timeout) throws Exception {
int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
if (rv <= 0) {
@@ -154,15 +144,15 @@
rv = Poll.maintain(pollset, polledSockets, true);
if (rv > 0) {
- for (int i = 0; i < rv; i ++) {
+ for (int i = 0; i < rv; i++) {
long socket = polledSockets[i];
AprSession session = allSessions.get(socket);
if (session == null) {
continue;
}
- int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
- (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
+ int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
+ | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
Poll.add(pollset, socket, flag);
}
@@ -176,7 +166,7 @@
if (!polledSessions.isEmpty()) {
polledSessions.clear();
}
- for (int i = 0; i < rv; i ++) {
+ for (int i = 0; i < rv; i++) {
long flag = polledSockets[i];
long socket = polledSockets[++i];
if (socket == wakeupSocket) {
@@ -201,7 +191,7 @@
}
}
- /**
+ /**
* {@inheritDoc}
*/
@Override
@@ -273,12 +263,12 @@
}
} finally {
ret = Socket.close(session.getDescriptor());
-
- // destroying the session because it won't be reused
+
+ // destroying the session because it won't be reused
// after this point
Socket.destroy(session.getDescriptor());
session.setDescriptor(0);
-
+
if (ret != Status.APR_SUCCESS) {
throwException(ret);
}
@@ -291,7 +281,7 @@
@Override
protected SessionState getState(AprSession session) {
long socket = session.getDescriptor();
-
+
if (socket != 0) {
return SessionState.OPENED;
} else if (allSessions.get(socket) != null) {
@@ -343,14 +333,15 @@
}
int rv = Poll.remove(pollset, session.getDescriptor());
+
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
- int flags = (isInterested ? Poll.APR_POLLIN : 0)
- | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
+ int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
rv = Poll.add(pollset, session.getDescriptor(), flags);
+
if (rv == Status.APR_SUCCESS) {
session.setInterestedInRead(isInterested);
} else {
@@ -368,14 +359,15 @@
}
int rv = Poll.remove(pollset, session.getDescriptor());
+
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
- int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
- | (isInterested ? Poll.APR_POLLOUT : 0);
+ int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
rv = Poll.add(pollset, session.getDescriptor(), flags);
+
if (rv == Status.APR_SUCCESS) {
session.setInterestedInWrite(isInterested);
} else {
@@ -392,11 +384,10 @@
int capacity = buffer.remaining();
// Using Socket.recv() directly causes memory leak. :-(
ByteBuffer b = Pool.alloc(bufferPool, capacity);
-
+
try {
- bytes = Socket.recvb(
- session.getDescriptor(), b, 0, capacity);
-
+ bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
+
if (bytes > 0) {
b.position(0);
b.limit(bytes);
@@ -413,7 +404,7 @@
} finally {
Pool.clear(bufferPool);
}
-
+
return bytes;
}
@@ -424,11 +415,9 @@
protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
int writtenBytes;
if (buf.isDirect()) {
- writtenBytes = Socket.sendb(
- session.getDescriptor(), buf.buf(), buf.position(), length);
+ writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
} else {
- writtenBytes = Socket.send(
- session.getDescriptor(), buf.array(), buf.position(), length);
+ writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
if (writtenBytes > 0) {
buf.skip(writtenBytes);
}
@@ -450,14 +439,26 @@
* {@inheritDoc}
*/
@Override
- protected int transferFile(AprSession session, FileRegion region, int length)
- throws Exception {
+ protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
throw new UnsupportedOperationException();
}
private void throwException(int code) throws IOException {
- throw new IOException(
- org.apache.tomcat.jni.Error.strerror(-code) +
- " (code: " + code + ")");
+ throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected void registerNewSelector() {
+ // Do nothing
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected boolean isBrokenConnection() throws IOException {
+ // No APR-specific check is performed here: the connection is always reported as broken.
+ return true;
}
}
\ No newline at end of file