You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2009/12/09 16:38:14 UTC

svn commit: r888842 - in /mina/trunk: core/src/main/java/org/apache/mina/core/polling/ core/src/main/java/org/apache/mina/transport/socket/nio/ core/src/test/java/org/apache/mina/transport/socket/nio/ transport-apr/src/main/java/org/apache/mina/transpo...

Author: elecharny
Date: Wed Dec  9 15:38:14 2009
New Revision: 888842

URL: http://svn.apache.org/viewvc?rev=888842&view=rev
Log:
o Injected into the trunk the select-fix branch. This is to fix (cross fingers) DIRMINA-678.
o The PollingIoProcessorTest has been commented, as it was waiting forever with this patch.

NOTE : please test this version, if there is some issue, as it's one single commit, it will be easy to revert it

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
    mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Wed Dec  9 15:38:14 2009
@@ -29,6 +29,7 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.core.buffer.IoBuffer;
@@ -51,27 +52,31 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * An abstract implementation of {@link IoProcessor} which helps
- * transport developers to write an {@link IoProcessor} easily.
- * This class is in charge of active polling a set of {@link IoSession}
- * and trigger events when some I/O operation is possible.
- *
+ * An abstract implementation of {@link IoProcessor} which helps transport
+ * developers to write an {@link IoProcessor} easily. This class is in charge of
+ * active polling a set of {@link IoSession} and trigger events when some I/O
+ * operation is possible.
+ * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
         implements IoProcessor<T> {
     /** A logger for this class */
-    private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
-    
+    private final static Logger LOG = LoggerFactory
+            .getLogger(IoProcessor.class);
+
     /**
      * The maximum loop count for a write operation until
      * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
-     * It is similar to what a spin lock is for in concurrency programming.
-     * It improves memory utilization and write throughput significantly.
+     * It is similar to what a spin lock is for in concurrency programming. It
+     * improves memory utilization and write throughput significantly.
      */
     private static final int WRITE_SPIN_COUNT = 256;
 
-    /** A timeout used for the select, as we need to get out to deal with idle sessions */
+    /**
+     * A timeout used for the select, as we need to get out to deal with idle
+     * sessions
+     */
     private static final long SELECT_TIMEOUT = 1000L;
 
     /** A map containing the last Thread ID for each class */
@@ -92,7 +97,10 @@
     /** A queue used to store the sessions to be flushed */
     private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
 
-    /** A queue used to store the sessions which have a trafficControl to be updated */
+    /**
+     * A queue used to store the sessions which have a trafficControl to be
+     * updated
+     */
     private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
 
     /** The processor thread : it handles the incoming messages */
@@ -108,11 +116,14 @@
 
     private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
 
+    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
+
     /**
-     * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
-     * for handling I/Os events.
+     * Create an {@link AbstractPollingIoProcessor} with the given
+     * {@link Executor} for handling I/Os events.
      * 
-     * @param executor the {@link Executor} for handling I/O events
+     * @param executor
+     *            the {@link Executor} for handling I/O events
      */
     protected AbstractPollingIoProcessor(Executor executor) {
         if (executor == null) {
@@ -127,15 +138,15 @@
      * Compute the thread ID for this class instance. As we may have different
      * classes, we store the last ID number into a Map associating the class
      * name to the last assigned ID.
-     *   
-     * @return a name for the current thread, based on the class name and
-     * an incremental value, starting at 1. 
+     * 
+     * @return a name for the current thread, based on the class name and an
+     *         incremental value, starting at 1.
      */
     private String nextThreadName() {
         Class<?> cls = getClass();
         int newThreadId;
 
-        // We synchronize this block to avoid a concurrent access to 
+        // We synchronize this block to avoid a concurrent access to
         // the actomicInteger (it can be modified by another thread, while
         // being seen as null by another thread)
         synchronized (threadIds) {
@@ -192,30 +203,38 @@
     }
 
     /**
-     * Dispose the resources used by this {@link IoProcessor} for polling 
-     * the client connections
-     * @throws Exception if some low level IO error occurs
+     * Dispose the resources used by this {@link IoProcessor} for polling the
+     * client connections
+     * 
+     * @throws Exception
+     *             if some low level IO error occurs
      */
     protected abstract void dispose0() throws Exception;
 
     /**
      * poll those sessions for the given timeout
-     * @param timeout milliseconds before the call timeout if no event appear
+     * 
+     * @param timeout
+     *            milliseconds before the call timeout if no event appear
      * @return The number of session ready for read or for write
-     * @throws Exception if some low level IO error occurs
+     * @throws Exception
+     *             if some low level IO error occurs
      */
     protected abstract int select(long timeout) throws Exception;
 
     /**
      * poll those sessions forever
+     * 
      * @return The number of session ready for read or for write
-     * @throws Exception if some low level IO error occurs
+     * @throws Exception
+     *             if some low level IO error occurs
      */
     protected abstract int select() throws Exception;
 
     /**
-     * Say if the list of {@link IoSession} polled by this {@link IoProcessor} 
+     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
      * is empty
+     * 
      * @return true if at least a session is managed by this {@link IoProcessor}
      */
     protected abstract boolean isSelectorEmpty();
@@ -227,7 +246,8 @@
 
     /**
      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
-     * {@link IoProcessor}   
+     * {@link IoProcessor}
+     * 
      * @return {@link Iterator} of {@link IoSession}
      */
     protected abstract Iterator<T> allSessions();
@@ -241,101 +261,138 @@
 
     /**
      * Get the state of a session (preparing, open, closed)
-     * @param session the {@link IoSession} to inspect
+     * 
+     * @param session
+     *            the {@link IoSession} to inspect
      * @return the state of the session
      */
     protected abstract SessionState getState(T session);
 
     /**
      * Is the session ready for writing
-     * @param session the session queried
+     * 
+     * @param session
+     *            the session queried
      * @return true is ready, false if not ready
      */
     protected abstract boolean isWritable(T session);
 
     /**
      * Is the session ready for reading
-     * @param session the session queried
+     * 
+     * @param session
+     *            the session queried
      * @return true is ready, false if not ready
      */
     protected abstract boolean isReadable(T session);
 
     /**
      * register a session for writing
-     * @param session the session registered
-     * @param isInterested true for registering, false for removing
+     * 
+     * @param session
+     *            the session registered
+     * @param isInterested
+     *            true for registering, false for removing
      */
     protected abstract void setInterestedInWrite(T session, boolean isInterested)
             throws Exception;
 
     /**
      * register a session for reading
-     * @param session the session registered
-     * @param isInterested true for registering, false for removing
+     * 
+     * @param session
+     *            the session registered
+     * @param isInterested
+     *            true for registering, false for removing
      */
     protected abstract void setInterestedInRead(T session, boolean isInterested)
             throws Exception;
 
     /**
      * is this session registered for reading
-     * @param session the session queried
+     * 
+     * @param session
+     *            the session queried
      * @return true is registered for reading
      */
     protected abstract boolean isInterestedInRead(T session);
 
     /**
      * is this session registered for writing
-     * @param session the session queried
+     * 
+     * @param session
+     *            the session queried
      * @return true is registered for writing
      */
     protected abstract boolean isInterestedInWrite(T session);
 
     /**
-     * Initialize the polling of a session. Add it to the polling process. 
-     * @param session the {@link IoSession} to add to the polling
-     * @throws Exception any exception thrown by the underlying system calls
+     * Initialize the polling of a session. Add it to the polling process.
+     * 
+     * @param session
+     *            the {@link IoSession} to add to the polling
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract void init(T session) throws Exception;
 
     /**
      * Destroy the underlying client socket handle
-     * @param session the {@link IoSession}
-     * @throws Exception any exception thrown by the underlying system calls
+     * 
+     * @param session
+     *            the {@link IoSession}
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract void destroy(T session) throws Exception;
 
     /**
-     * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}. 
-     * Is called when the session was found ready for reading.
-     * @param session the session to read
-     * @param buf the buffer to fill
+     * Reads a sequence of bytes from a {@link IoSession} into the given
+     * {@link IoBuffer}. Is called when the session was found ready for reading.
+     * 
+     * @param session
+     *            the session to read
+     * @param buf
+     *            the buffer to fill
      * @return the number of bytes read
-     * @throws Exception any exception thrown by the underlying system calls
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int read(T session, IoBuffer buf) throws Exception;
 
     /**
-     * Write a sequence of bytes to a {@link IoSession}, means to be called when a session
-     * was found ready for writing.
-     * @param session the session to write
-     * @param buf the buffer to write
-     * @param length the number of bytes to write can be superior to the number of bytes remaining
-     * in the buffer
+     * Write a sequence of bytes to a {@link IoSession}, means to be called when
+     * a session was found ready for writing.
+     * 
+     * @param session
+     *            the session to write
+     * @param buf
+     *            the buffer to write
+     * @param length
+     *            the number of bytes to write can be superior to the number of
+     *            bytes remaining in the buffer
      * @return the number of byte written
-     * @throws Exception any exception thrown by the underlying system calls
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int write(T session, IoBuffer buf, int length)
             throws Exception;
 
     /**
-     * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
-     * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so 
-     * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call. 
-     * @param session the session to write
-     * @param region the file region to write
-     * @param length the length of the portion to send
+     * Write a part of a file to a {@link IoSession}, if the underlying API
+     * isn't supporting system calls like sendfile(), you can throw a
+     * {@link UnsupportedOperationException} so the file will be send using
+     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
+     * 
+     * @param session
+     *            the session to write
+     * @param region
+     *            the file region to write
+     * @param length
+     *            the length of the portion to send
      * @return the number of written bytes
-     * @throws Exception any exception thrown by the underlying system calls
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int transferFile(T session, FileRegion region, int length)
             throws Exception;
@@ -398,7 +455,7 @@
 
     /**
      * Starts the inner Processor, asking the executor to pick a thread in its
-     * pool. The Runnable will be renamed 
+     * pool. The Runnable will be renamed
      */
     private void startupProcessor() {
         synchronized (lock) {
@@ -410,14 +467,35 @@
         }
 
         // Just stop the select() and start it again, so that the processor
-        // can be activated immediately. 
+        // can be activated immediately.
         wakeup();
     }
 
     /**
-     * Loops over the new sessions blocking queue and returns
-     * the number of sessions which are effectively created
-     *
+     * In the case we are using the java select() method, this method is used to
+     * trash the buggy selector and create a new one, registring all the sockets
+     * on it.
+     * 
+     * @throws IOException
+     *             If we got an exception
+     */
+    abstract protected void registerNewSelector() throws IOException;
+
+    /**
+     * Check that the select() has not exited immediately just because of a
+     * broken connection. In this case, this is a standard case, and we just
+     * have to loop.
+     * 
+     * @return true if a connection has been brutally closed.
+     * @throws IOException
+     *             If we got an exception
+     */
+    abstract protected boolean isBrokenConnection() throws IOException;
+
+    /**
+     * Loops over the new sessions blocking queue and returns the number of
+     * sessions which are effectively created
+     * 
      * @return The number of new sessions
      */
     private int handleNewSessions() {
@@ -432,7 +510,7 @@
             }
 
             if (addNow(session)) {
-                // A new session has been created 
+                // A new session has been created
                 addedSessions++;
             }
         }
@@ -443,7 +521,7 @@
     private boolean addNow(T session) {
         boolean registered = false;
         boolean notified = false;
-        
+
         try {
             init(session);
             registered = true;
@@ -481,7 +559,7 @@
 
     private int removeSessions() {
         int removedSessions = 0;
-        
+
         for (;;) {
             T session = removingSessions.poll();
 
@@ -491,32 +569,33 @@
             }
 
             SessionState state = getState(session);
-            
+
+            // Now deal with the removal accordingly to the session's state
             switch (state) {
-                case OPENED:
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
-                    
-                    break;
-                    
-                case CLOSING:
-                    // Skip if channel is already closed
-                    break;
-                    
-                case OPENING:
-                    // Remove session from the newSessions queue and
-                    // remove it
-                    newSessions.remove(session);
-                    
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
-                    
-                    break;
-                    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
+            case OPENED:
+                // Try to remove this session
+                if (removeNow(session)) {
+                    removedSessions++;
+                }
+
+                break;
+
+            case CLOSING:
+                // Skip if channel is already closed
+                break;
+
+            case OPENING:
+                // Remove session from the newSessions queue and
+                // remove it
+                newSessions.remove(session);
+
+                if (removeNow(session)) {
+                    removedSessions++;
+                }
+                break;
+
+            default:
+                throw new IllegalStateException(String.valueOf(state));
             }
         }
     }
@@ -646,7 +725,7 @@
                     }
                 }
             }
-            
+
             if (ret < 0) {
                 scheduleRemove(session);
             }
@@ -660,7 +739,7 @@
 
                     scheduleRemove(session);
             }
