You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2006/09/12 11:28:44 UTC

svn commit: r442541 - in /directory/trunks/mina/core/src: main/java/org/apache/mina/common/ main/java/org/apache/mina/common/support/ main/java/org/apache/mina/transport/socket/nio/ main/java/org/apache/mina/transport/socket/nio/support/ main/java/org/...

Author: trustin
Date: Tue Sep 12 02:28:42 2006
New Revision: 442541

URL: http://svn.apache.org/viewvc?view=rev&rev=442541
Log:
Resolved issue: DIRMINA-162 (datagram session management fails)
* Added IoSessionRecycler
* Moved common logic related with sessionCreated/Opened/Closed to IoServiceListenerSupport to reduce duplication
* Added ExpiringSessionRecycler and its required classes
* Fixed that IoServiceListener doesn't work with VM-pipe


Added:
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java   (with props)
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java   (with props)
    directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java   (with props)
    directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java   (with props)
    directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Modified:
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java
    directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,96 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.util.ExpirationListener;
+import org.apache.mina.util.ExpiringMap;
+
+/**
+ * An {@link IoSessionRecycler} with sessions that time out on inactivity.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * 
+ * TODO Change time unit to 'seconds'.
+ */
+public class ExpiringSessionRecycler implements IoSessionRecycler, ExpirationListener
+{
+    private ExpiringMap sessionMap;
+
+    public ExpiringSessionRecycler()
+    {
+        this( ExpiringMap.DEFAULT_EXPIRATION_TIME, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+    }
+
+    public ExpiringSessionRecycler( long expirationTimeMillis )
+    {
+        this( expirationTimeMillis, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+    }
+
+    public ExpiringSessionRecycler( long expirationTimeMillis, long expirerDelay )
+    {
+        // FIXME Use IdentityHashMap if possible.
+        sessionMap = new ExpiringMap( expirationTimeMillis, expirerDelay );
+        sessionMap.addExpirationListener( this );
+    }
+
+    public void put( IoSession session )
+    {
+        Object key = generateKey( session );
+        if ( !sessionMap.containsKey( key ) )
+        {
+            sessionMap.put( key, session );
+        }
+    }
+
+    public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
+    {
+        Object key = generateKey( localAddress, remoteAddress );
+        return ( IoSession ) sessionMap.get( key );
+    }
+
+    public void remove( IoSession session )
+    {
+        Object key = generateKey( session );
+        sessionMap.remove( key );
+    }
+
+    public void expired( Object expiredObject )
+    {
+        IoSession expiredSession = ( IoSession ) expiredObject;
+        expiredSession.close();
+    }
+
+    private Object generateKey( IoSession session )
+    {
+        return generateKey( session.getLocalAddress(), session.getRemoteAddress() );
+    }
+
+    private Object generateKey( SocketAddress localAddress, SocketAddress remoteAddress )
+    {
+        List key = new ArrayList( 2 );
+        key.add( remoteAddress );
+        key.add( localAddress );
+        return key;
+    }
+}

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java Tue Sep 12 02:28:42 2006
@@ -82,6 +82,19 @@
     void setThreadModel( ThreadModel threadModel );
     
     /**
+     * Sets the {@link IoSessionRecycler} for this service. Please note that this
+     * recycler will only be applied to connectionless transports.
+     * 
+     * @param sessionRecycler <tt>null</tt> to use the default recycler
+     */
+    void setSessionRecycler( IoSessionRecycler sessionRecycler );
+
+    /**
+     * Returns the {@link IoSessionRecycler} for this service.
+     */
+    IoSessionRecycler getSessionRecycler();
+    
+    /**
      * Returns a deep clone of this configuration.
      */
     Object clone();

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+
+/**
+ * A connectionless transport can recycle existing sessions by assigning an
+ * IoSessionRecyler to its {@link IoServiceConfig}.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO More documentation
+ */
+public interface IoSessionRecycler
+{
+    /**
+     * Called when the underlying transport creates or writes a new {@link IoSession}.
+     * 
+     * @param session
+     *            the new {@link IoSession}.
+     */
+    void put( IoSession session );
+
+    /**
+     * Attempts to retrieve a recycled {@link IoSession}.
+     * 
+     * @param localAddress
+     *            the local socket address of the {@link IoSession} the
+     *            transport wants to recycle.
+     * @param remoteAddress
+     *            the remote socket address of the {@link IoSession} the
+     *            transport wants to recycle.
+     * @return a recycled {@link IoSession}, or null if one cannot be found.
+     */
+    IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress );
+
+    /**
+     * Called when an {@link IoSession} is explicitly closed.
+     * 
+     * @param session
+     *            the new {@link IoSession}.
+     */
+    void remove( IoSession session );
+}

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java Tue Sep 12 02:28:42 2006
@@ -22,9 +22,11 @@
 import java.lang.reflect.Method;
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.ExpiringSessionRecycler;
 import org.apache.mina.common.IoFilterChainBuilder;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.ThreadModel;
 
 /**
@@ -35,6 +37,8 @@
  */
 public abstract class BaseIoServiceConfig implements IoServiceConfig, Cloneable
 {
+    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+    
     /**
      * Current filter chain builder.
      */
@@ -49,6 +53,11 @@
      * Current thread model.
      */
     private ThreadModel threadModel = defaultThreadModel;
+    
+    /**
+     * Current session recycler
+     */
+    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
 
     public BaseIoServiceConfig()
     {
@@ -97,6 +106,21 @@
         }
         this.threadModel = threadModel;
     }
+    
+    // FIXME There can be a problem if a user changes the recycler after the service is activated.
+    public void setSessionRecycler( IoSessionRecycler sessionRecycler )
+    {
+        if( sessionRecycler == null )
+        {
+            sessionRecycler = DEFAULT_RECYCLER;
+        }
+        this.sessionRecycler = sessionRecycler;
+    }
+    
+    public IoSessionRecycler getSessionRecycler()
+    {
+        return sessionRecycler;
+    }
 
     public Object clone()
     {
@@ -109,7 +133,6 @@
         {
             throw ( InternalError ) new InternalError().initCause( e );
         }
-        
         
         // Try to clone the chain builder.
         try

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java Tue Sep 12 02:28:42 2006
@@ -111,14 +111,17 @@
         Set sessions;
         synchronized( managedSessions )
         {
-            sessions = ( Set )managedSessions.get( serviceAddress );
+            sessions = ( Set ) managedSessions.get( serviceAddress );
             if( sessions == null )
             {
                 sessions = new IdentityHashSet();
             }
         }
         
-        return Collections.unmodifiableSet( sessions );
+        synchronized( sessions )
+        {
+            return Collections.unmodifiableSet( sessions );
+        }
     }
 
     /**
@@ -188,6 +191,7 @@
     {
         SocketAddress serviceAddress = session.getServiceAddress();
         
+        // Get the session set.
         boolean firstSession = false;
         Set sessions;
         synchronized( managedSessions )
@@ -201,13 +205,7 @@
             }
         }
         
-        if( session.getService() instanceof IoConnector && firstSession )
-        {
-            fireServiceActivated(
-                    session.getService(), session.getServiceAddress(),
-                    session.getHandler(), session.getServiceConfig() );
-        }
-
+        // If already registered, ignore.
         synchronized( sessions )
         {
             if ( !sessions.add( session ) )
@@ -215,7 +213,20 @@
                 return;
             }
         }
+        
+        // If the first connector session, fire a virtual service activation event.
+        if( session.getService() instanceof IoConnector && firstSession )
+        {
+            fireServiceActivated(
+                    session.getService(), session.getServiceAddress(),
+                    session.getHandler(), session.getServiceConfig() );
+        }
 
+        // Fire session events.
+        session.getFilterChain().fireSessionCreated( session );
+        session.getFilterChain().fireSessionOpened( session);
+        
+        // Fire listener events.
         synchronized( listeners )
         {
             for( Iterator i = listeners.iterator(); i.hasNext(); )
@@ -232,17 +243,20 @@
     {
         SocketAddress serviceAddress = session.getServiceAddress();
         
+        // Get the session set.
         Set sessions;
         synchronized( managedSessions )
         {
             sessions = ( Set ) managedSessions.get( serviceAddress );
         }
         
+        // Ignore if unknown.
         if( sessions == null )
         {
             return;
         }
         
+        // Try to remove the remaining empty seession set after removal.
         boolean lastSession = false;
         synchronized( sessions )
         {
@@ -257,7 +271,10 @@
             }
         }
         
-
+        // Fire session events.
+        session.getFilterChain().fireSessionClosed( session );
+        
+        // Fire listener events.
         try
         {
             synchronized( listeners )
@@ -270,6 +287,7 @@
         }
         finally
         {
+            // Fire a virtual service deactivation event for the last session of the connector.
             if( session.getService() instanceof IoConnector && lastSession )
             {
                 fireServiceDeactivated(
@@ -303,22 +321,26 @@
         }
 
         final Object lock = new Object();
+        Set sessionsCopy;
+        
+        // Create a copy to avoid ConcurrentModificationException
         synchronized( sessions )
         {
-            for( Iterator i = sessions.iterator(); i.hasNext(); )
+            sessionsCopy = new IdentityHashSet( sessions );
+        }
+        
+        for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
+        {
+            ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
             {
-                ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
+                public void operationComplete( IoFuture future )
                 {
-                    public void operationComplete( IoFuture future )
+                    synchronized( lock )
                     {
-                        synchronized( lock )
-                        {
-                            //noinspection NakedNotify
-                            lock.notifyAll();
-                        }
+                        lock.notifyAll();
                     }
-                } );
-            }
+                }
+            } );
         }
 
         try
@@ -327,7 +349,7 @@
             {
                 while( !managedSessions.isEmpty() )
                 {
-                    lock.wait( 1000 );
+                    lock.wait( 500 );
                 }
             }
         }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Tue Sep 12 02:28:42 2006
@@ -350,7 +350,6 @@
                     getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
                     req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
                     req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
-                    session.getFilterChain().fireSessionCreated( session );
                     session.getIoProcessor().addNew( session );
                     success = true;
                 }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Tue Sep 12 02:28:42 2006
@@ -33,8 +33,10 @@
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.common.support.BaseIoConnector;
 import org.apache.mina.common.support.DefaultConnectFuture;
 import org.apache.mina.util.Queue;
@@ -162,10 +164,9 @@
 
             if( ch.connect( address ) )
             {
-                SocketSessionImpl session = newSession( ch, handler, config );
-                success = true;
                 DefaultConnectFuture future = new DefaultConnectFuture();
-                future.setSession( session );
+                newSession( ch, handler, config, future );
+                success = true;
                 return future;
             }
 
@@ -282,8 +283,7 @@
             try
             {
                 ch.finishConnect();
-                SocketSessionImpl session = newSession( ch, entry.handler, entry.config );
-                entry.setSession( session );
+                newSession( ch, entry.handler, entry.config, entry );
                 success = true;
             }
             catch( Throwable e )
@@ -343,7 +343,7 @@
         }
     }
 
-    private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config )
+    private void newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config, final ConnectFuture connectFuture )
         throws IOException
     {
         SocketSessionImpl session = new SocketSessionImpl( this,
@@ -358,14 +358,28 @@
             getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
             config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
             config.getThreadModel().buildFilterChain( session.getFilterChain() );
-            session.getFilterChain().fireSessionCreated( session );
+            session.getFilterChain().addFirst("__CONNECT_FUTURE_NOTIFIER__", new IoFilterAdapter()
+            {
+
+                public void sessionCreated( NextFilter nextFilter, IoSession session ) throws Exception
+                {
+                    session.getFilterChain().remove( "__CONNECT_FUTURE_NOTIFIER__" );
+                    try
+                    {
+                        nextFilter.sessionCreated( session );
+                    }
+                    finally
+                    {
+                        connectFuture.setSession( session );
+                    }
+                }
+            });
         }
         catch( Throwable e )
         {
             throw ( IOException ) new IOException( "Failed to create a session." ).initCause( e );
         }
         session.getIoProcessor().addNew( session );
-        return session;
     }
 
     private SocketIoProcessor nextProcessor()

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Tue Sep 12 02:28:42 2006
@@ -175,7 +175,6 @@
             if( registered )
             {
                 session.getServiceListeners().fireSessionCreated( session );
-                session.getFilterChain().fireSessionOpened( session );
             }
         }
     }
@@ -225,8 +224,6 @@
             {
                 releaseWriteBuffers( session );
                 session.getServiceListeners().fireSessionDestroyed( session );
-
-                session.getFilterChain().fireSessionClosed( session );
             }
         }
     }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Tue Sep 12 02:28:42 2006
@@ -27,7 +27,6 @@
 import java.nio.channels.Selector;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +38,10 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
 import org.apache.mina.util.Queue;
@@ -64,7 +65,7 @@
     private final Queue cancelQueue = new Queue();
     private final Queue flushingSessions = new Queue();
     private Worker worker;
-
+    
     /**
      * Creates a new instance.
      */
@@ -185,22 +186,6 @@
         }
     }
     
