You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/11/02 04:55:13 UTC

svn commit: r330182 - in /directory/network/trunk/src: java/org/apache/mina/common/ java/org/apache/mina/common/support/ java/org/apache/mina/transport/socket/nio/support/ java/org/apache/mina/transport/vmpipe/support/ test/org/apache/mina/common/ test...

Author: trustin
Date: Tue Nov  1 19:55:00 2005
New Revision: 330182

URL: http://svn.apache.org/viewcvs?rev=330182&view=rev
Log:
Resolving issue: DIRMINA-2 - Traffic control
* Applied Niklas's patch, but modified it a little bit:
** Added TrafficMask
** Added IoSession.get/setTrafficMask()
* Renamed and reordered methods in SocketIoProcessor to look cleaner

TODO: traffic control for Datagram and VmPipe is not yet done.

Added:
    directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java   (with props)
Modified:
    directory/network/trunk/src/java/org/apache/mina/common/IoSession.java
    directory/network/trunk/src/java/org/apache/mina/common/support/BaseIoSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    directory/network/trunk/src/test/org/apache/mina/common/FutureTest.java
    directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
    directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
    directory/network/trunk/src/test/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java

Modified: directory/network/trunk/src/java/org/apache/mina/common/IoSession.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoSession.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoSession.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoSession.java Tue Nov  1 19:55:00 2005
@@ -152,7 +152,7 @@
     int getIdleTime( IdleStatus status );
 
     /**
-     * Returnd idle time for the specified type of idleness in milliseconds.
+     * Returns idle time for the specified type of idleness in milliseconds.
      */
     long getIdleTimeInMillis( IdleStatus status );
 
@@ -160,7 +160,7 @@
      * Sets idle time for the specified type of idleness in seconds.
      */
     void setIdleTime( IdleStatus status, int idleTime );
-
+    
     /**
      * Returns write timeout in seconds.
      */
@@ -176,6 +176,18 @@
      */
     void setWriteTimeout( int writeTimeout );
     
+    /**
+     * Returns the current {@link TrafficMask} of this session.
+     */
+    TrafficMask getTrafficMask();
+    
+    /**
+     * Sets the {@link TrafficMask} of this session, which causes
+     * the parent {@link IoSessionManager} to start controlling the traffic
+     * of this session immediately.
+     */
+    void setTrafficMask( TrafficMask trafficMask );
+
     /**
      * Returns the total number of bytes which were read from this session.
      */

