You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/17 08:43:05 UTC

svn commit: r576271 - in /mina/trunk: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/transport/socket/nio/ core/src/main/java/org/apache/mina/transport/vmpipe/ core/src/main/java/org/apache/mina/util/ core/src/test/java/o...

Author: trustin
Date: Sun Sep 16 23:43:04 2007
New Revision: 576271

URL: http://svn.apache.org/viewvc?rev=576271&view=rev
Log:
* Removed IoSession parameter from IoFilterChain because it's redundant
* Removed WriteRequest parameter from IoProcessor.flush because it's unnecessary
* Multi-thread support for both DatagramAcceptor and DatagramConnector (need to test on various platforms)
* Removed IoSessionRecycler because all datagram sessions are now connected datagram sockets.


Removed:
    mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringIoSessionRecycler.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
    mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
    mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
    mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java
    mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 23:43:04 2007
@@ -34,8 +34,6 @@
  *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
- *
- * TODO Provide abstraction for bind, unbind and connect (+ cancellation)
  */
 public abstract class AbstractIoProcessor implements IoProcessor {
 
@@ -96,7 +94,7 @@
         startupWorker();
     }
 
-    public void flush(IoSession session, WriteRequest writeRequest) {
+    public void flush(IoSession session) {
         boolean needsWakeup = flushingSessions.isEmpty();
         if (scheduleFlush((AbstractIoSession) session) && needsWakeup) {
             wakeup();
@@ -160,7 +158,7 @@
                 if (notified) {
                     // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
                     // and call ConnectFuture.setException().
-                    session.getFilterChain().fireExceptionCaught(session, e);
+                    session.getFilterChain().fireExceptionCaught(e);
                     scheduleRemove(session);
                     wakeup();
                 } else {
@@ -193,7 +191,7 @@
                     doRemove(session);
                     removedSessions ++;
                 } catch (Exception e) {
-                    session.getFilterChain().fireExceptionCaught(session, e);
+                    session.getFilterChain().fireExceptionCaught(e);
                 } finally {
                     clearWriteRequestQueue(session);
                     ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
@@ -256,10 +254,8 @@
                 buf.flip();
             }
 
-            session.increaseReadBytes(readBytes);
-
             if (readBytes > 0) {
-                session.getFilterChain().fireMessageReceived(session, buf);
+                session.getFilterChain().fireMessageReceived(buf);
                 buf = null;
 
                 if (session.getTransportMetadata().hasFragmentation()) {
@@ -282,9 +278,9 @@
             }
         } catch (IOException e) {
             scheduleRemove(session);
-            session.getFilterChain().fireExceptionCaught(session, e);
+            session.getFilterChain().fireExceptionCaught(e);
         } catch (Throwable e) {
-            session.getFilterChain().fireExceptionCaught(session, e);
+            session.getFilterChain().fireExceptionCaught(e);
         }
     }
 
@@ -298,7 +294,7 @@
                 try {
                     notifyIdleness(session, currentTime);
                 } catch (Exception e) {
-                    session.getFilterChain().fireExceptionCaught(session, e);
+                    session.getFilterChain().fireExceptionCaught(e);
                 }
             }
         }
@@ -327,7 +323,7 @@
         if (idleTime > 0 && lastIoTime != 0
                 && currentTime - lastIoTime >= idleTime) {
             session.increaseIdleCount(status);
-            session.getFilterChain().fireSessionIdle(session, status);
+            session.getFilterChain().fireSessionIdle(status);
         }
     }
 
@@ -335,8 +331,7 @@
                                     long currentTime, long writeTimeout, long lastIoTime) throws Exception {
         if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout
                 && (interestOps(session) & SelectionKey.OP_WRITE) != 0) {
-            session.getFilterChain().fireExceptionCaught(session,
-                    new WriteTimeoutException());
+            session.getFilterChain().fireExceptionCaught(new WriteTimeoutException());
         }
     }
 
@@ -369,7 +364,7 @@
                     }
                 } catch (Exception e) {
                     scheduleRemove(session);
-                    session.getFilterChain().fireExceptionCaught(session, e);
+                    session.getFilterChain().fireExceptionCaught(e);
                 }
                 break;
             case CLOSED:
@@ -400,7 +395,7 @@
                 if (buf.hasRemaining()) {
                     req.getFuture().setWritten(false);
                 } else {
-                    session.getFilterChain().fireMessageSent(session, req);
+                    session.getFilterChain().fireMessageSent(req);
                 }
             } else {
                 req.getFuture().setWritten(false);
@@ -424,66 +419,58 @@
         int maxWrittenBytes = session.getConfig().getMaxReadBufferSize();
         int writtenBytes = 0;
 
-        try {
-            do {
-                // Check for pending writes.
-                WriteRequest req = writeRequestQueue.peek();
-
-                if (req == null) {
-                    break;
-                }
-
-                Object message = req.getMessage();
-                if (message instanceof FileRegion) {
-                    FileRegion region = (FileRegion) message;
-
-                    if (region.getCount() <= 0) {
-                        // File has been sent, remove from queue
-                        writeRequestQueue.poll();
-                        session.increaseWrittenMessages();
-                        session.getFilterChain().fireMessageSent(session, req);
-                        continue;
-                    }
+        do {
+            // Check for pending writes.
+            WriteRequest req = writeRequestQueue.peek();
 
-                    if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
-                        long localWrittenBytes = transferFile(session, region);
-                        region.setPosition(region.getPosition() + localWrittenBytes);
-                        writtenBytes += localWrittenBytes;
-                    }
+            if (req == null) {
+                break;
+            }
 
-                    if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
-                        // Kernel buffer is full or wrote too much.
-                        interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
-                        return false;
-                    }
+            Object message = req.getMessage();
+            if (message instanceof FileRegion) {
+                FileRegion region = (FileRegion) message;
+
+                if (region.getCount() <= 0) {
+                    // File has been sent, remove from queue
+                    writeRequestQueue.poll();
+                    session.getFilterChain().fireMessageSent(req);
+                    continue;
+                }
 
-                } else {
-                    ByteBuffer buf = (ByteBuffer) message;
-                    if (buf.remaining() == 0) {
-                        // Buffer has been completely sent, remove request form queue
-                        writeRequestQueue.poll();
-
-                        session.increaseWrittenMessages();
-
-                        buf.reset();
-                        session.getFilterChain().fireMessageSent(session, req);
-                        continue;
-                    }
+                if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+                    long localWrittenBytes = transferFile(session, region);
+                    region.setPosition(region.getPosition() + localWrittenBytes);
+                    writtenBytes += localWrittenBytes;
+                }
 
-                    if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
-                        writtenBytes += write(session, buf);
-                    }
+                if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much.
+                    interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+                    return false;
+                }
 
-                    if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
-                        // Kernel buffer is full or wrote too much.
-                        interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
-                        return false;
-                    }
+            } else {
+                ByteBuffer buf = (ByteBuffer) message;
+                if (buf.remaining() == 0) {
+                    // Buffer has been completely sent, remove request form queue
+                    writeRequestQueue.poll();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(req);
+                    continue;
                 }
-            } while (writtenBytes < maxWrittenBytes);
-        } finally {
-            session.increaseWrittenBytes(writtenBytes);
-        }
+
+                if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+                    writtenBytes += write(session, buf);
+                }
+
+                if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much.
+                    interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+                    return false;
+                }
+            }
+        } while (writtenBytes < maxWrittenBytes);
 
         return true;
     }