-    public boolean isBound( SocketAddress address )
-    {
-        synchronized( channels )
-        {
-            return channels.containsKey( address );
-        }
-    }
-    
-    public Set getBoundAddresses()
-    {
-        synchronized( channels )
-        {
-            return new HashSet( channels.keySet() );
-        }
-    }
-    
     public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
     {
         if( remoteAddress == null )
@@ -226,24 +211,44 @@
         }
 
         RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-        DatagramSessionImpl s = new DatagramSessionImpl(
-                wrapper, this,
-                req.config, ch, req.handler,
-                req.address );
-        s.setRemoteAddress( remoteAddress );
-        s.setSelectionKey( key );
+        IoSession session;
+        IoSessionRecycler sessionRecycler = req.config.getSessionRecycler();
+        synchronized ( sessionRecycler )
+        {
+            session = sessionRecycler.recycle( localAddress, remoteAddress);
+            if( session != null )
+            {
+                return session;
+            }
+
+            // If a new session needs to be created.
+            DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+                    wrapper, this,
+                    req.config, ch, req.handler,
+                    req.address );
+            datagramSession.setRemoteAddress( remoteAddress );
+            datagramSession.setSelectionKey( key );
+            
+            req.config.getSessionRecycler().put( datagramSession );
+            session = datagramSession;
+        }
         
         try
         {
-            buildFilterChain( req, s );
-            s.getFilterChain().fireSessionCreated( s );
+            buildFilterChain( req, session );
+            getListeners().fireSessionCreated( session );
         }
         catch( Throwable t )
         {
             ExceptionMonitor.getInstance().exceptionCaught( t );
         }
         
