You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/03/21 13:10:51 UTC

svn commit: r639601 - in /mina/trunk: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/transport/socket/nio/ transport-apr/src/main/java/org/apache/mina/transport/socket/apr/

Author: trustin
Date: Fri Mar 21 05:10:25 2008
New Revision: 639601

URL: http://svn.apache.org/viewvc?rev=639601&view=rev
Log:
Resolved issue: DIRMINA-514 (Session closing problem on Mac OS X)
* Applied Rob Butler's patch, which should hopefully fix this problem


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
    mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java Fri Mar 21 05:10:25 2008
@@ -48,10 +48,10 @@
      * It improves memory utilization and write throughput significantly.
      */
     private static final int WRITE_SPIN_COUNT = 256;
-    
+
     private static final Map<Class<?>, AtomicInteger> threadIds =
         new CopyOnWriteMap<Class<?>, AtomicInteger>();
-    
+
     private final Object lock = new Object();
     private final String threadName;
     private final Executor executor;
@@ -74,11 +74,11 @@
         if (executor == null) {
             throw new NullPointerException("executor");
         }
-        
+
         this.threadName = nextThreadName();
         this.executor = executor;
     }
-    
+
     private String nextThreadName() {
         Class<?> cls = getClass();
         AtomicInteger threadId = threadIds.get(cls);
@@ -89,18 +89,18 @@
         } else {
             newThreadId = threadId.incrementAndGet();
         }
-        
+
         return cls.getSimpleName() + '-' + newThreadId;
     }
-    
+
     public final boolean isDisposing() {
         return disposing;
     }
-    
+
     public final boolean isDisposed() {
         return disposed;
     }
-    
+
     public final void dispose() {
         if (disposed) {
             return;
@@ -112,11 +112,11 @@
                 startupWorker();
             }
         }
-        
+
         disposalFuture.awaitUninterruptibly();
         disposed = true;
     }
-    
+
     protected abstract void dispose0() throws Exception;
 
     /**
@@ -126,6 +126,7 @@
      * @throws Exception if some low level IO error occurs
      */
     protected abstract boolean select(int timeout) throws Exception;
+    protected abstract boolean isSelectorEmpty();
     protected abstract void wakeup();
     protected abstract Iterator<T> allSessions();
     protected abstract Iterator<T> selectedSessions();
@@ -248,7 +249,7 @@
                 break;
             }
 
-            
+
             if (addNow(session)) {
                 addedSessions ++;
             }
@@ -344,14 +345,14 @@
     private void clearWriteRequestQueue(T session) {
         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
-        
+
         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
-    
+
         if ((req = writeRequestQueue.poll(session)) != null) {
             Object m = req.getMessage();
             if (m instanceof IoBuffer) {
                 IoBuffer buf = (IoBuffer) req.getMessage();
-    
+
                 // The first unwritten empty buffer must be
                 // forwarded to the filter chain.
                 if (buf.hasRemaining()) {
@@ -363,13 +364,13 @@
             } else {
                 failedRequests.add(req);
             }
-    
+
             // Discard others.
             while ((req = writeRequestQueue.poll(session)) != null) {
                 failedRequests.add(req);
             }
         }
-        
+
         // Create an exception and notify.
         if (!failedRequests.isEmpty()) {
             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
@@ -501,12 +502,12 @@
             scheduleRemove(session);
             return false;
         }
-        
-        final boolean hasFragmentation = 
+
+        final boolean hasFragmentation =
             session.getTransportMetadata().hasFragmentation();
 
         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-        
+
         // Set limitation for the number of written bytes for read-write
         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
         // performance in my experience while not breaking fairness much.
@@ -526,7 +527,7 @@
                     }
                     session.setCurrentWriteRequest(req);
                 }
-    
+
                 int localWrittenBytes = 0;
                 Object message = req.getMessage();
                 if (message instanceof IoBuffer) {
@@ -537,7 +538,7 @@
                     localWrittenBytes = writeFile(
                             session, req, hasFragmentation,
                             maxWrittenBytes - writtenBytes);
-                    
+
                     // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                     // If there's still data to be written in the FileRegion, return 0 indicating that we need
                     // to pause until writing may resume.
@@ -549,9 +550,9 @@
                 } else {
                     throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
                 }