@@ -511,7 +498,7 @@
                 try {
                     interestOps(session, ops & mask);
                 } catch (Exception e) {
-                    session.getFilterChain().fireExceptionCaught(session, e);
+                    session.getFilterChain().fireExceptionCaught(e);
                 }
                 break;
             case CLOSED:

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 16 23:43:04 2007
@@ -146,7 +146,7 @@
             }
         }
 
-        getFilterChain().fireFilterClose(this);
+        getFilterChain().fireFilterClose();
         return closeFuture;
     }
 
@@ -188,7 +188,7 @@
 
         WriteFuture future = new DefaultWriteFuture(this);
         getFilterChain().fireFilterWrite(
-                this, new DefaultWriteRequest(message, future, remoteAddress));
+                new DefaultWriteRequest(message, future, remoteAddress));
 
         if (message instanceof File) {
             final FileChannel finalChannel = channel;

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Sun Sep 16 23:43:04 2007
@@ -39,14 +39,14 @@
     /**
      * A session attribute that stores a {@link ConnectFuture} related with
      * the {@link IoSession}.  {@link DefaultIoFilterChain} clears this
-     * attribute and notifies the future when {@link #fireSessionOpened(IoSession)}
-     * or {@link #fireExceptionCaught(IoSession, Throwable)} is invoked
+     * attribute and notifies the future when {@link #fireSessionOpened()}
+     * or {@link #fireExceptionCaught(Throwable)} is invoked
      */
     public static final String CONNECT_FUTURE = DefaultIoFilterChain.class
             .getName()
             + ".connectFuture";
 
-    private final IoSession session;
+    private final AbstractIoSession session;
 
     private final Map<String, Entry> name2entry = new HashMap<String, Entry>();
 
@@ -271,7 +271,7 @@
         }
     }
 