-        return s;
+        return session;
+    }
+    
+    public IoServiceListenerSupport getListeners()
+    {
+        return super.getListeners();
     }
 
     private void buildFilterChain( RegistrationRequest req, IoSession session ) throws Exception
@@ -368,26 +373,20 @@
             DatagramChannel ch = ( DatagramChannel ) key.channel();
 
             RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-            DatagramSessionImpl session = new DatagramSessionImpl(
-                    wrapper, this,
-                    req.config,
-                    ch, req.handler,
-                    req.address );
-            session.setSelectionKey( key );
-            
             try
             {
-                buildFilterChain( req, session );
-                ( ( DatagramFilterChain ) session.getFilterChain() ).fireSessionCreated( session );
-
                 if( key.isReadable() )
                 {
-                    readSession( session );
+                    readSession( ch, req );
                 }
 
                 if( key.isWritable() )
                 {
-                    scheduleFlush( session );
+                    for( Iterator i = getManagedSessions( req.address ).iterator();
+                         i.hasNext(); )
+                    {
+                        scheduleFlush( ( DatagramSessionImpl ) i.next() );
+                    }
                 }
             }
             catch( Throwable t )
@@ -397,18 +396,20 @@
         }
     }
 
-    private void readSession( DatagramSessionImpl session )
+    private void readSession( DatagramChannel channel, RegistrationRequest req ) throws Exception
     {
-
-        ByteBuffer readBuf = ByteBuffer.allocate( session.getReadBufferSize() );
+        ByteBuffer readBuf = ByteBuffer.allocate(
+                ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
         try
         {
-            SocketAddress remoteAddress = session.getChannel().receive(
-                    readBuf.buf() );
+            SocketAddress remoteAddress = channel.receive(
+                readBuf.buf() );
             if( remoteAddress != null )
             {
+                DatagramSessionImpl session =
+                    ( DatagramSessionImpl ) newSession( remoteAddress, req.address );
+
                 readBuf.flip();
-                session.setRemoteAddress( remoteAddress );
 
                 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
                 newBuf.put( readBuf );
@@ -418,10 +419,6 @@
                 session.getFilterChain().fireMessageReceived( session, newBuf );
             }
         }
-        catch( IOException e )
-        {
-            ( ( DatagramFilterChain ) session.getFilterChain() ).fireExceptionCaught( session, e );
-        }
         finally
         {
             readBuf.release();
@@ -451,7 +448,7 @@
             }
             catch( IOException e )
             {
-                ( ( DatagramFilterChain ) session.getFilterChain() ).fireExceptionCaught( session, e );
+                session.getFilterChain().fireExceptionCaught( session, e );
             }
         }
     }
@@ -526,7 +523,7 @@
                 session.increaseWrittenBytes( writtenBytes );
                 session.increaseWrittenWriteRequests();
                 buf.reset();
-                ( ( DatagramFilterChain ) session.getFilterChain() ).fireMessageSent( session, req );
+                session.getFilterChain().fireMessageSent( session, req );
             }
         }
     }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Tue Sep 12 02:28:42 2006
@@ -30,6 +30,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandler;
@@ -390,6 +391,13 @@
 
             DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
 
+            DatagramSessionImpl replaceSession = getRecycledSession(session);
+
+            if(replaceSession != null)
+            {
+                session = replaceSession;
+            }
+
             if( key.isReadable() && session.getTrafficMask().isReadable() )
             {
                 readSession( session );
@@ -401,6 +409,30 @@
             }
         }
     }