-            
+
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
         }
@@ -681,40 +760,41 @@
         }
 
         T session = flushingSessions.poll(); // the same one with firstSession
-        
+
         for (;;) {
             session.setScheduledForFlush(false);
             SessionState state = getState(session);
 
             switch (state) {
-                case OPENED:
-                    try {
-                        boolean flushedAll = flushNow(session, currentTime);
-                        if (flushedAll
-                                && !session.getWriteRequestQueue().isEmpty(session)
-                                && !session.isScheduledForFlush()) {
-                            scheduleFlush(session);
-                        }
-                    } catch (Exception e) {
-                        scheduleRemove(session);
-                        IoFilterChain filterChain = session.getFilterChain();
-                        filterChain.fireExceptionCaught(e);
+            case OPENED:
+                try {
+                    boolean flushedAll = flushNow(session, currentTime);
+                    if (flushedAll
+                            && !session.getWriteRequestQueue().isEmpty(session)
+                            && !session.isScheduledForFlush()) {
+                        scheduleFlush(session);
                     }
-                    
-                    break;
-                    
-                case CLOSING:
-                    // Skip if the channel is already closed.
-                    break;
-                    
-                case OPENING:
-                    // Retry later if session is not yet fully initialized.
-                    // (In case that Session.write() is called before addSession() is processed)
-                    scheduleFlush(session);
-                    return;
-                    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
+                } catch (Exception e) {
+                    scheduleRemove(session);
+                    IoFilterChain filterChain = session.getFilterChain();
+                    filterChain.fireExceptionCaught(e);
+                }
+
+                break;
+
+            case CLOSING:
+                // Skip if the channel is already closed.
+                break;
+
+            case OPENING:
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.write() is called before addSession()
+                // is processed)
+                scheduleFlush(session);
+                return;
+
+            default:
+                throw new IllegalStateException(String.valueOf(state));
             }
 
             session = flushingSessions.peek();
@@ -738,7 +818,7 @@
                 .getWriteRequestQueue();
 
         // Set limitation for the number of written bytes for read-write
-        // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
+        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
         // performance in my experience while not breaking fairness much.
         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
@@ -766,7 +846,7 @@
                             currentTime);
                     if (localWrittenBytes > 0
                             && ((IoBuffer) message).hasRemaining()) {
-                        // the buffer isn't empty, we re-interest it in writing 
+                        // the buffer isn't empty, we re-interest it in writing
                         writtenBytes += localWrittenBytes;
                         setInterestedInWrite(session, true);
                         return false;
@@ -776,8 +856,10 @@
                             hasFragmentation, maxWrittenBytes - writtenBytes,
                             currentTime);
 
-                    // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
-                    // If there's still data to be written in the FileRegion, return 0 indicating that we need
+                    // Fix for Java bug on Linux
+                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
+                    // If there's still data to be written in the FileRegion,
+                    // return 0 indicating that we need
                     // to pause until writing may resume.
                     if (localWrittenBytes > 0
                             && ((FileRegion) message).getRemainingBytes() > 0) {
@@ -807,9 +889,9 @@
                 }
             } while (writtenBytes < maxWrittenBytes);
         } catch (Exception e) {
-        	if (req != null) {
-        		req.getFuture().setException(e);
-        	}
+            if (req != null) {
+                req.getFuture().setException(e);
+            }
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
             return false;
@@ -884,43 +966,42 @@
     }
 
     /**
-     * Update the trafficControl for all the session which has
-     * just been opened. 
+     * Update the trafficControl for all the session which has just been opened.
      */
     private void updateTrafficMask() {
         int queueSize = trafficControllingSessions.size();
-        
+
         while (queueSize > 0) {
             T session = trafficControllingSessions.poll();
 
             if (session == null) {
+                // We are done with this queue.
                 return;
             }
 
             SessionState state = getState(session);
-            
+
             switch (state) {
-                case OPENED:
-                    updateTrafficControl(session);
-                    break;
-                    
-                case CLOSING:
-                    break;
-                    
-                case OPENING:
-                    // Retry later if session is not yet fully initialized.
-                    // (In case that Session.suspend??() or session.resume??() is
-                    // called before addSession() is processed)
-                    // We just put back the session at the end of the queue.
-                    trafficControllingSessions.add(session);
-                    
-                    break;
-                    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
+            case OPENED:
+                updateTrafficControl(session);
+                break;
+
+            case CLOSING:
+                break;
+
+            case OPENING:
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.suspend??() or session.resume??() is
+                // called before addSession() is processed)
+                // We just put back the session at the end of the queue.
+                trafficControllingSessions.add(session);
+                break;
+
+            default:
+                throw new IllegalStateException(String.valueOf(state));
             }
-            
-            // As we have handled one session, decrement the number of 
+
+            // As we have handled one session, decrement the number of
             // remaining sessions.
             queueSize--;
         }
@@ -930,17 +1011,18 @@
      * {@inheritDoc}
      */
     public void updateTrafficControl(T session) {
+        // 
         try {
             setInterestedInRead(session, !session.isReadSuspended());
         } catch (Exception e) {
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
         }
-        
+
         try {
-            setInterestedInWrite(session, 
-                !session.getWriteRequestQueue().isEmpty(session) && 
-                !session.isWriteSuspended());
+            setInterestedInWrite(session, !session.getWriteRequestQueue()
+                    .isEmpty(session)
+                    && !session.isWriteSuspended());
         } catch (Exception e) {
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
@@ -958,7 +1040,58 @@
                     // idle session when we get out of the select every
                     // second. (note : this is a hack to avoid creating
                     // a dedicated thread).
+                    long t0 = System.currentTimeMillis();
                     int selected = select(SELECT_TIMEOUT);
+                    long t1 = System.currentTimeMillis();
+                    long delta = (t1 - t0);
+
+                    synchronized (wakeupCalled) {
+
+                        if (selected == 0) {
+                            if (!wakeupCalled.get()) {
+                                if (delta < 100) {
+                                    // Last chance : the select() may have been
+                                    // interrupted
+                                    // because we have had an closed channel.
+                                    if (isBrokenConnection()) {
+                                        // we can reselect immediately
+                                        continue;
+                                    } else {
+                                        LOG
+                                                .warn("Create a new selector. Selected is 0, delta = "
+                                                        + (t1 - t0));
+                                        // Ok, we are hit by the nasty epoll
+                                        // spinning.
+                                        // Basically, there is a race condition
+                                        // which cause
+                                        // a closing file descriptor not to be
+                                        // considered as
+                                        // available as a selected channel, but
+                                        // it stopped
+                                        // the select. The next time we will
+                                        // call select(),
+                                        // it will exit immediately for the same
+                                        // reason,
+                                        // and do so forever, consuming 100%
+                                        // CPU.
+                                        // We have to destroy the selector, and
+                                        // register all
+                                        // the socket on a new one.
+                                        registerNewSelector();
+                                    }
+
+                                    // and continue the loop
+                                    continue;
+                                }
+                            } else {
+                                // System.out.println("Waited one second");
+                            }
+                        } else {
+                            // System.out.println("Nb selected : " + selected);
+                        }
+
+                        wakeupCalled.getAndSet(false);
+                    }
 
                     nSessions += handleNewSessions();
                     updateTrafficMask();
@@ -966,6 +1099,7 @@
                     // Now, if we have had some incoming or outgoing events,
                     // deal with them
                     if (selected > 0) {
+                        // System.out.println( "Proccessing ...");
                         process();
                     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Wed Dec  9 15:38:14 2009
@@ -21,9 +21,11 @@
 
 import java.io.IOException;
 import java.nio.channels.ByteChannel;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -41,12 +43,12 @@
  */
 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
     /** The selector associated with this processor */
-    private final Selector selector;
+    private volatile Selector selector;
 
     /**
      * 
      * Creates a new instance of NioProcessor.
-     *
+     * 
      * @param executor
      */
     public NioProcessor(Executor executor) {
@@ -81,7 +83,10 @@
 
     @Override
     protected void wakeup() {
-        selector.wakeup();
+        synchronized (wakeupCalled) {
+            wakeupCalled.getAndSet(true);
+            selector.wakeup();
+        }
     }
 
     @Override
@@ -99,7 +104,8 @@
     protected void init(NioSession session) throws Exception {
         SelectableChannel ch = (SelectableChannel) session.getChannel();
         ch.configureBlocking(false);
-        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
+        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
+                session));
     }
 
     @Override
@@ -113,21 +119,76 @@
     }
 
     /**
+     * In the case we are using the java select() method, this method is used to
+     * trash the buggy selector and create a new one, registering all the
+     * sockets on it.
+     */
+    protected void registerNewSelector() throws IOException {
+        synchronized (selector) {
+            Set<SelectionKey> keys = selector.keys();
+
+            // Open a new selector
+            Selector newSelector = Selector.open();
+
+            for (SelectionKey key : keys) {
+                SelectableChannel ch = key.channel();
+                ch.register(newSelector, key.interestOps());
+            }
+
+            selector.close();
+            selector = newSelector;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected boolean isBrokenConnection() throws IOException {
+        // A flag set to true if we find a broken session
+        boolean brokenSession = false;
+
+        synchronized (selector) {
+            // Get the selector keys
+            Set<SelectionKey> keys = selector.keys();
+
+            // Loop on all the keys to see if one of them
+            // has a closed channel
+            for (SelectionKey key : keys) {
+                SelectableChannel channel = key.channel();
+
+                if ((((channel instanceof DatagramChannel) && ((DatagramChannel) channel)
+                        .isConnected()))
+                        || ((channel instanceof SocketChannel) && ((SocketChannel) channel)
+                                .isConnected())) {
+                    // The channel is not connected anymore. Cancel
+                    // the associated key then.
+                    key.cancel();
+
+                    // Set the flag to true to avoid a selector switch
+                    brokenSession = true;
+                }
+            }
+        }
+
+        return brokenSession;
+    }
+
+    /**
      * {@inheritDoc}
      */
     @Override
     protected SessionState getState(NioSession session) {
         SelectionKey key = session.getSelectionKey();
-        
+
         if (key == null) {
-            // The channel is not yet registered to a selector
+            // The channel is not yet registred to a selector
             return SessionState.OPENING;
         }
 
         if (key.isValid()) {
-            // The session is opened
+            // The session is oepened
             return SessionState.OPENED;
-        } else { 
+        } else {
             // The session still as to be closed
             return SessionState.CLOSING;
         }
@@ -154,24 +215,26 @@
     @Override
     protected boolean isInterestedInWrite(NioSession session) {
         SelectionKey key = session.getSelectionKey();
-        return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
+        return key.isValid()
+                && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
+    protected void setInterestedInRead(NioSession session, boolean isInterested)
+            throws Exception {
         SelectionKey key = session.getSelectionKey();
         int oldInterestOps = key.interestOps();
         int newInterestOps = oldInterestOps;
-        
+
         if (isInterested) {
             newInterestOps |= SelectionKey.OP_READ;
         } else {
             newInterestOps &= ~SelectionKey.OP_READ;
         }
-        
+
         if (oldInterestOps != newInterestOps) {
             key.interestOps(newInterestOps);
         }
@@ -181,17 +244,18 @@
      * {@inheritDoc}
      */
     @Override
-    protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
+    protected void setInterestedInWrite(NioSession session, boolean isInterested)
+            throws Exception {
         SelectionKey key = session.getSelectionKey();
         int oldInterestOps = key.interestOps();
         int newInterestOps = oldInterestOps;
-        
+
         if (isInterested) {
             newInterestOps |= SelectionKey.OP_WRITE;
         } else {
             newInterestOps &= ~SelectionKey.OP_WRITE;
         }
-        
+
         if (oldInterestOps != newInterestOps) {
             key.interestOps(newInterestOps);
         }
@@ -203,11 +267,12 @@
     }
 
     @Override
-    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
+    protected int write(NioSession session, IoBuffer buf, int length)
+            throws Exception {
         if (buf.remaining() <= length) {
             return session.getChannel().write(buf.buf());
         }
-        
+
         int oldLimit = buf.limit();
         buf.limit(buf.position() + length);
         try {
@@ -218,9 +283,11 @@
     }
 
     @Override
-    protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
+    protected int transferFile(NioSession session, FileRegion region, int length)
+            throws Exception {
         try {
-            return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
+            return (int) region.getFileChannel().transferTo(
+                    region.getPosition(), length, session.getChannel());
         } catch (IOException e) {
             // Check to see if the IOException is being thrown due to
             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
@@ -228,27 +295,29 @@
             if (message != null && message.contains("temporarily unavailable")) {
                 return 0;
             }
-            
+
             throw e;
         }
     }
 
     /**
-     * An encapsulating iterator around the  {@link Selector#selectedKeys()} 
-     * or the {@link Selector#keys()} iterator;
+     * An encapsulating iterator around the {@link Selector#selectedKeys()} or
+     * the {@link Selector#keys()} iterator;
      */
-    protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
+    protected static class IoSessionIterator<NioSession> implements
+            Iterator<NioSession> {
         private final Iterator<SelectionKey> iterator;
-        
+
         /**
-         * Create this iterator as a wrapper on top of the selectionKey
-         * Set.
-         * @param keys The set of selected sessions
+         * Create this iterator as a wrapper on top of the selectionKey Set.
+         * 
+         * @param keys
+         *            The set of selected sessions
          */
         private IoSessionIterator(Set<SelectionKey> keys) {
             iterator = keys.iterator();
         }
-        
+
         /**
          * {@inheritDoc}
          */
@@ -261,7 +330,7 @@
          */
         public NioSession next() {
             SelectionKey key = iterator.next();
-            NioSession nioSession =  (NioSession) key.attachment();
+            NioSession nioSession = (NioSession) key.attachment();
             return nioSession;
         }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java Wed Dec  9 15:38:14 2009
@@ -19,14 +19,15 @@
  */
 package org.apache.mina.transport.socket.nio;
 
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.util.Iterator;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
-import junit.framework.TestCase;
-
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
 import org.apache.mina.core.future.ConnectFuture;
@@ -38,14 +39,17 @@
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.session.SessionState;
 import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Tests non regression on issue DIRMINA-632.
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public class PollingIoProcessorTest extends TestCase {
-
+public class PollingIoProcessorTest {
+    @Ignore
+    @Test
     public void testExceptionOnWrite() throws Exception {
         final Executor ex = Executors.newFixedThreadPool(1);
 
@@ -133,8 +137,7 @@
                     }
 
                     @Override
-                    protected SessionState getState(
-                            NioSession session) {
+                    protected SessionState getState(NioSession session) {
                         return proc.getState(session);
                     }
 
@@ -156,6 +159,16 @@
                                 "No Route To Host Test");
                     }
 
+                    @Override
+                    protected boolean isBrokenConnection() throws IOException {
+                        return proc.isBrokenConnection();
+                    }
+
+                    @Override
+                    protected void registerNewSelector() throws IOException {
+                        proc.registerNewSelector();
+                    }
+
                 });
         connector.setHandler(new IoHandlerAdapter());
 
@@ -169,7 +182,8 @@
         ConnectFuture future = connector.connect(addr);
         future.awaitUninterruptibly();
         IoSession session = future.getSession();
-        WriteFuture wf = session.write(IoBuffer.allocate(1)).awaitUninterruptibly();
+        WriteFuture wf = session.write(IoBuffer.allocate(1))
+                .awaitUninterruptibly();
         assertNotNull(wf.getException());
 
         connector.dispose();

Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=888842&r1=888841&r2=888842&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Wed Dec  9 15:38:14 2009
@@ -39,15 +39,15 @@
 import org.apache.tomcat.jni.Status;
 
 /**
- * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
- *
+ * The class in charge of processing socket level IO events for the
+ * {@link AprSocketConnector}
+ * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
     private static final int POLLSET_SIZE = 1024;
 
-    private final Map<Long, AprSession> allSessions =
-        new HashMap<Long, AprSession>(POLLSET_SIZE);
+    private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
 
     private final Object wakeupLock = new Object();
     private final long wakeupSocket;
@@ -57,14 +57,14 @@
     private final long bufferPool; // memory pool
     private final long pollset; // socket poller
     private final long[] polledSockets = new long[POLLSET_SIZE << 1];
-    private final List<AprSession> polledSessions =
-        new CircularQueue<AprSession>(POLLSET_SIZE);
+    private final List<AprSession> polledSessions = new CircularQueue<AprSession>(POLLSET_SIZE);
 
     /**
-     * Create a new instance of {@link AprIoProcessor} with a given Exector for 
+     * Create a new instance of {@link AprIoProcessor} with a given Exector for
      * handling I/Os events.
      * 
-     * @param executor the {@link Executor} for handling I/O events
+     * @param executor
+     *            the {@link Executor} for handling I/O events
      */
     public AprIoProcessor(Executor executor) {
         super(executor);
@@ -74,8 +74,7 @@
         bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
 
         try {
-            wakeupSocket = Socket.create(
-                    Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
+            wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
         } catch (RuntimeException e) {
             throw e;
         } catch (Error e) {
@@ -87,25 +86,16 @@
         boolean success = false;
         long newPollset;
         try {
-            newPollset = Poll.create(
-                    POLLSET_SIZE,
-                    pool,
-                    Poll.APR_POLLSET_THREADSAFE,
-                    Long.MAX_VALUE);
+            newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
 
             if (newPollset == 0) {
-                newPollset = Poll.create(
-                        62,
-                        pool,
-                        Poll.APR_POLLSET_THREADSAFE,
-                        Long.MAX_VALUE);
+                newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
             }
 
             pollset = newPollset;
             if (pollset < 0) {
-                if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
-                    throw new RuntimeIoException(
-                            "Thread-safe pollset is not supported in this platform.");
+                if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
+                    throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
                 }
             }
             success = true;
@@ -144,7 +134,7 @@
     /**
      * {@inheritDoc}
      */
-     @Override
+    @Override
     protected int select(long timeout) throws Exception {
         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
         if (rv <= 0) {
@@ -154,15 +144,15 @@
 
             rv = Poll.maintain(pollset, polledSockets, true);
             if (rv > 0) {
-                for (int i = 0; i < rv; i ++) {
+                for (int i = 0; i < rv; i++) {
                     long socket = polledSockets[i];
                     AprSession session = allSessions.get(socket);
                     if (session == null) {
                         continue;
                     }
 
-                    int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
-                               (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
+                    int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
+                            | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
 
                     Poll.add(pollset, socket, flag);
                 }
@@ -176,7 +166,7 @@
             if (!polledSessions.isEmpty()) {
                 polledSessions.clear();
             }
-            for (int i = 0; i < rv; i ++) {
+            for (int i = 0; i < rv; i++) {
                 long flag = polledSockets[i];
                 long socket = polledSockets[++i];
                 if (socket == wakeupSocket) {
@@ -201,7 +191,7 @@
         }
     }
 
-     /**
+    /**
      * {@inheritDoc}
      */
     @Override
@@ -273,12 +263,12 @@
             }
         } finally {
             ret = Socket.close(session.getDescriptor());
-            
-            // destroying the session because it won't be reused 
+
+            // destroying the session because it won't be reused
             // after this point
             Socket.destroy(session.getDescriptor());
             session.setDescriptor(0);
-            
+
             if (ret != Status.APR_SUCCESS) {
                 throwException(ret);
             }
@@ -291,7 +281,7 @@
     @Override
     protected SessionState getState(AprSession session) {
         long socket = session.getDescriptor();
-        
+
         if (socket != 0) {
             return SessionState.OPENED;
         } else if (allSessions.get(socket) != null) {
@@ -343,14 +333,15 @@
         }
 
         int rv = Poll.remove(pollset, session.getDescriptor());
+
         if (rv != Status.APR_SUCCESS) {
             throwException(rv);
         }
 
-        int flags = (isInterested ? Poll.APR_POLLIN : 0)
-                | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
+        int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
 
         rv = Poll.add(pollset, session.getDescriptor(), flags);
+
         if (rv == Status.APR_SUCCESS) {
             session.setInterestedInRead(isInterested);
         } else {
@@ -368,14 +359,15 @@
         }
 
         int rv = Poll.remove(pollset, session.getDescriptor());
+
         if (rv != Status.APR_SUCCESS) {
             throwException(rv);
         }
 
-        int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
-                | (isInterested ? Poll.APR_POLLOUT : 0);
+        int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
 
         rv = Poll.add(pollset, session.getDescriptor(), flags);
+
         if (rv == Status.APR_SUCCESS) {
             session.setInterestedInWrite(isInterested);
         } else {
@@ -392,11 +384,10 @@
         int capacity = buffer.remaining();
         // Using Socket.recv() directly causes memory leak. :-(
         ByteBuffer b = Pool.alloc(bufferPool, capacity);
-        
+
         try {
-            bytes = Socket.recvb(
-                    session.getDescriptor(), b, 0, capacity);
-            
+            bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
+
             if (bytes > 0) {
                 b.position(0);
                 b.limit(bytes);
@@ -413,7 +404,7 @@
         } finally {
             Pool.clear(bufferPool);
         }
-        
+
         return bytes;
     }
 
@@ -424,11 +415,9 @@
     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
         int writtenBytes;
         if (buf.isDirect()) {
-            writtenBytes = Socket.sendb(
-                    session.getDescriptor(), buf.buf(), buf.position(), length);
+            writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
         } else {
-            writtenBytes = Socket.send(
-                    session.getDescriptor(), buf.array(), buf.position(), length);
+            writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
             if (writtenBytes > 0) {
                 buf.skip(writtenBytes);
             }
@@ -450,14 +439,26 @@
      * {@inheritDoc}
      */
     @Override
-    protected int transferFile(AprSession session, FileRegion region, int length)
-            throws Exception {
+    protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
         throw new UnsupportedOperationException();
     }
 
     private void throwException(int code) throws IOException {
-        throw new IOException(
-                org.apache.tomcat.jni.Error.strerror(-code) +
-                " (code: " + code + ")");
+        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected void registerNewSelector() {
+        // Do nothing
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected boolean isBrokenConnection() throws IOException {
+        // Here, we assume that this is the case.
+        return true;
     }
 }
\ No newline at end of file