-    public void fireSessionCreated(IoSession session) {
+    public void fireSessionCreated() {
         Entry head = this.head;
         callNextSessionCreated(head, session);
     }
@@ -280,11 +280,11 @@
         try {
             entry.getFilter().sessionCreated(entry.getNextFilter(), session);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireSessionOpened(IoSession session) {
+    public void fireSessionOpened() {
         Entry head = this.head;
         callNextSessionOpened(head, session);
     }
@@ -293,16 +293,16 @@
         try {
             entry.getFilter().sessionOpened(entry.getNextFilter(), session);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireSessionClosed(IoSession session) {
+    public void fireSessionClosed() {
         // Update future.
         try {
             session.getCloseFuture().setClosed();
         } catch (Throwable t) {
-            fireExceptionCaught(session, t);
+            fireExceptionCaught(t);
         }
 
         // And start the chain.
@@ -315,11 +315,11 @@
             entry.getFilter().sessionClosed(entry.getNextFilter(), session);
 
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireSessionIdle(IoSession session, IdleStatus status) {
+    public void fireSessionIdle(IdleStatus status) {
         Entry head = this.head;
         callNextSessionIdle(head, session, status);
     }
@@ -330,11 +330,15 @@
             entry.getFilter().sessionIdle(entry.getNextFilter(), session,
                     status);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireMessageReceived(IoSession session, Object message) {
+    public void fireMessageReceived(Object message) {
+        if (message instanceof ByteBuffer) {
+            session.increaseReadBytes(((ByteBuffer) message).remaining());
+        }
+
         Entry head = this.head;
         callNextMessageReceived(head, session, message);
     }
@@ -345,15 +349,27 @@
             entry.getFilter().messageReceived(entry.getNextFilter(), session,
                     message);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireMessageSent(IoSession session, WriteRequest request) {
+    public void fireMessageSent(WriteRequest request) {
+        Object message = request.getMessage();
+        if (message instanceof ByteBuffer) {
+            ByteBuffer b = (ByteBuffer) message;
+            if (b.hasRemaining()) {
+                session.increaseWrittenBytes(((ByteBuffer) message).remaining());
+            } else {
+                session.increaseWrittenMessages();
+            }
+        } else {
+            session.increaseWrittenMessages();
+        }
+
         try {
             request.getFuture().setWritten(true);
         } catch (Throwable t) {
-            fireExceptionCaught(session, t);
+            fireExceptionCaught(t);
         }
 
         Entry head = this.head;
@@ -366,11 +382,11 @@
             entry.getFilter().messageSent(entry.getNextFilter(), session,
                     writeRequest);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireExceptionCaught(IoSession session, Throwable cause) {
+    public void fireExceptionCaught(Throwable cause) {
         // Notify the related ConnectFuture
         // if the session is created from SocketConnector.
         ConnectFuture future = (ConnectFuture) session
@@ -396,7 +412,7 @@
         }
     }
 
-    public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
+    public void fireFilterWrite(WriteRequest writeRequest) {
         Entry tail = this.tail;
         callPreviousFilterWrite(tail, session, writeRequest);
     }
@@ -408,11 +424,11 @@
                     writeRequest);
         } catch (Throwable e) {
             writeRequest.getFuture().setWritten(false);
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
-    public void fireFilterClose(IoSession session) {
+    public void fireFilterClose() {
         Entry tail = this.tail;
         callPreviousFilterClose(tail, session);
     }
@@ -421,7 +437,7 @@
         try {
             entry.getFilter().filterClose(entry.getNextFilter(), session);
         } catch (Throwable e) {
-            fireExceptionCaught(session, e);
+            fireExceptionCaught(e);
         }
     }
 
@@ -579,7 +595,7 @@
 
             s.getWriteRequestQueue().add(writeRequest);
             if (s.getTrafficMask().isWritable()) {
-                s.getProcessor().flush(s, writeRequest);
+                s.getProcessor().flush(s);
             }
         }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Sun Sep 16 23:43:04 2007
@@ -103,8 +103,9 @@
             public void add(IoSession session) {
             }
 
-            public void flush(IoSession session, WriteRequest writeRequest) {
-                getFilterChain().fireMessageSent(session, writeRequest);
+            public void flush(IoSession session) {
+                getFilterChain().fireMessageSent(
+                        ((DummySession) session).getWriteRequestQueue().poll());
             }
 
             public void remove(IoSession session) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Sun Sep 16 23:43:04 2007
@@ -102,7 +102,7 @@
         if (idleTime > 0 && lastIoTime != 0
                 && currentTime - lastIoTime >= idleTime) {
             session.increaseIdleCount(status);
-            session.getFilterChain().fireSessionIdle(session, status);
+            session.getFilterChain().fireSessionIdle(status);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java Sun Sep 16 23:43:04 2007
@@ -180,63 +180,63 @@
      * call this method at all.  Please use this method only when you implement a new transport
      * or fire a virtual event.
      */
-    public void fireSessionCreated(IoSession session);
+    public void fireSessionCreated();
 
     /**
      * Fires a {@link IoHandler#sessionOpened(IoSession)} event.  Most users don't need to call
      * this method at all.  Please use this method only when you implement a new transport or
      * fire a virtual event.
      */
-    public void fireSessionOpened(IoSession session);
+    public void fireSessionOpened();
 
     /**
      * Fires a {@link IoHandler#sessionClosed(IoSession)} event.  Most users don't need to call
      * this method at all.  Please use this method only when you implement a new transport or
      * fire a virtual event.
      */
-    public void fireSessionClosed(IoSession session);
+    public void fireSessionClosed();
 
     /**
      * Fires a {@link IoHandler#sessionIdle(IoSession, IdleStatus)} event.  Most users don't
      * need to call this method at all.  Please use this method only when you implement a new
      * transport or fire a virtual event.
      */
-    public void fireSessionIdle(IoSession session, IdleStatus status);
+    public void fireSessionIdle(IdleStatus status);
 
     /**
-     * Fires a {@link #fireMessageReceived(IoSession, Object)} event.  Most users don't need to
+     * Fires a {@link #fireMessageReceived(Object)} event.  Most users don't need to
      * call this method at all.  Please use this method only when you implement a new transport
      * or fire a virtual event.
      */
-    public void fireMessageReceived(IoSession session, Object message);
+    public void fireMessageReceived(Object message);
 
     /**
      * Fires a {@link IoHandler#sessionOpened(IoSession)} event.  Most users don't need to call
      * this method at all.  Please use this method only when you implement a new transport or
      * fire a virtual event.
      */
-    public void fireMessageSent(IoSession session, WriteRequest request);
+    public void fireMessageSent(WriteRequest request);
 
     /**
      * Fires a {@link IoHandler#exceptionCaught(IoSession, Throwable)} event.  Most users don't
      * need to call this method at all.  Please use this method only when you implement a new
      * transport or fire a virtual event.
      */
-    public void fireExceptionCaught(IoSession session, Throwable cause);
+    public void fireExceptionCaught(Throwable cause);
 
     /**
      * Fires a {@link IoSession#write(Object)} event.  Most users don't need to call this
      * method at all.  Please use this method only when you implement a new transport or fire a
      * virtual event.
      */
-    public void fireFilterWrite(IoSession session, WriteRequest writeRequest);
+    public void fireFilterWrite(WriteRequest writeRequest);
 
     /**
      * Fires a {@link IoSession#close()} event.  Most users don't need to call this method at
      * all.  Please use this method only when you implement a new transport or fire a virtual
      * event.
      */
-    public void fireFilterClose(IoSession session);
+    public void fireFilterClose();
 
     /**
      * Represents a name-filter pair that an {@link IoFilterChain} contains.

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java Sun Sep 16 23:43:04 2007
@@ -39,10 +39,8 @@
     /**
      * Flushes the internal write request queue of the specified
      * {@code session}.
-     *
-     * @param writeRequest the write request added right now
      */
-    void flush(IoSession session, WriteRequest writeRequest);
+    void flush(IoSession session);
 
     /**
      * Controls the traffic of the specified {@code session} as specified

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Sun Sep 16 23:43:04 2007
@@ -157,8 +157,8 @@
         }
 
         // Fire session events.
-        session.getFilterChain().fireSessionCreated(session);
-        session.getFilterChain().fireSessionOpened(session);
+        session.getFilterChain().fireSessionCreated();
+        session.getFilterChain().fireSessionOpened();
 
         // Fire listener events.
         synchronized (listeners) {
@@ -183,7 +183,7 @@
         }
 
         // Fire session events.
-        session.getFilterChain().fireSessionClosed(session);
+        session.getFilterChain().fireSessionClosed();
 
         // Fire listener events.
         try {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Sun Sep 16 23:43:04 2007
@@ -28,21 +28,22 @@
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoAcceptor;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.ExpiringIoSessionRecycler;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoProcessor;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
 import org.apache.mina.util.NamePreservingRunnable;
 import org.apache.mina.util.NewThreadExecutor;
 
@@ -54,27 +55,24 @@
  */
 public class DatagramAcceptor extends AbstractIoAcceptor implements
         IoAcceptor {
-    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringIoSessionRecycler();
 
     private static volatile int nextId = 0;
 
-    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
-
     private final Executor executor;
 
     private final int id = nextId++;
 
     private final Selector selector;
-
-    private final IoProcessor processor = new DatagramAcceptorProcessor();
+    
+    private final DatagramConnector connector;
 
     private DatagramChannel channel;
 
     private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
 
     private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
-
-    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+    
+    private final ConcurrentMap<SocketAddress, Object> cache = new ConcurrentHashMap<SocketAddress, Object>();
 
     private Worker worker;
 
@@ -89,10 +87,14 @@
      * Creates a new instance.
      */
     public DatagramAcceptor(Executor executor) {
-        super(new DefaultDatagramSessionConfig());
+        this(Runtime.getRuntime().availableProcessors() + 1, executor);
+    }
 
-        // The default reuseAddress should be 'true' for an accepted socket.
-        getSessionConfig().setReuseAddress(true);
+    /**
+     * Creates a new instance.
+     */
+    public DatagramAcceptor(int processorCount, Executor executor) {
+        super(new DefaultDatagramSessionConfig());
 
         try {
             this.selector = Selector.open();
@@ -101,6 +103,11 @@
         }
 
         this.executor = executor;
+        this.connector = new DatagramConnector(
+                this, "DatagramAcceptor-" + id, processorCount, executor);
+
+        // The default reuseAddress should be 'true' for an accepted socket.
+        getSessionConfig().setReuseAddress(true);
     }
 
     @Override
@@ -127,12 +134,28 @@
         return (InetSocketAddress) super.getLocalAddress();
     }
 
+    // This method is added to work around a problem with
+    // the bean property access mechanism.
+
+    /**
+     * @see org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
+     * @param localAddress the local address
+     */
+    public void setLocalAddress(InetSocketAddress localAddress) {
+        super.setLocalAddress(localAddress);
+    }
+    
+    @Override
+    protected IoServiceListenerSupport getListeners() {
+        return super.getListeners();
+    }
+
     @Override
     protected void doBind() throws IOException {
         RegistrationRequest request = new RegistrationRequest();
 
-        startupWorker();
         registerQueue.add(request);
+        startupWorker();
         selector.wakeup();
 
         synchronized (request) {
@@ -156,8 +179,8 @@
     protected void doUnbind() {
         CancellationRequest request = new CancellationRequest();
 
-        startupWorker();
         cancelQueue.add(request);
+        startupWorker();
         selector.wakeup();
 
         synchronized (request) {
@@ -184,99 +207,28 @@
                 throw new IllegalStateException(
                         "Can't create a session from a unbound service.");
             }
-
-            return newSessionWithoutLock(remoteAddress);
-        }
-    }
-
-    private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
-        Selector selector = this.selector;
-        DatagramChannel ch = this.channel;
-        SelectionKey key = ch.keyFor(selector);
-
-        IoSession session;
-        IoSessionRecycler sessionRecycler = getSessionRecycler();
-        synchronized (sessionRecycler) {
-            session = sessionRecycler.recycle(getLocalAddress(), remoteAddress);
-            if (session != null) {
-                return session;
-            }
-
-            // If a new session needs to be created.
-            DatagramSessionImpl datagramSession = new DatagramSessionImpl(
-                    this, ch, getHandler(),
-                    (InetSocketAddress) remoteAddress);
-            datagramSession.setSelectionKey(key);
-
-            getSessionRecycler().put(datagramSession);
-            session = datagramSession;
-        }
-
-        try {
-            this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
-            getListeners().fireSessionCreated(session);
-        } catch (Throwable t) {
-            ExceptionMonitor.getInstance().exceptionCaught(t);
-        }
-
-        return session;
-    }
-
-    /**
-     * Returns the {@link IoSessionRecycler} for this service.
-     */
-    public IoSessionRecycler getSessionRecycler() {
-        return sessionRecycler;
-    }
-
-    /**
-     * Sets the {@link IoSessionRecycler} for this service.
-     *
-     * @param sessionRecycler <tt>null</tt> to use the default recycler
-     */
-    public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
-        synchronized (bindLock) {
-            if (isBound()) {
-                throw new IllegalStateException(
-                        "sessionRecycler can't be set while the acceptor is bound.");
-            }
-
-            if (sessionRecycler == null) {
-                sessionRecycler = DEFAULT_RECYCLER;
-            }
-            this.sessionRecycler = sessionRecycler;
-        }
-    }
-
-    @Override
-    protected IoServiceListenerSupport getListeners() {
-        return super.getListeners();
-    }
-
-    IoProcessor getProcessor() {
-        return processor;
-    }
-
-    private class DatagramAcceptorProcessor implements IoProcessor {
-
-        public void add(IoSession session) {
-        }
-
-        public void flush(IoSession session, WriteRequest writeRequest) {
-            if (scheduleFlush((DatagramSessionImpl) session)) {
-                Selector selector = DatagramAcceptor.this.selector;
-                if (selector != null) {
-                    selector.wakeup();
-                }
+            
+            Object data;
+            synchronized (cache) { 
+                data = cache.get(remoteAddress);
+                if (data == null) {
+                    ConnectFuture future = connector.connect(remoteAddress, getLocalAddress());
+                    cache.put(remoteAddress, future);
+                    future.awaitUninterruptibly();
+                    return future.getSession();
+                }
+            }
+            
+            if (data instanceof ConnectFuture) {
+                ConnectFuture future = (ConnectFuture) data;
+                future.awaitUninterruptibly();
+                return future.getSession();
+            } else if (data instanceof IoSession) {
+                return ((IoSession) data);
+            } else {
+                throw new IllegalStateException();
             }
         }
-
-        public void remove(IoSession session) {
-            getListeners().fireSessionDestroyed(session);
-        }
-
-        public void updateTrafficMask(IoSession session) {
-        }
     }
 
     private synchronized void startupWorker() {
@@ -286,15 +238,6 @@
         }
     }
 
-    private boolean scheduleFlush(DatagramSessionImpl session) {
-        if (session.setScheduledForFlush(true)) {
-            flushingSessions.add(session);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     private class Worker implements Runnable {
         public void run() {
             Thread.currentThread().setName("DatagramAcceptor-" + id);
@@ -309,7 +252,6 @@
                         processReadySessions(selector.selectedKeys());
                     }
 
-                    flushSessions();
                     cancelKeys();
 
                     if (selector.keys().isEmpty()) {
@@ -346,12 +288,6 @@
                 if (key.isReadable()) {
                     readSession(ch);
                 }
-
-                if (key.isWritable()) {
-                    for (IoSession session : getManagedSessions()) {
-                        scheduleFlush((DatagramSessionImpl) session);
-                    }
-                }
             } catch (Throwable t) {
                 ExceptionMonitor.getInstance().exceptionCaught(t);
             }
@@ -359,104 +295,55 @@
     }
 
     private void readSession(DatagramChannel channel) throws Exception {
-        ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
-                .getReceiveBufferSize());
+        final ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+                .getReadBufferSize());
 
-        SocketAddress remoteAddress = channel.receive(readBuf.buf());
+        final SocketAddress remoteAddress = channel.receive(readBuf.buf());
         if (remoteAddress != null) {
-            DatagramSessionImpl session = (DatagramSessionImpl) newSessionWithoutLock(remoteAddress);
-
             readBuf.flip();
-
-            ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
-            newBuf.put(readBuf);
-            newBuf.flip();
-
-            session.increaseReadBytes(newBuf.remaining());
-            session.getFilterChain().fireMessageReceived(session, newBuf);
-        }
-    }
-
-    private void flushSessions() {
-        for (; ;) {
-            DatagramSessionImpl session = flushingSessions.poll();
-            if (session == null) {
-                break;
-            }
-
-            session.setScheduledForFlush(false);
-
-            try {
-                boolean flushedAll = flush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
-                    scheduleFlush(session);
-                }
-            } catch (IOException e) {
-                session.getFilterChain().fireExceptionCaught(session, e);
-            }
-        }
-    }
-
-    private boolean flush(DatagramSessionImpl session) throws IOException {
-        // Clear OP_WRITE
-        SelectionKey key = session.getSelectionKey();
-        if (key == null) {
-            scheduleFlush(session);
-            return false;
-        }
-        if (!key.isValid()) {
-            return false;
-        }
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-        DatagramChannel ch = session.getChannel();
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
-        int writtenBytes = 0;
-        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
-        try {
-            for (; ;) {
-                WriteRequest req = writeRequestQueue.peek();
-                if (req == null) {
-                    break;
-                }
-
-                ByteBuffer buf = (ByteBuffer) req.getMessage();
-                if (buf.remaining() == 0) {
-                    // pop and fire event
-                    writeRequestQueue.poll();
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(session, req);
-                    continue;
-                }
-
-                SocketAddress destination = req.getDestination();
-                if (destination == null) {
-                    destination = session.getRemoteAddress();
-                }
-
-                int localWrittenBytes = ch.send(buf.buf(), destination);
-                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
-                    // Kernel buffer is full or wrote too much
-                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    return false;
-                } else {
-                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-                    // pop and fire event
-                    writeRequestQueue.poll();
-                    writtenBytes += localWrittenBytes;
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(session, req);
-                }
+            Object data;
+            ConnectFuture future = null;
+            synchronized (cache) {
+                data = cache.get(remoteAddress);
+                if (data == null) {
+                    future = connector.connect(remoteAddress, getLocalAddress());
+                    cache.put(remoteAddress, future);
+                }
+            }
+            
+            if (data == null) {
+                future.addListener(new IoFutureListener() {
+                    public void operationComplete(IoFuture future) {
+                        ConnectFuture f = (ConnectFuture) future;
+                        if (f.getException() == null) {
+                            IoSession s = f.getSession();
+                            cache.put(remoteAddress, s);
+                            s.getCloseFuture().addListener(new IoFutureListener() {
+                                public void operationComplete(IoFuture future) {
+                                    cache.remove(remoteAddress);
+                                }
+                            });
+                            s.getFilterChain().fireMessageReceived(readBuf);
+                        } else {
+                            ExceptionMonitor.getInstance().exceptionCaught(f.getException());
+                        }
+                    }
+                });
+            } else if (data instanceof ConnectFuture) {
+                future = (ConnectFuture) data;
+                future.addListener(new IoFutureListener() {
+                    public void operationComplete(IoFuture future) {
+                        ConnectFuture f = (ConnectFuture) future;
+                        if (f.getException() == null) {
+                            IoSession s = f.getSession();
+                            s.getFilterChain().fireMessageReceived(readBuf);
+                        }
+                    }
+                });
+            } else if (data instanceof IoSession) {
+                ((IoSession) data).getFilterChain().fireMessageReceived(readBuf);
             }
-        } finally {
-            session.increaseWrittenBytes(writtenBytes);
         }
-
-        return true;
     }
 
     private void registerNew() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Sun Sep 16 23:43:04 2007
@@ -28,9 +28,13 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
 import org.apache.mina.common.DefaultIoFilterChain;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoProcessor;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.util.NewThreadExecutor;
@@ -45,8 +49,11 @@
     private static volatile int nextId = 0;
 
     private final int id = nextId++;
+    private final IoService parent;
+    private final int processorCount;
+    private final NIOProcessor[] ioProcessors;
 
-    private final IoProcessor processor;
+    private int processorDistributor = 0;
 
     /**
      * Creates a new instance.
@@ -59,9 +66,51 @@
      * Creates a new instance.
      */
     public DatagramConnector(Executor executor) {
+        this(Runtime.getRuntime().availableProcessors() + 1, executor);
+    }
+
+    /**
+     * Creates a new instance.
+     */
+    public DatagramConnector(int processorCount, Executor executor) {
+        this(null, null, processorCount, executor);
+    }
+    
+    DatagramConnector(
+            IoService parent, String threadNamePrefix, int processorCount, Executor executor) {
         super(new DefaultDatagramSessionConfig());
+        
+        // DatagramAcceptor can use DatagramConnector as a child.
+        if (parent == null) {
+            parent = this;
+        }
+        if (threadNamePrefix == null) {
+            threadNamePrefix = "DatagramConnector-" + id;
+        }
+        this.parent = parent;
+        
+        if (processorCount < 1) {
+            throw new IllegalArgumentException(
+                    "Must have at least one processor");
+        }
+
+        this.processorCount = processorCount;
+        ioProcessors = new NIOProcessor[processorCount];
+
+        // create an array of NIOProcessors that will be used for
+        // handling sessions.
+        for (int i = 0; i < processorCount; i++) {
+            ioProcessors[i] = new NIOProcessor(
+                    threadNamePrefix + '.' + i, executor);
+        }
+    }
+
+    private NIOProcessor nextProcessor() {
+        if (this.processorDistributor == Integer.MAX_VALUE) {
+            this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
+        }
 
-        processor = new NIOProcessor("DatagramConnector-" + id, executor);
+        return ioProcessors[processorDistributor++ % processorCount];
     }
 
     public TransportMetadata getTransportMetadata() {
@@ -69,8 +118,66 @@
     }
 
     @Override
+    protected IoServiceListenerSupport getListeners() {
+        if (parent == this) {
+            return super.getListeners();
+        } else {
+            return ((DatagramAcceptor) parent).getListeners();
+        }
+    }
+
+    @Override
     public DatagramSessionConfig getSessionConfig() {
-        return (DatagramSessionConfig) super.getSessionConfig();
+        if (parent == this) {
+            return (DatagramSessionConfig) super.getSessionConfig();
+        } else {
+            return (DatagramSessionConfig) parent.getSessionConfig();
+        }
+    }
+
+    @Override
+    public DefaultIoFilterChainBuilder getFilterChain() {
+        if (parent == this) {
+            return super.getFilterChain();
+        } else {
+            return parent.getFilterChain();
+        }
+    }
+
+    @Override
+    public IoFilterChainBuilder getFilterChainBuilder() {
+        if (parent == this) {
+            return super.getFilterChainBuilder();
+        } else {
+            return parent.getFilterChainBuilder();
+        }
+    }
+
+    @Override
+    public void setFilterChainBuilder(IoFilterChainBuilder builder) {
+        if (parent == this) {
+            super.setFilterChainBuilder(builder);
+        } else {
+            parent.setFilterChainBuilder(builder);
+        }
+    }
+
+    @Override
+    public IoHandler getHandler() {
+        if (parent == this) {
+            return super.getHandler();
+        } else {
+            return parent.getHandler();
+        }
+    }
+
+    @Override
+    public void setHandler(IoHandler handler) {
+        if (parent == this) {
+            super.setHandler(handler);
+        } else {
+            parent.setHandler(handler);
+        }
     }
 
     @Override
@@ -81,13 +188,17 @@
         IoSession session = null;
         try {
             ch = DatagramChannel.open();
-            session = new DatagramSessionImpl(this, ch, getHandler());
-
+            ch.socket().setReuseAddress(getSessionConfig().isReuseAddress());
+            ch.socket().setReuseAddress(true);
+            ch.socket().setBroadcast(getSessionConfig().isBroadcast());
+            
             if (localAddress != null) {
                 ch.socket().bind(localAddress);
             }
             ch.connect(remoteAddress);
 
+            NIOProcessor processor = nextProcessor();
+            session = new DatagramSessionImpl(parent, ch, processor);
             ConnectFuture future = new DefaultConnectFuture();
             // DefaultIoFilterChain will notify the connect future.
             session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
@@ -107,9 +218,5 @@
                 }
             }
         }
-    }
-
-    IoProcessor getProcessor() {
-        return processor;
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Sun Sep 16 23:43:04 2007
@@ -26,10 +26,8 @@
 import java.nio.channels.SelectionKey;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.DefaultTransportMetadata;
-import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoProcessor;
@@ -66,37 +64,20 @@
     private final InetSocketAddress localAddress;
 
     private final InetSocketAddress remoteAddress;
+    
+    private final IoProcessor processor;
 
     private SelectionKey key;
 
     /**
-     * Creates a new acceptor instance.
-     */
-    DatagramSessionImpl(
-            DatagramAcceptor service,
-            DatagramChannel ch, IoHandler defaultHandler,
-            InetSocketAddress remoteAddress) {
-        this.service = service;
-        this.ch = ch;
-        this.handler = defaultHandler;
-        this.remoteAddress = remoteAddress;
-
-        // We didn't set the localAddress by calling getLocalSocketAddress() to avoid
-        // the case that getLocalSocketAddress() returns IPv6 address while
-        // serviceAddress represents the same address in IPv4.
-        this.localAddress = service.getLocalAddress();
-
-        this.config.setAll(service.getSessionConfig());
-    }
-
-    /**
      * Creates a new connector instance.
      */
-    DatagramSessionImpl(DatagramConnector service,
-                        DatagramChannel ch, IoHandler defaultHandler) {
+    DatagramSessionImpl(IoService service,
+                        DatagramChannel ch, IoProcessor processor) {
         this.service = service;
         this.ch = ch;
-        this.handler = defaultHandler;
+        this.handler = service.getHandler();
+        this.processor = processor;
         this.remoteAddress = (InetSocketAddress) ch.socket()
                 .getRemoteSocketAddress();
         this.localAddress = (InetSocketAddress) ch.socket()
@@ -111,11 +92,7 @@
 
     @Override
     protected IoProcessor getProcessor() {
-        if (service instanceof DatagramAcceptor) {
-            return ((DatagramAcceptor) service).getProcessor();
-        } else {
-            return ((DatagramConnector) service).getProcessor();
-        }
+        return processor;
     }
 
     public DatagramSessionConfig getConfig() {
@@ -150,16 +127,6 @@
     }
 
     @Override
-    public CloseFuture close() {
-        if (service instanceof IoAcceptor) {
-            ((DatagramAcceptor) service).getSessionRecycler()
-                    .remove(this);
-        }
-        CloseFuture answer = super.close();
-        return answer;
-    }
-
-    @Override
     public WriteFuture write(Object message, SocketAddress destination) {
         if (!this.config.isBroadcast()) {
             throw new IllegalStateException("Non-broadcast session");
@@ -197,7 +164,6 @@
                     ch.socket().setReceiveBufferSize(receiveBufferSize);
                     // Re-retrieve the effective receive buffer size.
                     receiveBufferSize = ch.socket().getReceiveBufferSize();
-                    DatagramSessionImpl.this.config.setReadBufferSize(receiveBufferSize);
                 } catch (SocketException e) {
                     throw new RuntimeIOException(e);
                 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Sep 16 23:43:04 2007
@@ -271,7 +271,6 @@
         }
 
         if (request.exception != null) {
-            // TODO better exception handling.
             if (request.exception instanceof RuntimeException) {
                 throw (RuntimeException) request.exception;
             } else if (request.exception instanceof IOException) {
@@ -472,8 +471,7 @@
 
                 // and notify.
                 getListeners().fireServiceActivated();
-            } catch (Throwable e) // TODO better exception handling.
-            {
+            } catch (Throwable e) {
                 req.exception = e;
             } finally {
                 synchronized (req) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Sun Sep 16 23:43:04 2007
@@ -86,14 +86,7 @@
                     if (!s.getTrafficMask().isReadable()) {
                         s.receivedMessageQueue.add(data);
                     } else {
-                        int byteCount = 1;
-                        if (data instanceof ByteBuffer) {
-                            byteCount = ((ByteBuffer) data).remaining();
-                        }
-
-                        s.increaseReadBytes(byteCount);
-
-                        super.fireMessageReceived(s, data);
+                        super.fireMessageReceived(data);
                     }
                 } finally {
                     s.getLock().unlock();
@@ -102,22 +95,22 @@
                 s.receivedMessageQueue.add(data);
             }
         } else if (type == IoEventType.WRITE) {
-            super.fireFilterWrite(session, (WriteRequest) data);
+            super.fireFilterWrite((WriteRequest) data);
         } else if (type == IoEventType.MESSAGE_SENT) {
-            super.fireMessageSent(session, (WriteRequest) data);
+            super.fireMessageSent((WriteRequest) data);
         } else if (type == IoEventType.EXCEPTION_CAUGHT) {
-            super.fireExceptionCaught(session, (Throwable) data);
+            super.fireExceptionCaught((Throwable) data);
         } else if (type == IoEventType.SESSION_IDLE) {
-            super.fireSessionIdle(session, (IdleStatus) data);
+            super.fireSessionIdle((IdleStatus) data);
         } else if (type == IoEventType.SESSION_OPENED) {
-            super.fireSessionOpened(session);
+            super.fireSessionOpened();
             sessionOpened = true;
         } else if (type == IoEventType.SESSION_CREATED) {
-            super.fireSessionCreated(session);
+            super.fireSessionCreated();
         } else if (type == IoEventType.SESSION_CLOSED) {
-            super.fireSessionClosed(session);
+            super.fireSessionClosed();
         } else if (type == IoEventType.CLOSE) {
-            super.fireFilterClose(session);
+            super.fireFilterClose();
         }
     }
 
@@ -127,52 +120,52 @@
     }
 
     @Override
-    public void fireFilterClose(IoSession session) {
-        pushEvent(new IoEvent(IoEventType.CLOSE, session, null));
+    public void fireFilterClose() {
+        pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
     }
 
     @Override
-    public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
-        pushEvent(new IoEvent(IoEventType.WRITE, session, writeRequest));
+    public void fireFilterWrite(WriteRequest writeRequest) {
+        pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
     }
 
     @Override
-    public void fireExceptionCaught(IoSession session, Throwable cause) {
-        pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, session, cause));
+    public void fireExceptionCaught(Throwable cause) {
+        pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
     }
 
     @Override
-    public void fireMessageSent(IoSession session, WriteRequest request) {
-        pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, request));
+    public void fireMessageSent(WriteRequest request) {
+        pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
     }
 
     @Override
-    public void fireSessionClosed(IoSession session) {
-        pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, session, null));
+    public void fireSessionClosed() {
+        pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
     }
 
     @Override
-    public void fireSessionCreated(IoSession session) {
-        pushEvent(new IoEvent(IoEventType.SESSION_CREATED, session, null));
+    public void fireSessionCreated() {
+        pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
     }
 
     @Override
-    public void fireSessionIdle(IoSession session, IdleStatus status) {
-        pushEvent(new IoEvent(IoEventType.SESSION_IDLE, session, status));
+    public void fireSessionIdle(IdleStatus status) {
+        pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
     }
 
     @Override
-    public void fireSessionOpened(IoSession session) {
-        pushEvent(new IoEvent(IoEventType.SESSION_OPENED, session, null));
+    public void fireSessionOpened() {
+        pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
     }
 
     @Override
-    public void fireMessageReceived(IoSession session, Object message) {
-        pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, session, message));
+    public void fireMessageReceived(Object message) {
+        pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
     }
 
     private class VmPipeIoProcessor implements IoProcessor {
-        public void flush(IoSession session, WriteRequest writeRequest) {
+        public void flush(IoSession session) {
             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
             Queue<WriteRequest> queue = s.getWriteRequestQueue();
             if (queue.isEmpty()) {
@@ -183,13 +176,11 @@
                     try {
                         WriteRequest req;
                         while ((req = queue.poll()) != null) {
-                            int byteCount = 0;
                             Object message = req.getMessage();
                             Object messageCopy = message;
                             if (message instanceof ByteBuffer) {
                                 ByteBuffer rb = (ByteBuffer) message;
                                 rb.mark();
-                                byteCount = rb.remaining();
                                 ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
                                 wb.put(rb);
                                 wb.flip();
@@ -197,12 +188,9 @@
                                 messageCopy = wb;
                             }
 
-                            s.increaseWrittenBytes(byteCount);
-                            s.increaseWrittenMessages();
-
                             s.getRemoteSession().getFilterChain().fireMessageReceived(
-                                    s.getRemoteSession(), messageCopy);
-                            s.getFilterChain().fireMessageSent(s, req);
+                                    messageCopy);
+                            s.getFilterChain().fireMessageSent(req);
                         }
                     } finally {
                         s.getLock().unlock();
@@ -240,15 +228,12 @@
                 List<Object> data = new ArrayList<Object>();
                 s.receivedMessageQueue.drainTo(data);
                 for (Object aData : data) {
-                    // TODO Optimize inefficient data transfer.
-                    // Data will be returned to pendingDataQueue
-                    // if getTraffic().isReadable() is false.
-                    VmPipeFilterChain.this.fireMessageReceived(s, aData);
+                    VmPipeFilterChain.this.fireMessageReceived(aData);
                 }
             }
 
             if (s.getTrafficMask().isWritable()) {
-                flush(s, null); // The second parameter is unused.
+                flush(s); // flush() no longer takes a second parameter.
             }
         }
     }

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java Sun Sep 16 23:43:04 2007
@@ -196,13 +196,13 @@
     }
 
     private void run(String expectedResult) {
-        chain.fireSessionCreated(session);
-        chain.fireSessionOpened(session);
-        chain.fireMessageReceived(session, new Object());
-        chain.fireFilterWrite(session, new DefaultWriteRequest(new Object()));
-        chain.fireSessionIdle(session, IdleStatus.READER_IDLE);
-        chain.fireExceptionCaught(session, new Exception());
-        chain.fireSessionClosed(session);
+        chain.fireSessionCreated();
+        chain.fireSessionOpened();
+        chain.fireMessageReceived(new Object());
+        chain.fireFilterWrite(new DefaultWriteRequest(new Object()));
+        chain.fireSessionIdle(IdleStatus.READER_IDLE);
+        chain.fireExceptionCaught(new Exception());
+        chain.fireSessionClosed();
 
         result = formatResult(result);
         expectedResult = formatResult(expectedResult);

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Sun Sep 16 23:43:04 2007
@@ -93,8 +93,8 @@
 
         // Test creation
         listener.sessionCreated(session);
-        chain.fireSessionCreated(session);
-        chain.fireSessionOpened(session);
+        chain.fireSessionCreated();
+        chain.fireSessionOpened();
 
         listenerControl.replay();
         chainControl.replay();
@@ -111,7 +111,7 @@
         // Test destruction & other side effects
         listenerControl.reset();
         chainControl.reset();
-        chain.fireSessionClosed(session);
+        chain.fireSessionClosed();
         listener.sessionDestroyed(session);
 
         listenerControl.replay();
@@ -154,8 +154,8 @@
         // Activate a service and create a session.
         listener.serviceActivated(acceptor);
         listener.sessionCreated(session);
-        chain.fireSessionCreated(session);
-        chain.fireSessionOpened(session);
+        chain.fireSessionCreated();
+        chain.fireSessionOpened();
 
         listenerControl.replay();
         chainControl.replay();
@@ -174,8 +174,8 @@
         listener.serviceDeactivated(acceptor);
         acceptorControl.expectAndReturn(acceptor.isDisconnectOnUnbind(), true);
         listener.sessionDestroyed(session);
-        chain.fireFilterClose(session);
-        chain.fireSessionClosed(session);
+        chain.fireFilterClose();
+        chain.fireSessionClosed();
 
         listenerControl.replay();
         acceptorControl.replay();
@@ -229,8 +229,8 @@
         // Creating a session should activate a service automatically.
         listener.serviceActivated(connector);
         listener.sessionCreated(session);
-        chain.fireSessionCreated(session);
-        chain.fireSessionOpened(session);
+        chain.fireSessionCreated();
+        chain.fireSessionOpened();
 
         listenerControl.replay();
         chainControl.replay();
@@ -245,7 +245,7 @@
         listenerControl.reset();
         chainControl.reset();
         listener.sessionDestroyed(session);
-        chain.fireSessionClosed(session);
+        chain.fireSessionClosed();
         listener.serviceDeactivated(connector);
 
         listenerControl.replay();

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java Sun Sep 16 23:43:04 2007
@@ -21,14 +21,17 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collection;
 import java.util.Date;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -56,6 +59,8 @@
     protected abstract SocketAddress createSocketAddress(int port);
 
     protected abstract int getPort(SocketAddress address);
+    
+    protected abstract IoConnector newConnector();
 
     protected void bind(boolean reuseAddress) throws IOException {
         acceptor.setHandler(new EchoProtocolHandler());
@@ -146,6 +151,36 @@
         for (int i = 0; i < 1024; i++) {
             acceptor.unbind();
             acceptor.bind();
+        }
+    }
+    
+    public void testUnbindDisconnectsClients() throws Exception {
+        bind(true);
+        IoConnector connector = newConnector();
+        IoSession[] sessions = new IoSession[5];
+        connector.setHandler(new IoHandlerAdapter());
+        for (int i = 0; i < sessions.length; i++) {
+            ConnectFuture future = connector.connect(createSocketAddress(port));
+            future.awaitUninterruptibly();
+            sessions[i] = future.getSession();
+            Assert.assertTrue(sessions[i].isConnected());
+            Assert.assertTrue(sessions[i].write(ByteBuffer.allocate(1)).awaitUninterruptibly().isWritten());
+        }
+
+        // Wait for the server side sessions to be created.
+        Thread.sleep(500);
+
+        Collection<IoSession> managedSessions = acceptor.getManagedSessions();
+        Assert.assertEquals(5, managedSessions.size());
+
+        acceptor.unbind();
+
+        // Wait for the client side sessions to close.
+        Thread.sleep(500);
+
+        Assert.assertEquals(0, managedSessions.size());
+        for (IoSession element : managedSessions) {
+            Assert.assertFalse(element.isConnected());
         }
     }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java Sun Sep 16 23:43:04 2007
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
+import org.apache.mina.common.IoConnector;
 import org.apache.mina.transport.AbstractBindTest;
 
 /**
@@ -38,11 +39,16 @@
 
     @Override
     protected SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress(port);
+        return new InetSocketAddress("localhost", port);
     }
 
     @Override
     protected int getPort(SocketAddress address) {
         return ((InetSocketAddress) address).getPort();
+    }
+    
+    @Override
+    protected IoConnector newConnector() {
+        return new DatagramConnector();
     }
 }

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java Sun Sep 16 23:43:04 2007
@@ -21,14 +21,8 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Collection;
 
-import junit.framework.Assert;
-
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.AbstractBindTest;
 
 /**
@@ -45,42 +39,16 @@
 
     @Override
     protected SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress(port);
+        return new InetSocketAddress("localhost", port);
     }
 
     @Override
     protected int getPort(SocketAddress address) {
         return ((InetSocketAddress) address).getPort();
     }
-
-    public void testUnbindDisconnectsClients() throws Exception {
-        // TODO: This test is almost identical to the test with the same name in VmPipeBindTest
-        bind(false);
-
-        IoConnector connector = new SocketConnector();
-        IoSession[] sessions = new IoSession[5];
-        connector.setHandler(new IoHandlerAdapter());
-        for (int i = 0; i < sessions.length; i++) {
-            ConnectFuture future = connector.connect(new InetSocketAddress(
-                    "localhost", port));
-            future.awaitUninterruptibly();
-            sessions[i] = future.getSession();
-            Assert.assertTrue(sessions[i].isConnected());
-        }
-
-        // Wait for the server side sessions to be created.
-        Thread.sleep(500);
-
-        Collection<IoSession> managedSessions = acceptor.getManagedSessions();
-        Assert.assertEquals(5, managedSessions.size());
-
-        acceptor.unbind();
-
-        // Wait for the client side sessions to close.
-        Thread.sleep(500);
-
-        for (IoSession element : sessions) {
-            Assert.assertFalse(element.isConnected());
-        }
+    
+    @Override
+    protected IoConnector newConnector() {
+        return new SocketConnector();
     }
 }

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java Sun Sep 16 23:43:04 2007
@@ -20,14 +20,8 @@
 package org.apache.mina.transport.vmpipe;
 
 import java.net.SocketAddress;
-import java.util.Collection;
 
-import junit.framework.Assert;
-
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.AbstractBindTest;
 
 /**
@@ -52,38 +46,8 @@
         return ((VmPipeAddress) address).getPort();
     }
 
-    public void testUnbindDisconnectsClients() throws Exception {
-        // TODO: This test is almost identical to the test with the same name in SocketBindTest
-        bind(false);
-
-        SocketAddress addr = createSocketAddress(port);
-
-        IoConnector connector = new VmPipeConnector();
-        connector.setHandler(new IoHandlerAdapter());
-        IoSession[] sessions = new IoSession[5];
-        for (int i = 0; i < sessions.length; i++) {
-            ConnectFuture future = connector.connect(addr);
-            future.awaitUninterruptibly();
-            sessions[i] = future.getSession();
-            Assert.assertTrue(sessions[i].isConnected());
-        }
-
-        // Wait for the server side sessions to be created.
-        Thread.sleep(500);
-
-        Collection<IoSession> managedSessions = acceptor.getManagedSessions();
-        Assert.assertEquals(5, managedSessions.size());
-        for (IoSession element : sessions) {
-            Assert.assertFalse(managedSessions.contains(element));
-        }
-
-        acceptor.unbind();
-
-        // Wait for the client side sessions to close.
-        Thread.sleep(500);
-
-        for (IoSession element : sessions) {
-            Assert.assertFalse(element.isConnected());
-        }
+    @Override
+    protected IoConnector newConnector() {
+        return new VmPipeConnector();
     }
 }

Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java Sun Sep 16 23:43:04 2007
@@ -168,11 +168,9 @@
             ByteBuffer buf = (ByteBuffer) req.getMessage();
             if (buf.remaining() == 0) {
                 getWriteRequestQueue().poll();
-                this.increaseWrittenMessages();
-
                 buf.reset();
 
-                this.getFilterChain().fireMessageSent(this, req);
+                this.getFilterChain().fireMessageSent(req);
                 continue;
             }
 
@@ -180,9 +178,8 @@
             try {
                 outputStream.write(buf.array());
                 buf.position(buf.position() + writtenBytes);
-                this.increaseWrittenBytes(writtenBytes);
             } catch (IOException e) {
-                this.getFilterChain().fireExceptionCaught(this, e);
+                this.getFilterChain().fireExceptionCaught(e);
             }
         }
     }
@@ -209,17 +206,16 @@
                         int readBytes = inputStream.read(data);
 
                         if (readBytes > 0) {
-                            increaseReadBytes(readBytes);
                             ByteBuffer buf = ByteBuffer
                                     .wrap(data, 0, readBytes);
                             buf.put(data, 0, readBytes);
                             buf.flip();
                             getFilterChain().fireMessageReceived(
-                                    SerialSessionImpl.this, buf);
+                                    buf);
                         }
                     } catch (IOException e) {
                         getFilterChain().fireExceptionCaught(
-                                SerialSessionImpl.this, e);
+                                e);
                     }
                 }
             }
@@ -242,7 +238,7 @@
     public void add(IoSession session) {
     }
 
-    public void flush(IoSession session, WriteRequest writeRequest) {
+    public void flush(IoSession session) {
         if (writeWorker == null) {
             writeWorker = new WriteWorker();
             writeWorker.start();
@@ -266,7 +262,7 @@
         }
 
         port.close();
-        flush(session, null);
+        flush(session);
         synchronized (readReadyMonitor) {
             readReadyMonitor.notifyAll();
         }