+    
+    private DatagramSessionImpl getRecycledSession( IoSession session )
+    {
+        IoSessionRecycler sessionRecycler = session.getServiceConfig().getSessionRecycler();
+        DatagramSessionImpl replaceSession = null;
+
+        if ( sessionRecycler != null )
+        {
+            synchronized ( sessionRecycler )
+            {
+                replaceSession = ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(), session
+                        .getRemoteAddress() );
+
+                if ( replaceSession != null )
+                {
+                    return replaceSession;
+                }
+
+                sessionRecycler.put( session );
+            }
+        }
+
+        return null;
+    }
 
     private void readSession( DatagramSessionImpl session )
     {
@@ -552,8 +584,17 @@
             boolean success = false;
             try
             {
-                buildFilterChain( req, session );
-                session.getFilterChain().fireSessionCreated( session );
+                DatagramSessionImpl replaceSession = getRecycledSession( session );
+
+                if ( replaceSession != null )
+                {
+                    session = replaceSession;
+                }
+                else
+                {
+                    buildFilterChain( req, session );
+                    getListeners().fireSessionCreated( session );
+                }
 
                 SelectionKey key = req.channel.register( selector,
                         SelectionKey.OP_READ, session );
@@ -621,6 +662,7 @@
                     ExceptionMonitor.getInstance().exceptionCaught( e );
                 }
                 
+                getListeners().fireSessionDestroyed( session );
                 session.getCloseFuture().setClosed();
                 key.cancel();
                 selector.wakeup(); // wake up again to trigger thread death

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Tue Sep 12 02:28:42 2006
@@ -67,6 +67,7 @@
         }
         else
         {
+            ( ( DatagramAcceptorDelegate ) manager ).getListeners().fireSessionDestroyed( session );
             session.getCloseFuture().setClosed();
         }
     }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Tue Sep 12 02:28:42 2006
@@ -36,7 +36,6 @@
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.common.support.BaseIoSessionConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
 import org.apache.mina.util.Queue;
 
@@ -81,7 +80,7 @@
         this.localAddress = ch.socket().getLocalSocketAddress();
         this.serviceAddress = serviceAddress;
         this.serviceConfig = serviceConfig;
-        
+
         // Apply the initial session settings
         IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
         if( sessionConfig instanceof DatagramSessionConfig )
@@ -99,22 +98,22 @@
             }
         }
     }
-    
+
     public IoService getService()
     {
         return wrapperManager;
     }
-    
+
     public IoServiceConfig getServiceConfig()
     {
         return serviceConfig;
     }
-    
+
     public IoSessionConfig getConfig()
     {
         return config;
     }
-    
+
     DatagramService getManagerDelegate()
     {
         return managerDelegate;
@@ -147,6 +146,7 @@
     
     protected void close0()
     {
+        getServiceConfig().getSessionRecycler().remove( this );
         filterChain.fireFilterClose( this );
     }
 
@@ -177,7 +177,7 @@
             return writeRequestQueue.size();
         }
     }
-    
+
     public int getScheduledWriteBytes()
     {
         synchronized( writeRequestQueue )
@@ -185,7 +185,7 @@
             return writeRequestQueue.byteSize();
         }
     }