-                
+
                 writtenBytes += localWrittenBytes;
-    
+
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much.
                     setInterestedInWrite(session, true);
@@ -695,13 +696,13 @@
 
                     if (nSessions == 0) {
                         synchronized (lock) {
-                            if (newSessions.isEmpty()) {
+                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                 worker = null;
                                 break;
                             }
                         }
                     }
-                    
+
                     // Disconnect all sessions immediately if disposal has been
                     // requested so that we exit this loop eventually.
                     if (isDisposing()) {
@@ -720,7 +721,7 @@
                     }
                 }
             }
-            
+
             workerThread = null;
             if (isDisposing()) {
                 try {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Fri Mar 21 05:10:25 2008
@@ -47,7 +47,7 @@
             throw new RuntimeIoException("Failed to open a selector.", e);
         }
     }
-    
+
     private final Selector selector;
 
     public NioProcessor(Executor executor) {
@@ -66,6 +66,11 @@
     }
 
     @Override
+    protected boolean isSelectorEmpty() {
+        return selector.keys().isEmpty();
+    }
+
+    @Override
     protected void wakeup() {
         selector.wakeup();
     }
@@ -198,7 +203,7 @@
     protected static class IoSessionIterator implements Iterator<NioSession> {
         private final Iterator<SelectionKey> i;
         private IoSessionIterator(Set<SelectionKey> keys) {
-            i = keys.iterator(); 
+            i = keys.iterator();
         }
         public boolean hasNext() {
             return i.hasNext();

Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Fri Mar 21 05:10:25 2008
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.apr;
 
@@ -39,17 +39,17 @@
 
 /**
  * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
- * 
+ *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
 
 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
     private static final int POLLSET_SIZE = 1024;
-    
+
     private final Map<Long, AprSession> allSessions =
         new HashMap<Long, AprSession>(POLLSET_SIZE);
-    
+
     private final Object wakeupLock = new Object();
     private long wakeupSocket;
     private volatile boolean toBeWakenUp;
@@ -62,7 +62,7 @@
 
     public AprIoProcessor(Executor executor) {
         super(executor);
-        
+
         try {
             wakeupSocket = Socket.create(
                     Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
@@ -77,7 +77,7 @@
 
         // initialize a memory pool for APR functions
         bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
-        
+
         boolean success = false;
         long newPollset;
         try {
@@ -94,7 +94,7 @@
                         Poll.APR_POLLSET_THREADSAFE,
                         Long.MAX_VALUE);
             }
-            
+
             pollset = newPollset;
             if (pollset < 0) {
                 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
@@ -130,7 +130,7 @@
             if (rv != -120001) {
                 throwException(rv);
             }
-            
+
             rv = Poll.maintain(pollset, polledSockets, true);
             if (rv > 0) {
                 for (int i = 0; i < rv; i ++) {
@@ -139,7 +139,7 @@
                     if (session == null) {
                         continue;
                     }
-                    
+
                     int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
                                (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
 
@@ -169,23 +169,28 @@
                 if (session == null) {
                     continue;
                 }
-                
+
                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
-                
+
                 polledSessions.add(session);
             }
-            
+
             return !polledSessions.isEmpty();
         }
     }
 
     @Override
+    protected boolean isSelectorEmpty() {
+        return allSessions.isEmpty();
+    }
+
+    @Override
     protected void wakeup() {
         if (toBeWakenUp) {
             return;
         }
-        
+
         // Add a dummy socket to the pollset.
         synchronized (wakeupLock) {
             toBeWakenUp = true;
@@ -208,12 +213,12 @@
         long s = session.getDescriptor();
         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
         Socket.timeoutSet(s, 0);
-        
+
         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
         if (rv != Status.APR_SUCCESS) {
             throwException(rv);
         }
-        
+
         session.setInterestedInRead(true);
         allSessions.put(s, session);
     }
@@ -342,7 +347,7 @@
                 buf.skip(writtenBytes);
             }
         }
-        
+
         if (writtenBytes < 0) {
             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
                 writtenBytes = 0;