Added: directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java?rev=330182&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java Tue Nov  1 19:55:00 2005
@@ -0,0 +1,160 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * A type-safe mask that is used to control the traffic of {@link IoSession}
+ * with {@link IoSession#setTrafficMask(TrafficMask)}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class TrafficMask
+{
+    /**
+     * This mask suspends both reads and writes.
+     */
+    public static final TrafficMask NONE = new TrafficMask( 0, "none" );
+    /**
+     * This mask suspends writes, and resumes reads if reads were suspended.
+     */
+    public static final TrafficMask READ = new TrafficMask( SelectionKey.OP_READ, "read" );
+    /**
+     * This mask suspends reads, and resumes writes if writes were suspended.
+     */
+    public static final TrafficMask WRITE = new TrafficMask( SelectionKey.OP_WRITE, "write" );
+    /**
+     * This mask resumes both reads and writes if any of them were suspended.
+     */
+    public static final TrafficMask ALL =
+        new TrafficMask( SelectionKey.OP_READ | SelectionKey.OP_WRITE, "all" );
+    
+    /**
+     * Returns an appropriate {@link TrafficMask} instance from the
+     * specified <tt>interestOps</tt>.
+     * @see SelectionKey
+     */
+    public static TrafficMask getInstance( int interestOps )
+    {
+        boolean read = ( interestOps & SelectionKey.OP_READ ) != 0;
+        boolean write = ( interestOps & SelectionKey.OP_WRITE ) != 0;
+        if( read )
+        {
+            if( write )
+            {
+                return ALL;
+            }
+            else
+            {
+                return READ;
+            }
+        }
+        else if( write )
+        {
+            return WRITE;
+        }
+        else
+        {
+            return NONE;
+        }
+    }
+    
+    private final int interestOps;
+    private final String name;
+    
+    private TrafficMask( int interestOps, String name )
+    {
+        this.interestOps = interestOps;
+        this.name = name;
+    }
+
+    /**
+     * Returns the name of this mask.
+     */
+    public String getName()
+    {
+        return name;
+    }
+    
+    /**
+     * Returns <tt>true</tt> if this mask allows a read operation.
+     */
+    public boolean isReadable()
+    {
+        return ( interestOps & SelectionKey.OP_READ ) != 0;
+    }
+    
+    /**
+     * Returns <tt>true</tt> if this mask allows a write operation.
+     */
+    public boolean isWritable()
+    {
+        return ( interestOps & SelectionKey.OP_WRITE ) != 0;
+    }
+    
+    /**
+     * Returns an interestOps of {@link SelectionKey} for this mask.
+     */
+    public int getInterestOps()
+    {
+        return interestOps;
+    }
+
+    /**
+     * Performs an <tt>AND</tt> operation on this mask with the specified
+     * <tt>mask</tt> and returns the result.
+     */
+    public TrafficMask and( TrafficMask mask )
+    {
+        return getInstance( interestOps & mask.interestOps );
+    }
+    
+    /**
+     * Performs an <tt>OR</tt> operation on this mask with the specified
+     * <tt>mask</tt> and returns the result.
+     */
+    public TrafficMask or( TrafficMask mask )
+    {
+        return getInstance( interestOps | mask.interestOps );
+    }
+    
+    /**
+     * Returns a negated mask of this one.
+     */
+    public TrafficMask not()
+    {
+        return getInstance( ~interestOps );
+    }
+    
+    /**
+     * Performs an <tt>XOR</tt> operation on this mask with the specified
+     * <tt>mask</tt> and returns the result.
+     */
+    public TrafficMask xor( TrafficMask mask )
+    {
+        return getInstance( interestOps ^ mask.interestOps );
+    }
+    
+    public String toString()
+    {
+        return name;
+    }
+}

Propchange: directory/network/trunk/src/java/org/apache/mina/common/TrafficMask.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/network/trunk/src/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/support/BaseIoSession.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/support/BaseIoSession.java Tue Nov  1 19:55:00 2005
@@ -24,6 +24,8 @@
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManager;
+import org.apache.mina.common.TrafficMask;
 
 /**
  * Base implementation of {@link IoSession}.
@@ -41,6 +43,7 @@
     private int idleTimeForWrite;
     private int idleTimeForBoth;
     private int writeTimeout;
+    private TrafficMask trafficMask = TrafficMask.ALL; 
     
     // Status variables
     private long readBytes;
@@ -171,6 +174,33 @@
         this.writeTimeout = writeTimeout;
     }
 
+    public TrafficMask getTrafficMask()
+    {
+        return trafficMask;
+    }
+    
+    public void setTrafficMask( TrafficMask trafficMask )
+    {
+        if( trafficMask == null )
+        {
+            throw new NullPointerException( "trafficMask" );
+        }
+        
+        if( this.trafficMask == trafficMask )
+        {
+            return;
+        }
+        
+        this.trafficMask = trafficMask;
+        updateTrafficMask();
+    }
+
+    /**
+     * Signals the {@link IoSessionManager} that the {@link TrafficMask} of this
+     * session has been changed.
+     */
+    protected abstract void updateTrafficMask();
+    
     public long getReadBytes()
     {
         return readBytes;

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Tue Nov  1 19:55:00 2005
@@ -185,4 +185,9 @@
     {
         ch.socket().setTrafficClass( tc );
     }
+
+    protected void updateTrafficMask()
+    {
+        // TODO: Implement me.
+    }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java Tue Nov  1 19:55:00 2005
@@ -295,7 +295,7 @@
                     RegistrationRequest req = ( RegistrationRequest ) key.attachment();
                     SocketSessionImpl session = new SocketSessionImpl( filters, ch, req.handler );
                     session.getManagerFilterChain().sessionCreated( session );
-                    SocketIoProcessor.getInstance().addSession( session );
+                    SocketIoProcessor.getInstance().addNew( session );
                     success = true;
                 }
                 catch( Throwable t )

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java Tue Nov  1 19:55:00 2005
@@ -250,7 +250,7 @@
         {
             ExceptionUtil.throwException( e );
         }
-        SocketIoProcessor.getInstance().addSession( session );
+        SocketIoProcessor.getInstance().addNew( session );
         return session;
     }
 

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Tue Nov  1 19:55:00 2005
@@ -68,12 +68,9 @@
     private final Selector selector;
 
     private final Queue newSessions = new Queue();
-
     private final Queue removingSessions = new Queue();
-
     private final Queue flushingSessions = new Queue();
-
-    private final Queue readableSessions = new Queue();
+    private final Queue trafficControllingSessions = new Queue();
 
     private Worker worker;
 
@@ -89,7 +86,7 @@
         return instance;
     }
 