-    
+
     public TransportType getTransportType()
     {
         return TransportType.DATAGRAM;
@@ -205,7 +205,7 @@
     {
         return localAddress;
     }
-    
+
     public SocketAddress getServiceAddress()
     {
         return serviceAddress;
@@ -215,13 +215,13 @@
     {
         managerDelegate.updateTrafficMask( this );
     }
-    
+
     int getReadBufferSize()
     {
         return readBufferSize;
     }
-    
-    private class SessionConfigImpl extends BaseIoSessionConfig implements DatagramSessionConfig
+
+    private class SessionConfigImpl extends DatagramSessionConfigImpl implements DatagramSessionConfig
     {
         public int getReceiveBufferSize()
         {

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java Tue Sep 12 02:28:42 2006
@@ -22,27 +22,18 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.mina.common.IoAcceptorConfig;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.support.BaseIoAcceptor;
 import org.apache.mina.common.support.BaseIoAcceptorConfig;
 import org.apache.mina.common.support.BaseIoSessionConfig;
 import org.apache.mina.transport.vmpipe.support.VmPipe;
-import org.apache.mina.util.IdentityHashSet;
 
 /**
  * Binds the specified {@link IoHandler} to the specified
@@ -89,30 +80,10 @@
             boundHandlers.put( address, 
                                new VmPipe( this,
                                           ( VmPipeAddress ) address,
-                                          handler, config ) );
+                                          handler, config, getListeners() ) );
         }
     }
     
-    public Set getManagedSessions( SocketAddress address )
-    {
-        if( address == null )
-            throw new NullPointerException( "address" );
-        
-        VmPipe pipe = null;
-        synchronized( boundHandlers )
-        {
-            pipe = ( VmPipe ) boundHandlers.get( address );
-            if( pipe == null )
-            {
-                throw new IllegalArgumentException( "Address not bound: " + address );
-            }
-        }
-        
-        Set managedSessions = pipe.getManagedServerSessions();
-        return Collections.unmodifiableSet(
-                new IdentityHashSet( Arrays.asList( managedSessions.toArray() ) ) );
-    }
-
     public void unbind( SocketAddress address )
     {
         if( address == null )
@@ -129,60 +100,9 @@
             pipe = ( VmPipe ) boundHandlers.remove( address );
         }
         
-        Set managedSessions = pipe.getManagedServerSessions();
-        
-        IoServiceConfig cfg = pipe.getConfig();
-        boolean disconnectOnUnbind;
-        if( cfg instanceof IoAcceptorConfig )
-        {
-            disconnectOnUnbind = ( ( IoAcceptorConfig ) cfg ).isDisconnectOnUnbind();
-        }
-        else
-        {
-            disconnectOnUnbind = ( ( IoAcceptorConfig ) getDefaultConfig() ).isDisconnectOnUnbind();
-        }
-        if( disconnectOnUnbind && managedSessions != null )
-        {
-            IoSession[] tempSessions = ( IoSession[] ) 
-                                  managedSessions.toArray( new IoSession[ 0 ] );
-            
-            final Object lock = new Object();
-            
-            for( int i = 0; i < tempSessions.length; i++ )
-            {
-                if( !managedSessions.contains( tempSessions[ i ] ) )
-                {
-                    // The session has already been closed and have been 
-                    // removed from managedSessions by the VmPipeFilterChain.
-                    continue;
-                }
-                tempSessions[ i ].close().addListener( new IoFutureListener()
-                {
-                    public void operationComplete( IoFuture future )
-                    {
-                        synchronized( lock )
-                        {
-                            lock.notify();
-                        }
-                    }
-                } );
-            }
-
-            try
-            {
-                synchronized( lock )
-                {
-                    while( !managedSessions.isEmpty() )
-                    {
-                        lock.wait( 1000 );
-                    }
-                }
-            }
-            catch( InterruptedException ie )
-            {
-                // Ignored
-            }
-        }                
+        getListeners().fireServiceDeactivated(
+                this, pipe.getAddress(),
+                pipe.getHandler(), pipe.getConfig() );
     }
     
     public void unbindAll()
@@ -194,22 +114,6 @@
             {
                 unbind( ( SocketAddress ) i.next() );
             }
-        }
-    }
-    
-    public boolean isBound( SocketAddress address )
-    {
-        synchronized( boundHandlers )
-        {
-            return boundHandlers.containsKey( address );
-        }
-    }
-    
-    public Set getBoundAddresses()
-    {
-        synchronized( boundHandlers )
-        {
-            return new HashSet( boundHandlers.keySet() );
         }
     }
     

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Tue Sep 12 02:28:42 2006
@@ -86,6 +86,7 @@
                 new VmPipeSessionImpl(
                         this,
                         config,
+                        getListeners(),
                         new Object(), // lock
                         new AnonymousSocketAddress(),
                         handler,

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java Tue Sep 12 02:28:42 2006
@@ -19,12 +19,9 @@
  */
 package org.apache.mina.transport.vmpipe.support;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 
@@ -38,18 +35,19 @@
     private final VmPipeAddress address;
     private final IoHandler handler;
     private final IoServiceConfig config;
-    private final Set managedClientSessions = Collections.synchronizedSet( new HashSet() );
-    private final Set managedServerSessions = Collections.synchronizedSet( new HashSet() );
+    private final IoServiceListenerSupport listeners;
     
     public VmPipe( VmPipeAcceptor acceptor,
                    VmPipeAddress address,
                    IoHandler handler,
-                   IoServiceConfig config )
+                   IoServiceConfig config,
+                   IoServiceListenerSupport listeners)
     {
         this.acceptor = acceptor;
         this.address = address;
         this.handler = handler;
         this.config = config;
+        this.listeners = listeners;
     }
 
     public VmPipeAcceptor getAcceptor()
@@ -71,14 +69,9 @@
     {
         return config;
     }
-
-    public Set getManagedClientSessions()
-    {
-        return managedClientSessions;
-    }
     
-    public Set getManagedServerSessions()
+    public IoServiceListenerSupport getListeners()
     {
-        return managedServerSessions;
+        return listeners;
     }
 }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Tue Sep 12 02:28:42 2006
@@ -118,8 +118,7 @@
         {
             if( !session.getCloseFuture().isClosed() )
             {
-                s.getManagedSessions().remove( s );
-                s.getFilterChain().fireSessionClosed( session );
+                s.getServiceListeners().fireSessionDestroyed( s );
                 s.remoteSession.close();
             }
         }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Sep 12 02:28:42 2006
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.util.Set;
 
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoFilterChain;
@@ -34,6 +33,7 @@
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.util.Queue;
 
 /**
@@ -46,14 +46,14 @@
 {
     private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
     
-    private final IoService manager;
+    private final IoService service;
     private final IoServiceConfig serviceConfig;
+    private final IoServiceListenerSupport serviceListeners;
     private final SocketAddress localAddress;
     private final SocketAddress remoteAddress;
     private final SocketAddress serviceAddress;
     private final IoHandler handler;
     private final VmPipeFilterChain filterChain;
-    private final Set managedSessions;
     final VmPipeSessionImpl remoteSession;
     final Object lock;
     final Queue pendingDataQueue;
@@ -61,11 +61,14 @@
     /**
      * Constructor for client-side session.
      */
-    public VmPipeSessionImpl( IoService manager, IoServiceConfig serviceConfig, Object lock, SocketAddress localAddress,
-                   IoHandler handler, VmPipe remoteEntry ) throws IOException
+    public VmPipeSessionImpl(
+            IoService service, IoServiceConfig serviceConfig,
+            IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
+            IoHandler handler, VmPipe remoteEntry ) throws IOException
     {
-        this.manager = manager;
+        this.service = service;
         this.serviceConfig = serviceConfig;
+        this.serviceListeners = serviceListeners;
         this.lock = lock;
         this.localAddress = localAddress;
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
@@ -73,56 +76,32 @@
         this.filterChain = new VmPipeFilterChain( this );
         this.pendingDataQueue = new Queue();
 
-        this.managedSessions = remoteEntry.getManagedClientSessions();
+        remoteSession = new VmPipeSessionImpl( service, this, remoteEntry );
         
-        remoteSession = new VmPipeSessionImpl( manager, this, remoteEntry );
-        
-        // initialize remote session
+        // initialize connector session
         try
         {
-            remoteEntry.getAcceptor().getFilterChainBuilder().buildFilterChain( remoteSession.getFilterChain() );
-            remoteEntry.getConfig().getFilterChainBuilder().buildFilterChain( remoteSession.getFilterChain() );
-            remoteEntry.getConfig().getThreadModel().buildFilterChain( remoteSession.getFilterChain() );
-            remoteSession.getFilterChain().fireSessionCreated( remoteSession );
-        }
-        catch( Throwable t )
-        {
-            ExceptionMonitor.getInstance().exceptionCaught( t );
-            IOException e = new IOException( "Failed to initialize remote session." );
-            e.initCause( t );
-            throw e;
-        }
-        
-        // initialize client session
-        try
-        {
-            manager.getFilterChainBuilder().buildFilterChain( filterChain );
+            service.getFilterChainBuilder().buildFilterChain( filterChain );
             serviceConfig.getFilterChainBuilder().buildFilterChain( filterChain );
             serviceConfig.getThreadModel().buildFilterChain( filterChain );
-            handler.sessionCreated( this );
+            serviceListeners.fireSessionCreated( this );
         }
         catch( Throwable t )
         {
-            throw ( IOException ) new IOException( "Failed to create a session." ).initCause( t );
+            throw ( IOException ) new IOException( "Failed to create a connector session." ).initCause( t );
         }
 
-        VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
         VmPipeIdleStatusChecker.getInstance().addSession( this );
-        
-        remoteSession.managedSessions.add( remoteSession );
-        this.managedSessions.add( this );
-        
-        remoteSession.getFilterChain().fireSessionOpened( remoteSession );
-        filterChain.fireSessionOpened( this );
     }
 
     /**
      * Constructor for server-side session.
      */
-    private VmPipeSessionImpl( IoService manager, VmPipeSessionImpl remoteSession, VmPipe entry )
+    private VmPipeSessionImpl( IoService service, VmPipeSessionImpl remoteSession, VmPipe entry ) throws IOException
     {
-        this.manager = manager;
+        this.service = service;
         this.serviceConfig = entry.getConfig();
+        this.serviceListeners = entry.getListeners();
         this.lock = remoteSession.lock;
         this.localAddress = this.serviceAddress = remoteSession.remoteAddress;
         this.remoteAddress = remoteSession.localAddress;
@@ -130,17 +109,34 @@
         this.filterChain = new VmPipeFilterChain( this );
         this.remoteSession = remoteSession;
         this.pendingDataQueue = new Queue();
-        this.managedSessions = entry.getManagedServerSessions();
+
+        // initialize acceptor session
+        try
+        {
+            entry.getAcceptor().getFilterChainBuilder().buildFilterChain( this.getFilterChain() );
+            entry.getConfig().getFilterChainBuilder().buildFilterChain( this.getFilterChain() );
+            entry.getConfig().getThreadModel().buildFilterChain( this.getFilterChain() );
+            serviceListeners.fireSessionCreated( this );
+        }
+        catch( Throwable t )
+        {
+            ExceptionMonitor.getInstance().exceptionCaught( t );
+            IOException e = new IOException( "Failed to initialize acceptor session." );
+            e.initCause( t );
+            throw e;
+        }
+        
+        VmPipeIdleStatusChecker.getInstance().addSession( this );
     }
     
-    Set getManagedSessions()
+    public IoService getService()
     {
-        return managedSessions;
+        return service;
     }
-
-    public IoService getService()
+    
+    IoServiceListenerSupport getServiceListeners()
     {
-        return manager;
+        return serviceListeners;
     }
     
     public IoServiceConfig getServiceConfig()

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.util;
+
+/**
+ * A listener for expired object events.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Make this a inner interface of ExpiringMap
+ */
+public interface ExpirationListener
+{
+    void expired(Object expiredObject);
+}

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,370 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A map with expiration.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Change time unit to 'seconds'.
+ */
+public class ExpiringMap implements Map
+{
+    public static final long DEFAULT_EXPIRATION_TIME = 5000;
+
+    public static final long DEFAULT_EXPIRER_DELAY = 5000;
+
+    private static volatile int expirerCount = 1;
+
+    private long expirationTimeMillis;
+
+    private long expirerDelay;
+
+    private HashMap delegate;
+
+    private HashMap expirationInfos;
+
+    private List expirationListeners;
+
+    private Expirer expirer;
+
+    public ExpiringMap()
+    {
+        this( new HashMap(), new HashMap(), new LinkedList(), DEFAULT_EXPIRATION_TIME, DEFAULT_EXPIRER_DELAY );
+    }
+
+    public ExpiringMap( long expirationTimeMillis )
+    {
+        this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, DEFAULT_EXPIRER_DELAY );
+    }
+
+    public ExpiringMap( long expirationTimeMillis, long expirerDelay )
+    {
+        this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, expirerDelay );
+    }
+
+    private ExpiringMap( HashMap delegate, HashMap accessTimes, List expirationListeners, long expirationTimeMillis,
+            long expirerDelay )
+    {
+        this.delegate = delegate;
+        this.expirationInfos = accessTimes;
+        this.expirationTimeMillis = expirationTimeMillis;
+        this.expirationListeners = expirationListeners;
+        this.expirerDelay = expirerDelay;
+
+        this.expirer = new Expirer();
+        this.expirer.start();
+    }
+
+    /**
+     * @see java.util.Map#clear()
+     */
+    public void clear()
+    {
+        expirationInfos.clear();
+        delegate.clear();
+    }
+
+    /**
+     * @see java.util.Map#containsKey(java.lang.Object)
+     */
+    public boolean containsKey( Object key )
+    {
+        return delegate.containsKey( key );
+    }
+
+    /**
+     * @see java.util.Map#containsValue(java.lang.Object)
+     */
+    public boolean containsValue( Object value )
+    {
+        return delegate.containsValue( value );
+    }
+
+    /**
+     * @see java.util.Map#entrySet()
+     */
+    public Set entrySet()
+    {
+        return delegate.entrySet();
+    }
+
+    /**
+     * @see java.util.Map#equals(java.util.Map)
+     */
+    public boolean equals( Object o )
+    {
+        return delegate.equals( o );
+    }
+
+    /**
+     * @see java.util.Map#get(java.lang.Object)
+     */
+    public Object get( Object key )
+    {
+        Object object = delegate.get( key );
+
+        if ( object != null )
+        {
+            updateAccessTime( object );
+        }
+
+        return object;
+    }
+
+    public int hashCode()
+    {
+        return delegate.hashCode();
+    }
+
+    public boolean isEmpty()
+    {
+        return delegate.isEmpty();
+    }
+
+    public Set keySet()
+    {
+        return delegate.keySet();
+    }
+
+    public Object put( Object key, Object value )
+    {
+        if ( value != null )
+        {
+            addAccessTime( key, value );
+        }
+
+        return delegate.put( key, value );
+    }
+
+    public void putAll( Map map )
+    {
+        Iterator mapKeyIterator = map.keySet().iterator();
+
+        while ( mapKeyIterator.hasNext() )
+        {
+            Object key = mapKeyIterator.next();
+            Object value = map.get( key );
+
+            if ( value != null )
+            {
+                addAccessTime( key, value );
+            }
+        }
+
+        delegate.putAll( map );
+    }
+
+    public Object remove( Object key )
+    {
+        Object object = delegate.remove( key );
+
+        if ( object != null )
+        {
+            expirationInfos.remove( object );
+        }
+
+        return object;
+    }
+
+    public int size()
+    {
+        return delegate.size();
+    }
+
+    public Collection values()
+    {
+        return delegate.values();
+    }
+
+    private void addAccessTime( Object key, Object value )
+    {
+        ExpirationInfo info = new ExpirationInfo();
+        info.key = key;
+        info.accesstime = System.currentTimeMillis();
+
+        expirationInfos.put( value, info );
+    }
+
+    public void updateAccessTime( Object object )
+    {
+        Object infoObject = expirationInfos.get( object );
+
+        if ( infoObject != null )
+        {
+            ExpirationInfo info = ( ExpirationInfo ) infoObject;
+            info.accesstime = System.currentTimeMillis();
+        }
+    }
+
+    private ExpirationInfo getExpirationInfo( Object object )
+    {
+        Object infoObject = expirationInfos.get( object );
+
+        if ( infoObject != null )
+        {
+            ExpirationInfo info = ( ExpirationInfo ) infoObject;
+
+            return info;
+        }
+
+        return null;
+    }
+
+    public void addExpirationListener( ExpirationListener listener )
+    {
+        synchronized ( expirationListeners )
+        {
+            expirationListeners.add( listener );
+        }
+    }
+
+    public void removeExpirationListener( ExpirationListener listener )
+    {
+        synchronized ( expirationListeners )
+        {
+            expirationListeners.remove( listener );
+        }
+    }
+
+    public Object[] findMappedObjects()
+    {
+        Object results[] = null;
+        synchronized ( delegate )
+        {
+            results = new Object[delegate.size()];
+            results = delegate.values().toArray( results );
+        }
+        return ( results );
+    }
+
+    public void startExpirer()
+    {
+        synchronized ( expirer )
+        {
+            if ( !expirer.isRunning() )
+            {
+                expirer.setRunning( true );
+                expirer.interrupt();
+            }
+        }
+    }
+
+    public void stopExpirer()
+    {
+        synchronized ( expirer )
+        {
+            if ( expirer.isRunning() )
+            {
+                expirer.setRunning( false );
+                expirer.interrupt();
+            }
+        }
+    }
+
+    private class ExpirationInfo
+    {
+        public Object key;
+
+        public long accesstime;
+    }
+
+    private class Expirer extends Thread
+    {
+        private boolean running = true;
+
+        public Expirer()
+        {
+            super( "MapExpirer-" + expirerCount++ );
+        }
+
+        public void run()
+        {
+            while ( running )
+            {
+                processExpires();
+
+                try
+                {
+                    Thread.sleep( expirerDelay );
+                }
+                catch ( InterruptedException e )
+                {
+                }
+            }
+        }
+
+        private void processExpires()
+        {
+            long timeNow = System.currentTimeMillis();
+            Object mappedObjects[] = findMappedObjects();
+
+            for ( int i = 0; i < mappedObjects.length; i++ )
+            {
+                Object mappedObject = mappedObjects[i];
+
+                ExpirationInfo info = getExpirationInfo( mappedObject );
+
+                if ( info != null )
+                {
+                    if ( expirationTimeMillis < 0 )
+                        continue;
+                    long timeIdle = timeNow - info.accesstime;
+
+                    if ( timeIdle >= expirationTimeMillis )
+                    {
+                        delegate.remove( info.key );
+                        expirationInfos.remove( info.key );
+
+                        synchronized ( expirationListeners )
+                        {
+                            Iterator listenerIterator = expirationListeners.iterator();
+
+                            while ( listenerIterator.hasNext() )
+                            {
+                                ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
+
+                                listener.expired( mappedObject );
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        public void setRunning( boolean running )
+        {
+            this.running = running;
+        }
+
+        public boolean isRunning()
+        {
+            return running;
+        }
+    }
+}

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java Tue Sep 12 02:28:42 2006
@@ -33,7 +33,6 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.TransportType;
 import org.easymock.MockControl;
@@ -57,7 +56,7 @@
     
     public void testServiceLifecycle() throws Exception
     {
-        MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+        MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
         IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
         
         // Test activation
@@ -93,25 +92,36 @@
     
     public void testSessionLifecycle() throws Exception
     {
-        IoSession session = new TestSession( ADDRESS );
-        MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+        TestSession session = new TestSession( ADDRESS );
+        
+        MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+        IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+        session.setFilterChain( chain );
+        
+        MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
         IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
         
         // Test creation
         listener.sessionCreated( session );
+        chain.fireSessionCreated( session );
+        chain.fireSessionOpened( session);
         
         listenerControl.replay();
+        chainControl.replay();
         
         support.add( listener );
         support.fireSessionCreated( session );
         
         listenerControl.verify();
+        chainControl.verify();
         
         Assert.assertEquals( 1, support.getManagedSessions( ADDRESS ).size() );
         Assert.assertTrue( support.getManagedSessions( ADDRESS ).contains( session ) );
         
         // Test destruction & other side effects
         listenerControl.reset();
+        chainControl.reset();
+        chain.fireSessionClosed( session );
         listener.sessionDestroyed( session );
 
         listenerControl.replay();
@@ -131,34 +141,50 @@
     
     public void testDisconnectOnUnbind() throws Exception
     {
-        MockControl acceptorControl = MockControl.createControl( IoAcceptor.class );
+        MockControl acceptorControl = MockControl.createStrictControl( IoAcceptor.class );
         IoAcceptor acceptor = ( IoAcceptor ) acceptorControl.getMock();
-        MockControl configControl = MockControl.createControl( IoAcceptorConfig.class );
+
+        final TestSession session = new TestSession( acceptor, ADDRESS );
+
+        MockControl configControl = MockControl.createStrictControl( IoAcceptorConfig.class );
         IoAcceptorConfig config = ( IoAcceptorConfig ) configControl.getMock();
-        final IoSession session = new TestSession( acceptor, ADDRESS );
-        MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+
+        MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+        IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+        session.setFilterChain( chain );
+        
+        MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
         IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
         
         // Activate a service and create a session.
         listener.serviceActivated( acceptor, ADDRESS, null, config );
         listener.sessionCreated( session );
+        chain.fireSessionCreated( session );
+        chain.fireSessionOpened( session );
         
         listenerControl.replay();
+        chainControl.replay();
         
         support.add( listener );
         support.fireServiceActivated( acceptor, ADDRESS, null, config );
         support.fireSessionCreated( session );
         
         listenerControl.verify();
+        chainControl.verify();
         
         // Deactivate a service and make sure the session is closed & destroyed.
         listenerControl.reset();
+        chainControl.reset();
+
         listener.serviceDeactivated( acceptor, ADDRESS, null, config );
         configControl.expectAndReturn(config.isDisconnectOnUnbind(), true );
         listener.sessionDestroyed( session );
+        chain.fireSessionClosed( session );
 
         listenerControl.replay();
         configControl.replay();
+        chainControl.replay();
+
         new Thread()
         {
             // Emulate I/O service
@@ -166,7 +192,7 @@
             {
                 try
                 {
-                    Thread.sleep( 2000 );
+                    Thread.sleep( 500 );
                 }
                 catch( InterruptedException e )
                 {
@@ -179,6 +205,7 @@
         
         listenerControl.verify();
         configControl.verify();
+        chainControl.verify();
 
         Assert.assertTrue( session.isClosing() );
         Assert.assertEquals( 0, support.getManagedSessions( ADDRESS ).size() );
@@ -187,32 +214,47 @@
     
     public void testConnectorActivation() throws Exception
     {
-        MockControl connectorControl = MockControl.createControl( IoConnector.class );
+        MockControl connectorControl = MockControl.createStrictControl( IoConnector.class );
         IoConnector connector = ( IoConnector ) connectorControl.getMock();
-        final IoSession session = new TestSession( connector, ADDRESS );
-        MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+
+        final TestSession session = new TestSession( connector, ADDRESS );
+
+        MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+        IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+        session.setFilterChain( chain );
+
+        MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
         IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
         
         // Creating a session should activate a service automatically.
         listener.serviceActivated( connector, ADDRESS, null, null );
         listener.sessionCreated( session );
+        chain.fireSessionCreated( session );
+        chain.fireSessionOpened( session );
         
         listenerControl.replay();
+        chainControl.replay();
         
         support.add( listener );
         support.fireSessionCreated( session );
         
         listenerControl.verify();
+        chainControl.verify();
         
         // Destroying a session should deactivate a service automatically.
         listenerControl.reset();
+        chainControl.reset();
         listener.sessionDestroyed( session );
+        chain.fireSessionClosed( session );
         listener.serviceDeactivated( connector, ADDRESS, null, null );
         
         listenerControl.replay();
+        chainControl.replay();
+        
         support.fireSessionDestroyed( session );
         
         listenerControl.verify();
+        chainControl.verify();
 
         Assert.assertEquals( 0, support.getManagedSessions( ADDRESS ).size() );
         Assert.assertFalse( support.getManagedSessions( ADDRESS ).contains( session ) );
@@ -222,6 +264,7 @@
     {
         private final IoService service;
         private final SocketAddress serviceAddress;
+        private IoFilterChain filterChain;
         
         public TestSession( SocketAddress serviceAddress )
         {
@@ -245,7 +288,12 @@
 
         public IoFilterChain getFilterChain()
         {
-            return null;
+            return filterChain;
+        }
+        
+        public void setFilterChain( IoFilterChain filterChain )
+        {
+            this.filterChain = filterChain;
         }
 
         public IoHandler getHandler()

Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java Tue Sep 12 02:28:42 2006
@@ -33,6 +33,7 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.util.AvailablePortFinder;
 
 /**
@@ -70,7 +71,11 @@
         {
             ConnectFuture future = connector.connect( new InetSocketAddress( "localhost", port ), new IoHandlerAdapter() );
             future.join();
-            future.getSession().write( ByteBuffer.allocate( 16 ).putInt( 0 ).flip() ).join();
+            
+            WriteFuture writeFuture = future.getSession().write( ByteBuffer.allocate( 16 ).putInt( 0 ).flip() );
+            writeFuture.join();
+            Assert.assertTrue( writeFuture.isWritten() );
+            
             future.getSession().close();
     
             for( int i = 0; i < 30; i ++ )

Added: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (added)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExpiringSessionRecycler;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * Tests if datagram sessions are recycled properly.
+ * 
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 436993 $, $Date: 2006-08-26 07:36:56 +0900 (토, 26  8월 2006) $ 
+ */
+public class DatagramRecyclerTest extends TestCase
+{
+    private final IoAcceptor acceptor = new DatagramAcceptor();
+    private final IoConnector connector = new DatagramConnector();
+
+    public DatagramRecyclerTest()
+    {
+    }
+    
+    public void testDatagramRecycler() throws Exception
+    {
+        int port = AvailablePortFinder.getNextAvailable( 1024 );
+        DatagramAcceptorConfig config = new DatagramAcceptorConfig();
+        ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1000, 500 );
+        config.setSessionRecycler( recycler );
+        
+        MockHandler acceptorHandler = new MockHandler();
+        MockHandler connectorHandler = new MockHandler();
+        
+        acceptor.bind( new InetSocketAddress( port ), acceptorHandler, config );
+        
+        try
+        {
+            ConnectFuture future = connector.connect(
+                    new InetSocketAddress( "localhost", port ), connectorHandler, config );
+            future.join();
+            Assert.assertTrue( future.isConnected() );
+            
+            // Write whatever to trigger the acceptor.
+            future.getSession().write( ByteBuffer.allocate(1) ).join();
+
+            // Wait until the connection is closed.
+            future.getSession().getCloseFuture().join( 3000 );
+            Assert.assertTrue( future.getSession().getCloseFuture().isClosed() );
+            acceptorHandler.session.getCloseFuture().join( 3000 );
+            Assert.assertTrue( acceptorHandler.session.getCloseFuture().isClosed() );
+            
+            Thread.sleep( 500 );
+
+            Assert.assertEquals( "CROPSECL", connectorHandler.result );
+            Assert.assertEquals( "CROPRECL", acceptorHandler.result );
+        }
+        finally
+        {
+            acceptor.unbind( new InetSocketAddress( port ) );
+        }
+    }
+    
+    private class MockHandler extends IoHandlerAdapter
+    {
+        public IoSession session;
+        public String result = "";
+
+        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+            this.session = session;
+            result += "CA";
+        }
+
+        public void messageReceived(IoSession session, Object message) throws Exception {
+            this.session = session;
+            result += "RE";
+        }
+
+        public void messageSent(IoSession session, Object message) throws Exception {
+            this.session = session;
+            result += "SE";
+        }
+
+        public void sessionClosed(IoSession session) throws Exception {
+            this.session = session;
+            result += "CL";
+        }
+
+        public void sessionCreated(IoSession session) throws Exception {
+            this.session = session;
+            result += "CR";
+        }
+
+        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+            this.session = session;
+            result += "ID";
+        }
+
+        public void sessionOpened(IoSession session) throws Exception {
+            this.session = session;
+            result += "OP";
+        }
+
+    }
+}