-    public void addSession( SocketSessionImpl session )
+    void addNew( SocketSessionImpl session )
     {
         synchronized( this )
         {
@@ -103,7 +100,7 @@
         selector.wakeup();
     }
 
-    void removeSession( SocketSessionImpl session )
+    void remove( SocketSessionImpl session )
     {
         scheduleRemove( session );
         startupWorker();
@@ -119,22 +116,43 @@
         }
     }
 
-    void flushSession( SocketSessionImpl session )
+    void flush( SocketSessionImpl session )
     {
         scheduleFlush( session );
         selector.wakeup();
     }
 
-    void addReadableSession( SocketSessionImpl session )
+    void updateTrafficMask( SocketSessionImpl session )
+    {
+        scheduleTrafficControl( session );
+        selector.wakeup();
+    }
+
+    private void scheduleRemove( SocketSessionImpl session )
+    {
+        synchronized( removingSessions )
+        {
+            removingSessions.push( session );
+        }
+    }
+
+    private void scheduleFlush( SocketSessionImpl session )
     {
-        synchronized( readableSessions )
+        synchronized( flushingSessions )
         {
-            readableSessions.push( session );
+            flushingSessions.push( session );
         }
-        selector.wakeup();
     }
 
-    private void addSessions()
+    private void scheduleTrafficControl( SocketSessionImpl session )
+    {
+        synchronized( trafficControllingSessions )
+        {
+            trafficControllingSessions.push( session );
+        }
+    }
+
+    private void doAddNew()
     {
         if( newSessions.isEmpty() )
             return;
@@ -175,7 +193,7 @@
         }
     }
 
-    private void removeSessions()
+    private void doRemove()
     {
         if( removingSessions.isEmpty() )
             return;
@@ -227,7 +245,7 @@
         }
     }
 
-    private void processSessions( Set selectedKeys )
+    private void process( Set selectedKeys )
     {
         Iterator it = selectedKeys.iterator();
 
@@ -236,12 +254,12 @@
             SelectionKey key = ( SelectionKey ) it.next();
             SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
 
-            if( key.isReadable() )
+            if( key.isReadable() && session.getTrafficMask().isReadable() )
             {
                 read( session );
             }
 
-            if( key.isWritable() )
+            if( key.isWritable() && session.getTrafficMask().isWritable() )
             {
                 scheduleFlush( session );
             }
@@ -300,23 +318,7 @@
         }
     }
 
-    private void scheduleRemove( SocketSessionImpl session )
-    {
-        synchronized( removingSessions )
-        {
-            removingSessions.push( session );
-        }
-    }
-
-    private void scheduleFlush( SocketSessionImpl session )
-    {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
-    }
-
-    private void notifyIdleSessions()
+    private void notifyIdleness()
     {
         // process idle sessions
         long currentTime = System.currentTimeMillis();
@@ -330,35 +332,35 @@
                 {
                     SelectionKey key = ( SelectionKey ) it.next();
                     SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
-                    notifyIdleSession( session, currentTime );
+                    notifyIdleness( session, currentTime );
                 }
             }
         }
     }
 
-    private void notifyIdleSession( SocketSessionImpl session, long currentTime )
+    private void notifyIdleness( SocketSessionImpl session, long currentTime )
     {
-        notifyIdleSession0(
+        notifyIdleness0(
                 session, currentTime,
                 session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
                 IdleStatus.BOTH_IDLE,
                 Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
-        notifyIdleSession0(
+        notifyIdleness0(
                 session, currentTime,
                 session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
                 IdleStatus.READER_IDLE,
                 Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
-        notifyIdleSession0(
+        notifyIdleness0(
                 session, currentTime,
                 session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
                 IdleStatus.WRITER_IDLE,
                 Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
 
-        notifyWriteTimeoutSession( session, currentTime, session
+        notifyWriteTimeout( session, currentTime, session
                 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
     }
 
-    private void notifyIdleSession0( SocketSessionImpl session, long currentTime,
+    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
                                     long idleTime, IdleStatus status,
                                     long lastIoTime )
     {
@@ -370,7 +372,7 @@
         }
     }
 
-    private void notifyWriteTimeoutSession( SocketSessionImpl session,
+    private void notifyWriteTimeout( SocketSessionImpl session,
                                            long currentTime,
                                            long writeTimeout, long lastIoTime )
     {
@@ -385,7 +387,7 @@
         }
     }
 
-    private void flushSessions()
+    private void doFlush()
     {
         if( flushingSessions.size() == 0 )
             return;
@@ -420,7 +422,7 @@
             {
                 try
                 {
-                    flush( session );
+                    doFlush( session );
                 }
                 catch( CancelledKeyException e )
                 {
@@ -458,7 +460,7 @@
         }
     }
 
-    private void flush( SocketSessionImpl session ) throws IOException
+    private void doFlush( SocketSessionImpl session ) throws IOException
     {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
@@ -507,6 +509,60 @@
         }
     }
 
+    private void doUpdateTrafficMask() 
+    {
+        if( trafficControllingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( trafficControllingSessions )
+            {
+                session = ( SocketSessionImpl ) trafficControllingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case session.setTrafficMask() is called before the
+            // session queued by addNew() has been processed by doAddNew())
+            if( key == null )
+            {
+                scheduleTrafficControl( session );
+                break;
+            }
+
+            // The default is OP_READ; if there are write requests in the
+            // session's write queue, also set OP_WRITE to trigger flushing.
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( !writeRequestQueue.isEmpty() )
+                {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getTrafficMask().getInterestOps();
+
+            try
+            {
+                key.interestOps( ops & mask );
+            }
+            catch( CancelledKeyException e )
+            {
+                // Connection is closed unexpectedly.
+                scheduleRemove( session );
+            }
+        }
+    }
+    
     private class Worker extends Thread
     {
         public Worker()
@@ -521,16 +577,17 @@
                 try
                 {
                     int nKeys = selector.select( 1000 );
-                    addSessions();
-
+                    doAddNew();
+                    doUpdateTrafficMask();
+                    
                     if( nKeys > 0 )
                     {
-                        processSessions( selector.selectedKeys() );
+                        process( selector.selectedKeys() );
                     }
 
-                    flushSessions();
-                    removeSessions();
-                    notifyIdleSessions();
+                    doFlush();
+                    doRemove();
+                    notifyIdleness();
 
                     if( selector.keys().isEmpty() )
                     {

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java Tue Nov  1 19:55:00 2005
@@ -111,7 +111,7 @@
     {
         if( !closeFuture.isReady() )
         {
-            managerFilterChain.filterClose( this, closeFuture );
+            filterChain.filterClose( this, closeFuture );
         }
 
         return closeFuture;
@@ -250,5 +250,10 @@
         }
         
         this.readBufferSize = size;
+    }
+
+    protected void updateTrafficMask()
+    {
+        SocketIoProcessor.getInstance().updateTrafficMask( this );
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java Tue Nov  1 19:55:00 2005
@@ -30,16 +30,16 @@
         synchronized( writeRequestQueue )
         {
             writeRequestQueue.push( writeRequest );
-            if( writeRequestQueue.size() == 1 )
+            if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
             {
                 // Notify SocketIoProcessor only when writeRequestQueue was empty.
-                SocketIoProcessor.getInstance().flushSession( s );
+                SocketIoProcessor.getInstance().flush( s );
             }
         }
     }
 
     protected void doClose( IoSession session, CloseFuture closeFuture )
     {
-        SocketIoProcessor.getInstance().removeSession( ( SocketSessionImpl ) session );
+        SocketIoProcessor.getInstance().remove( ( SocketSessionImpl ) session );
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Nov  1 19:55:00 2005
@@ -165,4 +165,9 @@
     {
         return localAddress;
     }
+
+    protected void updateTrafficMask()
+    {
+        // TODO: Implement me.
+    }
 }

Modified: directory/network/trunk/src/test/org/apache/mina/common/FutureTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/common/FutureTest.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/common/FutureTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/common/FutureTest.java Tue Nov  1 19:55:00 2005
@@ -117,6 +117,10 @@
             {
                 return 0;
             }
+
+            protected void updateTrafficMask()
+            {
+            }
         };
         
         future.setSession( session );

Modified: directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java Tue Nov  1 19:55:00 2005
@@ -196,6 +196,10 @@
         {
             return 0;
         }
+
+        protected void updateTrafficMask()
+        {
+        }
     }
 
     private class EventOrderTestFilter implements IoFilter

Modified: directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java Tue Nov  1 19:55:00 2005
@@ -146,6 +146,10 @@
         {
             return 0;
         }
+
+        protected void updateTrafficMask()
+        {
+        }
     }
     
     private static class EventOrderChecker implements NextFilter

Modified: directory/network/trunk/src/test/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java?rev=330182&r1=330181&r2=330182&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java Tue Nov  1 19:55:00 2005
@@ -226,5 +226,9 @@
         {
             return 0;
         }
+
+        protected void updateTrafficMask()
+        {
+        }
     }
 }