You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2006/09/12 11:28:44 UTC
svn commit: r442541 - in /directory/trunks/mina/core/src:
main/java/org/apache/mina/common/ main/java/org/apache/mina/common/support/
main/java/org/apache/mina/transport/socket/nio/
main/java/org/apache/mina/transport/socket/nio/support/ main/java/org/...
Author: trustin
Date: Tue Sep 12 02:28:42 2006
New Revision: 442541
URL: http://svn.apache.org/viewvc?view=rev&rev=442541
Log:
Resolved issue: DIRMINA-162 (datagram session management fails)
* Added IoSessionRecycler
* Moved common logic related with sessionCreated/Opened/Closed to IoServiceListenerSupport to reduce duplication
* Added ExpiringSessionRecycler and its required classes
* Fixed that IoServiceListener doesn't work with VM-pipe
Added:
directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java (with props)
directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java (with props)
directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java (with props)
directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java (with props)
directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Modified:
directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java
directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.util.ExpirationListener;
+import org.apache.mina.util.ExpiringMap;
+
+/**
+ * An {@link IoSessionRecycler} with sessions that time out on inactivity.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ *
+ * TODO Change time unit to 'seconds'.
+ */
+public class ExpiringSessionRecycler implements IoSessionRecycler, ExpirationListener
+{
+ private ExpiringMap sessionMap;
+
+ public ExpiringSessionRecycler()
+ {
+ this( ExpiringMap.DEFAULT_EXPIRATION_TIME, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+ }
+
+ public ExpiringSessionRecycler( long expirationTimeMillis )
+ {
+ this( expirationTimeMillis, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+ }
+
+ public ExpiringSessionRecycler( long expirationTimeMillis, long expirerDelay )
+ {
+ // FIXME Use IdentityHashMap if possible.
+ sessionMap = new ExpiringMap( expirationTimeMillis, expirerDelay );
+ sessionMap.addExpirationListener( this );
+ }
+
+ public void put( IoSession session )
+ {
+ Object key = generateKey( session );
+ if ( !sessionMap.containsKey( key ) )
+ {
+ sessionMap.put( key, session );
+ }
+ }
+
+ public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
+ {
+ Object key = generateKey( localAddress, remoteAddress );
+ return ( IoSession ) sessionMap.get( key );
+ }
+
+ public void remove( IoSession session )
+ {
+ Object key = generateKey( session );
+ sessionMap.remove( key );
+ }
+
+ public void expired( Object expiredObject )
+ {
+ IoSession expiredSession = ( IoSession ) expiredObject;
+ expiredSession.close();
+ }
+
+ private Object generateKey( IoSession session )
+ {
+ return generateKey( session.getLocalAddress(), session.getRemoteAddress() );
+ }
+
+ private Object generateKey( SocketAddress localAddress, SocketAddress remoteAddress )
+ {
+ List key = new ArrayList( 2 );
+ key.add( remoteAddress );
+ key.add( localAddress );
+ return key;
+ }
+}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoServiceConfig.java Tue Sep 12 02:28:42 2006
@@ -82,6 +82,19 @@
void setThreadModel( ThreadModel threadModel );
/**
+ * Sets the {@link IoSessionRecycler} for this service. Please note that this
+ * recycler will only be applied to connectionless transports.
+ *
+ * @param sessionRecycler <tt>null</tt> to use the default recycler
+ */
+ void setSessionRecycler( IoSessionRecycler sessionRecycler );
+
+ /**
+ * Returns the {@link IoSessionRecycler} for this service.
+ */
+ IoSessionRecycler getSessionRecycler();
+
+ /**
* Returns a deep clone of this configuration.
*/
Object clone();
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+
+/**
+ * A connectionless transport can recycle existing sessions by assigning an
+ * IoSessionRecyler to its {@link IoServiceConfig}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO More documentation
+ */
+public interface IoSessionRecycler
+{
+ /**
+ * Called when the underlying transport creates or writes a new {@link IoSession}.
+ *
+ * @param session
+ * the new {@link IoSession}.
+ */
+ void put( IoSession session );
+
+ /**
+ * Attempts to retrieve a recycled {@link IoSession}.
+ *
+ * @param localAddress
+ * the local socket address of the {@link IoSession} the
+ * transport wants to recycle.
+ * @param remoteAddress
+ * the remote socket address of the {@link IoSession} the
+ * transport wants to recycle.
+ * @return a recycled {@link IoSession}, or null if one cannot be found.
+ */
+ IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress );
+
+ /**
+ * Called when an {@link IoSession} is explicitly closed.
+ *
+ * @param session
+ * the new {@link IoSession}.
+ */
+ void remove( IoSession session );
+}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java Tue Sep 12 02:28:42 2006
@@ -22,9 +22,11 @@
import java.lang.reflect.Method;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.ExpiringSessionRecycler;
import org.apache.mina.common.IoFilterChainBuilder;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.ThreadModel;
/**
@@ -35,6 +37,8 @@
*/
public abstract class BaseIoServiceConfig implements IoServiceConfig, Cloneable
{
+ private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+
/**
* Current filter chain builder.
*/
@@ -49,6 +53,11 @@
* Current thread model.
*/
private ThreadModel threadModel = defaultThreadModel;
+
+ /**
+ * Current session recycler
+ */
+ private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
public BaseIoServiceConfig()
{
@@ -97,6 +106,21 @@
}
this.threadModel = threadModel;
}
+
+ // FIXME There can be a problem if a user changes the recycler after the service is activated.
+ public void setSessionRecycler( IoSessionRecycler sessionRecycler )
+ {
+ if( sessionRecycler == null )
+ {
+ sessionRecycler = DEFAULT_RECYCLER;
+ }
+ this.sessionRecycler = sessionRecycler;
+ }
+
+ public IoSessionRecycler getSessionRecycler()
+ {
+ return sessionRecycler;
+ }
public Object clone()
{
@@ -109,7 +133,6 @@
{
throw ( InternalError ) new InternalError().initCause( e );
}
-
// Try to clone the chain builder.
try
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java Tue Sep 12 02:28:42 2006
@@ -111,14 +111,17 @@
Set sessions;
synchronized( managedSessions )
{
- sessions = ( Set )managedSessions.get( serviceAddress );
+ sessions = ( Set ) managedSessions.get( serviceAddress );
if( sessions == null )
{
sessions = new IdentityHashSet();
}
}
- return Collections.unmodifiableSet( sessions );
+ synchronized( sessions )
+ {
+ return Collections.unmodifiableSet( sessions );
+ }
}
/**
@@ -188,6 +191,7 @@
{
SocketAddress serviceAddress = session.getServiceAddress();
+ // Get the session set.
boolean firstSession = false;
Set sessions;
synchronized( managedSessions )
@@ -201,13 +205,7 @@
}
}
- if( session.getService() instanceof IoConnector && firstSession )
- {
- fireServiceActivated(
- session.getService(), session.getServiceAddress(),
- session.getHandler(), session.getServiceConfig() );
- }
-
+ // If already registered, ignore.
synchronized( sessions )
{
if ( !sessions.add( session ) )
@@ -215,7 +213,20 @@
return;
}
}
+
+ // If the first connector session, fire a virtual service activation event.
+ if( session.getService() instanceof IoConnector && firstSession )
+ {
+ fireServiceActivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+ // Fire session events.
+ session.getFilterChain().fireSessionCreated( session );
+ session.getFilterChain().fireSessionOpened( session);
+
+ // Fire listener events.
synchronized( listeners )
{
for( Iterator i = listeners.iterator(); i.hasNext(); )
@@ -232,17 +243,20 @@
{
SocketAddress serviceAddress = session.getServiceAddress();
+ // Get the session set.
Set sessions;
synchronized( managedSessions )
{
sessions = ( Set ) managedSessions.get( serviceAddress );
}
+ // Ignore if unknown.
if( sessions == null )
{
return;
}
+ // Try to remove the remaining empty seession set after removal.
boolean lastSession = false;
synchronized( sessions )
{
@@ -257,7 +271,10 @@
}
}
-
+ // Fire session events.
+ session.getFilterChain().fireSessionClosed( session );
+
+ // Fire listener events.
try
{
synchronized( listeners )
@@ -270,6 +287,7 @@
}
finally
{
+ // Fire a virtual service deactivation event for the last session of the connector.
if( session.getService() instanceof IoConnector && lastSession )
{
fireServiceDeactivated(
@@ -303,22 +321,26 @@
}
final Object lock = new Object();
+ Set sessionsCopy;
+
+ // Create a copy to avoid ConcurrentModificationException
synchronized( sessions )
{
- for( Iterator i = sessions.iterator(); i.hasNext(); )
+ sessionsCopy = new IdentityHashSet( sessions );
+ }
+
+ for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
+ {
+ ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
{
- ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
+ public void operationComplete( IoFuture future )
{
- public void operationComplete( IoFuture future )
+ synchronized( lock )
{
- synchronized( lock )
- {
- //noinspection NakedNotify
- lock.notifyAll();
- }
+ lock.notifyAll();
}
- } );
- }
+ }
+ } );
}
try
@@ -327,7 +349,7 @@
{
while( !managedSessions.isEmpty() )
{
- lock.wait( 1000 );
+ lock.wait( 500 );
}
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Tue Sep 12 02:28:42 2006
@@ -350,7 +350,6 @@
getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
- session.getFilterChain().fireSessionCreated( session );
session.getIoProcessor().addNew( session );
success = true;
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Tue Sep 12 02:28:42 2006
@@ -33,8 +33,10 @@
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
import org.apache.mina.common.support.BaseIoConnector;
import org.apache.mina.common.support.DefaultConnectFuture;
import org.apache.mina.util.Queue;
@@ -162,10 +164,9 @@
if( ch.connect( address ) )
{
- SocketSessionImpl session = newSession( ch, handler, config );
- success = true;
DefaultConnectFuture future = new DefaultConnectFuture();
- future.setSession( session );
+ newSession( ch, handler, config, future );
+ success = true;
return future;
}
@@ -282,8 +283,7 @@
try
{
ch.finishConnect();
- SocketSessionImpl session = newSession( ch, entry.handler, entry.config );
- entry.setSession( session );
+ newSession( ch, entry.handler, entry.config, entry );
success = true;
}
catch( Throwable e )
@@ -343,7 +343,7 @@
}
}
- private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config )
+ private void newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config, final ConnectFuture connectFuture )
throws IOException
{
SocketSessionImpl session = new SocketSessionImpl( this,
@@ -358,14 +358,28 @@
getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
config.getThreadModel().buildFilterChain( session.getFilterChain() );
- session.getFilterChain().fireSessionCreated( session );
+ session.getFilterChain().addFirst("__CONNECT_FUTURE_NOTIFIER__", new IoFilterAdapter()
+ {
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ session.getFilterChain().remove( "__CONNECT_FUTURE_NOTIFIER__" );
+ try
+ {
+ nextFilter.sessionCreated( session );
+ }
+ finally
+ {
+ connectFuture.setSession( session );
+ }
+ }
+ });
}
catch( Throwable e )
{
throw ( IOException ) new IOException( "Failed to create a session." ).initCause( e );
}
session.getIoProcessor().addNew( session );
- return session;
}
private SocketIoProcessor nextProcessor()
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Tue Sep 12 02:28:42 2006
@@ -175,7 +175,6 @@
if( registered )
{
session.getServiceListeners().fireSessionCreated( session );
- session.getFilterChain().fireSessionOpened( session );
}
}
}
@@ -225,8 +224,6 @@
{
releaseWriteBuffers( session );
session.getServiceListeners().fireSessionDestroyed( session );
-
- session.getFilterChain().fireSessionClosed( session );
}
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Tue Sep 12 02:28:42 2006
@@ -27,7 +27,6 @@
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,8 +38,10 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.util.Queue;
@@ -64,7 +65,7 @@
private final Queue cancelQueue = new Queue();
private final Queue flushingSessions = new Queue();
private Worker worker;
-
+
/**
* Creates a new instance.
*/
@@ -185,22 +186,6 @@
}
}
- public boolean isBound( SocketAddress address )
- {
- synchronized( channels )
- {
- return channels.containsKey( address );
- }
- }
-
- public Set getBoundAddresses()
- {
- synchronized( channels )
- {
- return new HashSet( channels.keySet() );
- }
- }
-
public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
{
if( remoteAddress == null )
@@ -226,24 +211,44 @@
}
RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- DatagramSessionImpl s = new DatagramSessionImpl(
- wrapper, this,
- req.config, ch, req.handler,
- req.address );
- s.setRemoteAddress( remoteAddress );
- s.setSelectionKey( key );
+ IoSession session;
+ IoSessionRecycler sessionRecycler = req.config.getSessionRecycler();
+ synchronized ( sessionRecycler )
+ {
+ session = sessionRecycler.recycle( localAddress, remoteAddress);
+ if( session != null )
+ {
+ return session;
+ }
+
+ // If a new session needs to be created.
+ DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+ wrapper, this,
+ req.config, ch, req.handler,
+ req.address );
+ datagramSession.setRemoteAddress( remoteAddress );
+ datagramSession.setSelectionKey( key );
+
+ req.config.getSessionRecycler().put( datagramSession );
+ session = datagramSession;
+ }
try
{
- buildFilterChain( req, s );
- s.getFilterChain().fireSessionCreated( s );
+ buildFilterChain( req, session );
+ getListeners().fireSessionCreated( session );
}
catch( Throwable t )
{
ExceptionMonitor.getInstance().exceptionCaught( t );
}
- return s;
+ return session;
+ }
+
+ public IoServiceListenerSupport getListeners()
+ {
+ return super.getListeners();
}
private void buildFilterChain( RegistrationRequest req, IoSession session ) throws Exception
@@ -368,26 +373,20 @@
DatagramChannel ch = ( DatagramChannel ) key.channel();
RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- DatagramSessionImpl session = new DatagramSessionImpl(
- wrapper, this,
- req.config,
- ch, req.handler,
- req.address );
- session.setSelectionKey( key );
-
try
{
- buildFilterChain( req, session );
- ( ( DatagramFilterChain ) session.getFilterChain() ).fireSessionCreated( session );
-
if( key.isReadable() )
{
- readSession( session );
+ readSession( ch, req );
}
if( key.isWritable() )
{
- scheduleFlush( session );
+ for( Iterator i = getManagedSessions( req.address ).iterator();
+ i.hasNext(); )
+ {
+ scheduleFlush( ( DatagramSessionImpl ) i.next() );
+ }
}
}
catch( Throwable t )
@@ -397,18 +396,20 @@
}
}
- private void readSession( DatagramSessionImpl session )
+ private void readSession( DatagramChannel channel, RegistrationRequest req ) throws Exception
{
-
- ByteBuffer readBuf = ByteBuffer.allocate( session.getReadBufferSize() );
+ ByteBuffer readBuf = ByteBuffer.allocate(
+ ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
try
{
- SocketAddress remoteAddress = session.getChannel().receive(
- readBuf.buf() );
+ SocketAddress remoteAddress = channel.receive(
+ readBuf.buf() );
if( remoteAddress != null )
{
+ DatagramSessionImpl session =
+ ( DatagramSessionImpl ) newSession( remoteAddress, req.address );
+
readBuf.flip();
- session.setRemoteAddress( remoteAddress );
ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
newBuf.put( readBuf );
@@ -418,10 +419,6 @@
session.getFilterChain().fireMessageReceived( session, newBuf );
}
}
- catch( IOException e )
- {
- ( ( DatagramFilterChain ) session.getFilterChain() ).fireExceptionCaught( session, e );
- }
finally
{
readBuf.release();
@@ -451,7 +448,7 @@
}
catch( IOException e )
{
- ( ( DatagramFilterChain ) session.getFilterChain() ).fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught( session, e );
}
}
}
@@ -526,7 +523,7 @@
session.increaseWrittenBytes( writtenBytes );
session.increaseWrittenWriteRequests();
buf.reset();
- ( ( DatagramFilterChain ) session.getFilterChain() ).fireMessageSent( session, req );
+ session.getFilterChain().fireMessageSent( session, req );
}
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Tue Sep 12 02:28:42 2006
@@ -30,6 +30,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandler;
@@ -390,6 +391,13 @@
DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
+ DatagramSessionImpl replaceSession = getRecycledSession(session);
+
+ if(replaceSession != null)
+ {
+ session = replaceSession;
+ }
+
if( key.isReadable() && session.getTrafficMask().isReadable() )
{
readSession( session );
@@ -401,6 +409,30 @@
}
}
}
+
+ private DatagramSessionImpl getRecycledSession( IoSession session )
+ {
+ IoSessionRecycler sessionRecycler = session.getServiceConfig().getSessionRecycler();
+ DatagramSessionImpl replaceSession = null;
+
+ if ( sessionRecycler != null )
+ {
+ synchronized ( sessionRecycler )
+ {
+ replaceSession = ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(), session
+ .getRemoteAddress() );
+
+ if ( replaceSession != null )
+ {
+ return replaceSession;
+ }
+
+ sessionRecycler.put( session );
+ }
+ }
+
+ return null;
+ }
private void readSession( DatagramSessionImpl session )
{
@@ -552,8 +584,17 @@
boolean success = false;
try
{
- buildFilterChain( req, session );
- session.getFilterChain().fireSessionCreated( session );
+ DatagramSessionImpl replaceSession = getRecycledSession( session );
+
+ if ( replaceSession != null )
+ {
+ session = replaceSession;
+ }
+ else
+ {
+ buildFilterChain( req, session );
+ getListeners().fireSessionCreated( session );
+ }
SelectionKey key = req.channel.register( selector,
SelectionKey.OP_READ, session );
@@ -621,6 +662,7 @@
ExceptionMonitor.getInstance().exceptionCaught( e );
}
+ getListeners().fireSessionDestroyed( session );
session.getCloseFuture().setClosed();
key.cancel();
selector.wakeup(); // wake up again to trigger thread death
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Tue Sep 12 02:28:42 2006
@@ -67,6 +67,7 @@
}
else
{
+ ( ( DatagramAcceptorDelegate ) manager ).getListeners().fireSessionDestroyed( session );
session.getCloseFuture().setClosed();
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Tue Sep 12 02:28:42 2006
@@ -36,7 +36,6 @@
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.common.support.BaseIoSessionConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.util.Queue;
@@ -81,7 +80,7 @@
this.localAddress = ch.socket().getLocalSocketAddress();
this.serviceAddress = serviceAddress;
this.serviceConfig = serviceConfig;
-
+
// Apply the initial session settings
IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
if( sessionConfig instanceof DatagramSessionConfig )
@@ -99,22 +98,22 @@
}
}
}
-
+
public IoService getService()
{
return wrapperManager;
}
-
+
public IoServiceConfig getServiceConfig()
{
return serviceConfig;
}
-
+
public IoSessionConfig getConfig()
{
return config;
}
-
+
DatagramService getManagerDelegate()
{
return managerDelegate;
@@ -147,6 +146,7 @@
protected void close0()
{
+ getServiceConfig().getSessionRecycler().remove( this );
filterChain.fireFilterClose( this );
}
@@ -177,7 +177,7 @@
return writeRequestQueue.size();
}
}
-
+
public int getScheduledWriteBytes()
{
synchronized( writeRequestQueue )
@@ -185,7 +185,7 @@
return writeRequestQueue.byteSize();
}
}
-
+
public TransportType getTransportType()
{
return TransportType.DATAGRAM;
@@ -205,7 +205,7 @@
{
return localAddress;
}
-
+
public SocketAddress getServiceAddress()
{
return serviceAddress;
@@ -215,13 +215,13 @@
{
managerDelegate.updateTrafficMask( this );
}
-
+
int getReadBufferSize()
{
return readBufferSize;
}
-
- private class SessionConfigImpl extends BaseIoSessionConfig implements DatagramSessionConfig
+
+ private class SessionConfigImpl extends DatagramSessionConfigImpl implements DatagramSessionConfig
{
public int getReceiveBufferSize()
{
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java Tue Sep 12 02:28:42 2006
@@ -22,27 +22,18 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.mina.common.IoAcceptorConfig;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.support.BaseIoAcceptor;
import org.apache.mina.common.support.BaseIoAcceptorConfig;
import org.apache.mina.common.support.BaseIoSessionConfig;
import org.apache.mina.transport.vmpipe.support.VmPipe;
-import org.apache.mina.util.IdentityHashSet;
/**
* Binds the specified {@link IoHandler} to the specified
@@ -89,30 +80,10 @@
boundHandlers.put( address,
new VmPipe( this,
( VmPipeAddress ) address,
- handler, config ) );
+ handler, config, getListeners() ) );
}
}
- public Set getManagedSessions( SocketAddress address )
- {
- if( address == null )
- throw new NullPointerException( "address" );
-
- VmPipe pipe = null;
- synchronized( boundHandlers )
- {
- pipe = ( VmPipe ) boundHandlers.get( address );
- if( pipe == null )
- {
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
- }
-
- Set managedSessions = pipe.getManagedServerSessions();
- return Collections.unmodifiableSet(
- new IdentityHashSet( Arrays.asList( managedSessions.toArray() ) ) );
- }
-
public void unbind( SocketAddress address )
{
if( address == null )
@@ -129,60 +100,9 @@
pipe = ( VmPipe ) boundHandlers.remove( address );
}
- Set managedSessions = pipe.getManagedServerSessions();
-
- IoServiceConfig cfg = pipe.getConfig();
- boolean disconnectOnUnbind;
- if( cfg instanceof IoAcceptorConfig )
- {
- disconnectOnUnbind = ( ( IoAcceptorConfig ) cfg ).isDisconnectOnUnbind();
- }
- else
- {
- disconnectOnUnbind = ( ( IoAcceptorConfig ) getDefaultConfig() ).isDisconnectOnUnbind();
- }
- if( disconnectOnUnbind && managedSessions != null )
- {
- IoSession[] tempSessions = ( IoSession[] )
- managedSessions.toArray( new IoSession[ 0 ] );
-
- final Object lock = new Object();
-
- for( int i = 0; i < tempSessions.length; i++ )
- {
- if( !managedSessions.contains( tempSessions[ i ] ) )
- {
- // The session has already been closed and have been
- // removed from managedSessions by the VmPipeFilterChain.
- continue;
- }
- tempSessions[ i ].close().addListener( new IoFutureListener()
- {
- public void operationComplete( IoFuture future )
- {
- synchronized( lock )
- {
- lock.notify();
- }
- }
- } );
- }
-
- try
- {
- synchronized( lock )
- {
- while( !managedSessions.isEmpty() )
- {
- lock.wait( 1000 );
- }
- }
- }
- catch( InterruptedException ie )
- {
- // Ignored
- }
- }
+ getListeners().fireServiceDeactivated(
+ this, pipe.getAddress(),
+ pipe.getHandler(), pipe.getConfig() );
}
public void unbindAll()
@@ -194,22 +114,6 @@
{
unbind( ( SocketAddress ) i.next() );
}
- }
- }
-
- public boolean isBound( SocketAddress address )
- {
- synchronized( boundHandlers )
- {
- return boundHandlers.containsKey( address );
- }
- }
-
- public Set getBoundAddresses()
- {
- synchronized( boundHandlers )
- {
- return new HashSet( boundHandlers.keySet() );
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Tue Sep 12 02:28:42 2006
@@ -86,6 +86,7 @@
new VmPipeSessionImpl(
this,
config,
+ getListeners(),
new Object(), // lock
new AnonymousSocketAddress(),
handler,
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipe.java Tue Sep 12 02:28:42 2006
@@ -19,12 +19,9 @@
*/
package org.apache.mina.transport.vmpipe.support;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -38,18 +35,19 @@
private final VmPipeAddress address;
private final IoHandler handler;
private final IoServiceConfig config;
- private final Set managedClientSessions = Collections.synchronizedSet( new HashSet() );
- private final Set managedServerSessions = Collections.synchronizedSet( new HashSet() );
+ private final IoServiceListenerSupport listeners;
public VmPipe( VmPipeAcceptor acceptor,
VmPipeAddress address,
IoHandler handler,
- IoServiceConfig config )
+ IoServiceConfig config,
+ IoServiceListenerSupport listeners)
{
this.acceptor = acceptor;
this.address = address;
this.handler = handler;
this.config = config;
+ this.listeners = listeners;
}
public VmPipeAcceptor getAcceptor()
@@ -71,14 +69,9 @@
{
return config;
}
-
- public Set getManagedClientSessions()
- {
- return managedClientSessions;
- }
- public Set getManagedServerSessions()
+ public IoServiceListenerSupport getListeners()
{
- return managedServerSessions;
+ return listeners;
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Tue Sep 12 02:28:42 2006
@@ -118,8 +118,7 @@
{
if( !session.getCloseFuture().isClosed() )
{
- s.getManagedSessions().remove( s );
- s.getFilterChain().fireSessionClosed( session );
+ s.getServiceListeners().fireSessionDestroyed( s );
s.remoteSession.close();
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Sep 12 02:28:42 2006
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Set;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoFilterChain;
@@ -34,6 +33,7 @@
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.util.Queue;
/**
@@ -46,14 +46,14 @@
{
private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
- private final IoService manager;
+ private final IoService service;
private final IoServiceConfig serviceConfig;
+ private final IoServiceListenerSupport serviceListeners;
private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
private final SocketAddress serviceAddress;
private final IoHandler handler;
private final VmPipeFilterChain filterChain;
- private final Set managedSessions;
final VmPipeSessionImpl remoteSession;
final Object lock;
final Queue pendingDataQueue;
@@ -61,11 +61,14 @@
/**
* Constructor for client-side session.
*/
- public VmPipeSessionImpl( IoService manager, IoServiceConfig serviceConfig, Object lock, SocketAddress localAddress,
- IoHandler handler, VmPipe remoteEntry ) throws IOException
+ public VmPipeSessionImpl(
+ IoService service, IoServiceConfig serviceConfig,
+ IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
+ IoHandler handler, VmPipe remoteEntry ) throws IOException
{
- this.manager = manager;
+ this.service = service;
this.serviceConfig = serviceConfig;
+ this.serviceListeners = serviceListeners;
this.lock = lock;
this.localAddress = localAddress;
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
@@ -73,56 +76,32 @@
this.filterChain = new VmPipeFilterChain( this );
this.pendingDataQueue = new Queue();
- this.managedSessions = remoteEntry.getManagedClientSessions();
+ remoteSession = new VmPipeSessionImpl( service, this, remoteEntry );
- remoteSession = new VmPipeSessionImpl( manager, this, remoteEntry );
-
- // initialize remote session
+ // initialize connector session
try
{
- remoteEntry.getAcceptor().getFilterChainBuilder().buildFilterChain( remoteSession.getFilterChain() );
- remoteEntry.getConfig().getFilterChainBuilder().buildFilterChain( remoteSession.getFilterChain() );
- remoteEntry.getConfig().getThreadModel().buildFilterChain( remoteSession.getFilterChain() );
- remoteSession.getFilterChain().fireSessionCreated( remoteSession );
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
- IOException e = new IOException( "Failed to initialize remote session." );
- e.initCause( t );
- throw e;
- }
-
- // initialize client session
- try
- {
- manager.getFilterChainBuilder().buildFilterChain( filterChain );
+ service.getFilterChainBuilder().buildFilterChain( filterChain );
serviceConfig.getFilterChainBuilder().buildFilterChain( filterChain );
serviceConfig.getThreadModel().buildFilterChain( filterChain );
- handler.sessionCreated( this );
+ serviceListeners.fireSessionCreated( this );
}
catch( Throwable t )
{
- throw ( IOException ) new IOException( "Failed to create a session." ).initCause( t );
+ throw ( IOException ) new IOException( "Failed to create a connector session." ).initCause( t );
}
- VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
VmPipeIdleStatusChecker.getInstance().addSession( this );
-
- remoteSession.managedSessions.add( remoteSession );
- this.managedSessions.add( this );
-
- remoteSession.getFilterChain().fireSessionOpened( remoteSession );
- filterChain.fireSessionOpened( this );
}
/**
* Constructor for server-side session.
*/
- private VmPipeSessionImpl( IoService manager, VmPipeSessionImpl remoteSession, VmPipe entry )
+ private VmPipeSessionImpl( IoService service, VmPipeSessionImpl remoteSession, VmPipe entry ) throws IOException
{
- this.manager = manager;
+ this.service = service;
this.serviceConfig = entry.getConfig();
+ this.serviceListeners = entry.getListeners();
this.lock = remoteSession.lock;
this.localAddress = this.serviceAddress = remoteSession.remoteAddress;
this.remoteAddress = remoteSession.localAddress;
@@ -130,17 +109,34 @@
this.filterChain = new VmPipeFilterChain( this );
this.remoteSession = remoteSession;
this.pendingDataQueue = new Queue();
- this.managedSessions = entry.getManagedServerSessions();
+
+ // initialize acceptor session
+ try
+ {
+ entry.getAcceptor().getFilterChainBuilder().buildFilterChain( this.getFilterChain() );
+ entry.getConfig().getFilterChainBuilder().buildFilterChain( this.getFilterChain() );
+ entry.getConfig().getThreadModel().buildFilterChain( this.getFilterChain() );
+ serviceListeners.fireSessionCreated( this );
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ IOException e = new IOException( "Failed to initialize acceptor session." );
+ e.initCause( t );
+ throw e;
+ }
+
+ VmPipeIdleStatusChecker.getInstance().addSession( this );
}
- Set getManagedSessions()
+ public IoService getService()
{
- return managedSessions;
+ return service;
}
-
- public IoService getService()
+
+ IoServiceListenerSupport getServiceListeners()
{
- return manager;
+ return serviceListeners;
}
public IoServiceConfig getServiceConfig()
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A listener for expired object events.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Make this a inner interface of ExpiringMap
+ */
+public interface ExpirationListener
+{
+ void expired(Object expiredObject);
+}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpirationListener.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A map with expiration.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Change time unit to 'seconds'.
+ */
+public class ExpiringMap implements Map
+{
+ public static final long DEFAULT_EXPIRATION_TIME = 5000;
+
+ public static final long DEFAULT_EXPIRER_DELAY = 5000;
+
+ private static volatile int expirerCount = 1;
+
+ private long expirationTimeMillis;
+
+ private long expirerDelay;
+
+ private HashMap delegate;
+
+ private HashMap expirationInfos;
+
+ private List expirationListeners;
+
+ private Expirer expirer;
+
+ public ExpiringMap()
+ {
+ this( new HashMap(), new HashMap(), new LinkedList(), DEFAULT_EXPIRATION_TIME, DEFAULT_EXPIRER_DELAY );
+ }
+
+ public ExpiringMap( long expirationTimeMillis )
+ {
+ this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, DEFAULT_EXPIRER_DELAY );
+ }
+
+ public ExpiringMap( long expirationTimeMillis, long expirerDelay )
+ {
+ this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, expirerDelay );
+ }
+
+ private ExpiringMap( HashMap delegate, HashMap accessTimes, List expirationListeners, long expirationTimeMillis,
+ long expirerDelay )
+ {
+ this.delegate = delegate;
+ this.expirationInfos = accessTimes;
+ this.expirationTimeMillis = expirationTimeMillis;
+ this.expirationListeners = expirationListeners;
+ this.expirerDelay = expirerDelay;
+
+ this.expirer = new Expirer();
+ this.expirer.start();
+ }
+
+ /**
+ * @see java.util.Map#clear()
+ */
+ public void clear()
+ {
+ expirationInfos.clear();
+ delegate.clear();
+ }
+
+ /**
+ * @see java.util.Map#containsKey(java.lang.Object)
+ */
+ public boolean containsKey( Object key )
+ {
+ return delegate.containsKey( key );
+ }
+
+ /**
+ * @see java.util.Map#containsValue(java.lang.Object)
+ */
+ public boolean containsValue( Object value )
+ {
+ return delegate.containsValue( value );
+ }
+
+ /**
+ * @see java.util.Map#entrySet()
+ */
+ public Set entrySet()
+ {
+ return delegate.entrySet();
+ }
+
+ /**
+ * @see java.util.Map#equals(java.util.Map)
+ */
+ public boolean equals( Object o )
+ {
+ return delegate.equals( o );
+ }
+
+ /**
+ * @see java.util.Map#get(java.lang.Object)
+ */
+ public Object get( Object key )
+ {
+ Object object = delegate.get( key );
+
+ if ( object != null )
+ {
+ updateAccessTime( object );
+ }
+
+ return object;
+ }
+
+ public int hashCode()
+ {
+ return delegate.hashCode();
+ }
+
+ public boolean isEmpty()
+ {
+ return delegate.isEmpty();
+ }
+
+ public Set keySet()
+ {
+ return delegate.keySet();
+ }
+
+ public Object put( Object key, Object value )
+ {
+ if ( value != null )
+ {
+ addAccessTime( key, value );
+ }
+
+ return delegate.put( key, value );
+ }
+
+ public void putAll( Map map )
+ {
+ Iterator mapKeyIterator = map.keySet().iterator();
+
+ while ( mapKeyIterator.hasNext() )
+ {
+ Object key = mapKeyIterator.next();
+ Object value = map.get( key );
+
+ if ( value != null )
+ {
+ addAccessTime( key, value );
+ }
+ }
+
+ delegate.putAll( map );
+ }
+
+ public Object remove( Object key )
+ {
+ Object object = delegate.remove( key );
+
+ if ( object != null )
+ {
+ expirationInfos.remove( object );
+ }
+
+ return object;
+ }
+
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ public Collection values()
+ {
+ return delegate.values();
+ }
+
+ private void addAccessTime( Object key, Object value )
+ {
+ ExpirationInfo info = new ExpirationInfo();
+ info.key = key;
+ info.accesstime = System.currentTimeMillis();
+
+ expirationInfos.put( value, info );
+ }
+
+ public void updateAccessTime( Object object )
+ {
+ Object infoObject = expirationInfos.get( object );
+
+ if ( infoObject != null )
+ {
+ ExpirationInfo info = ( ExpirationInfo ) infoObject;
+ info.accesstime = System.currentTimeMillis();
+ }
+ }
+
+ private ExpirationInfo getExpirationInfo( Object object )
+ {
+ Object infoObject = expirationInfos.get( object );
+
+ if ( infoObject != null )
+ {
+ ExpirationInfo info = ( ExpirationInfo ) infoObject;
+
+ return info;
+ }
+
+ return null;
+ }
+
+ public void addExpirationListener( ExpirationListener listener )
+ {
+ synchronized ( expirationListeners )
+ {
+ expirationListeners.add( listener );
+ }
+ }
+
+ public void removeExpirationListener( ExpirationListener listener )
+ {
+ synchronized ( expirationListeners )
+ {
+ expirationListeners.remove( listener );
+ }
+ }
+
+ public Object[] findMappedObjects()
+ {
+ Object results[] = null;
+ synchronized ( delegate )
+ {
+ results = new Object[delegate.size()];
+ results = delegate.values().toArray( results );
+ }
+ return ( results );
+ }
+
+ public void startExpirer()
+ {
+ synchronized ( expirer )
+ {
+ if ( !expirer.isRunning() )
+ {
+ expirer.setRunning( true );
+ expirer.interrupt();
+ }
+ }
+ }
+
+ public void stopExpirer()
+ {
+ synchronized ( expirer )
+ {
+ if ( expirer.isRunning() )
+ {
+ expirer.setRunning( false );
+ expirer.interrupt();
+ }
+ }
+ }
+
+ private class ExpirationInfo
+ {
+ public Object key;
+
+ public long accesstime;
+ }
+
+ private class Expirer extends Thread
+ {
+ private boolean running = true;
+
+ public Expirer()
+ {
+ super( "MapExpirer-" + expirerCount++ );
+ }
+
+ public void run()
+ {
+ while ( running )
+ {
+ processExpires();
+
+ try
+ {
+ Thread.sleep( expirerDelay );
+ }
+ catch ( InterruptedException e )
+ {
+ }
+ }
+ }
+
+ private void processExpires()
+ {
+ long timeNow = System.currentTimeMillis();
+ Object mappedObjects[] = findMappedObjects();
+
+ for ( int i = 0; i < mappedObjects.length; i++ )
+ {
+ Object mappedObject = mappedObjects[i];
+
+ ExpirationInfo info = getExpirationInfo( mappedObject );
+
+ if ( info != null )
+ {
+ if ( expirationTimeMillis < 0 )
+ continue;
+ long timeIdle = timeNow - info.accesstime;
+
+ if ( timeIdle >= expirationTimeMillis )
+ {
+ delegate.remove( info.key );
+ expirationInfos.remove( info.key );
+
+ synchronized ( expirationListeners )
+ {
+ Iterator listenerIterator = expirationListeners.iterator();
+
+ while ( listenerIterator.hasNext() )
+ {
+ ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
+
+ listener.expired( mappedObject );
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void setRunning( boolean running )
+ {
+ this.running = running;
+ }
+
+ public boolean isRunning()
+ {
+ return running;
+ }
+ }
+}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/common/support/IoServiceListenerSupportTest.java Tue Sep 12 02:28:42 2006
@@ -33,7 +33,6 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.TransportType;
import org.easymock.MockControl;
@@ -57,7 +56,7 @@
public void testServiceLifecycle() throws Exception
{
- MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+ MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
// Test activation
@@ -93,25 +92,36 @@
public void testSessionLifecycle() throws Exception
{
- IoSession session = new TestSession( ADDRESS );
- MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+ TestSession session = new TestSession( ADDRESS );
+
+ MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+ IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+ session.setFilterChain( chain );
+
+ MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
// Test creation
listener.sessionCreated( session );
+ chain.fireSessionCreated( session );
+ chain.fireSessionOpened( session);
listenerControl.replay();
+ chainControl.replay();
support.add( listener );
support.fireSessionCreated( session );
listenerControl.verify();
+ chainControl.verify();
Assert.assertEquals( 1, support.getManagedSessions( ADDRESS ).size() );
Assert.assertTrue( support.getManagedSessions( ADDRESS ).contains( session ) );
// Test destruction & other side effects
listenerControl.reset();
+ chainControl.reset();
+ chain.fireSessionClosed( session );
listener.sessionDestroyed( session );
listenerControl.replay();
@@ -131,34 +141,50 @@
public void testDisconnectOnUnbind() throws Exception
{
- MockControl acceptorControl = MockControl.createControl( IoAcceptor.class );
+ MockControl acceptorControl = MockControl.createStrictControl( IoAcceptor.class );
IoAcceptor acceptor = ( IoAcceptor ) acceptorControl.getMock();
- MockControl configControl = MockControl.createControl( IoAcceptorConfig.class );
+
+ final TestSession session = new TestSession( acceptor, ADDRESS );
+
+ MockControl configControl = MockControl.createStrictControl( IoAcceptorConfig.class );
IoAcceptorConfig config = ( IoAcceptorConfig ) configControl.getMock();
- final IoSession session = new TestSession( acceptor, ADDRESS );
- MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+
+ MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+ IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+ session.setFilterChain( chain );
+
+ MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
// Activate a service and create a session.
listener.serviceActivated( acceptor, ADDRESS, null, config );
listener.sessionCreated( session );
+ chain.fireSessionCreated( session );
+ chain.fireSessionOpened( session );
listenerControl.replay();
+ chainControl.replay();
support.add( listener );
support.fireServiceActivated( acceptor, ADDRESS, null, config );
support.fireSessionCreated( session );
listenerControl.verify();
+ chainControl.verify();
// Deactivate a service and make sure the session is closed & destroyed.
listenerControl.reset();
+ chainControl.reset();
+
listener.serviceDeactivated( acceptor, ADDRESS, null, config );
configControl.expectAndReturn(config.isDisconnectOnUnbind(), true );
listener.sessionDestroyed( session );
+ chain.fireSessionClosed( session );
listenerControl.replay();
configControl.replay();
+ chainControl.replay();
+
new Thread()
{
// Emulate I/O service
@@ -166,7 +192,7 @@
{
try
{
- Thread.sleep( 2000 );
+ Thread.sleep( 500 );
}
catch( InterruptedException e )
{
@@ -179,6 +205,7 @@
listenerControl.verify();
configControl.verify();
+ chainControl.verify();
Assert.assertTrue( session.isClosing() );
Assert.assertEquals( 0, support.getManagedSessions( ADDRESS ).size() );
@@ -187,32 +214,47 @@
public void testConnectorActivation() throws Exception
{
- MockControl connectorControl = MockControl.createControl( IoConnector.class );
+ MockControl connectorControl = MockControl.createStrictControl( IoConnector.class );
IoConnector connector = ( IoConnector ) connectorControl.getMock();
- final IoSession session = new TestSession( connector, ADDRESS );
- MockControl listenerControl = MockControl.createControl( IoServiceListener.class );
+
+ final TestSession session = new TestSession( connector, ADDRESS );
+
+ MockControl chainControl = MockControl.createStrictControl( IoFilterChain.class );
+ IoFilterChain chain = ( IoFilterChain ) chainControl.getMock();
+ session.setFilterChain( chain );
+
+ MockControl listenerControl = MockControl.createStrictControl( IoServiceListener.class );
IoServiceListener listener = ( IoServiceListener ) listenerControl.getMock();
// Creating a session should activate a service automatically.
listener.serviceActivated( connector, ADDRESS, null, null );
listener.sessionCreated( session );
+ chain.fireSessionCreated( session );
+ chain.fireSessionOpened( session );
listenerControl.replay();
+ chainControl.replay();
support.add( listener );
support.fireSessionCreated( session );
listenerControl.verify();
+ chainControl.verify();
// Destroying a session should deactivate a service automatically.
listenerControl.reset();
+ chainControl.reset();
listener.sessionDestroyed( session );
+ chain.fireSessionClosed( session );
listener.serviceDeactivated( connector, ADDRESS, null, null );
listenerControl.replay();
+ chainControl.replay();
+
support.fireSessionDestroyed( session );
listenerControl.verify();
+ chainControl.verify();
Assert.assertEquals( 0, support.getManagedSessions( ADDRESS ).size() );
Assert.assertFalse( support.getManagedSessions( ADDRESS ).contains( session ) );
@@ -222,6 +264,7 @@
{
private final IoService service;
private final SocketAddress serviceAddress;
+ private IoFilterChain filterChain;
public TestSession( SocketAddress serviceAddress )
{
@@ -245,7 +288,12 @@
public IoFilterChain getFilterChain()
{
- return null;
+ return filterChain;
+ }
+
+ public void setFilterChain( IoFilterChain filterChain )
+ {
+ this.filterChain = filterChain;
}
public IoHandler getHandler()
Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java?view=diff&rev=442541&r1=442540&r2=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramConfigTest.java Tue Sep 12 02:28:42 2006
@@ -33,6 +33,7 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import org.apache.mina.util.AvailablePortFinder;
/**
@@ -70,7 +71,11 @@
{
ConnectFuture future = connector.connect( new InetSocketAddress( "localhost", port ), new IoHandlerAdapter() );
future.join();
- future.getSession().write( ByteBuffer.allocate( 16 ).putInt( 0 ).flip() ).join();
+
+ WriteFuture writeFuture = future.getSession().write( ByteBuffer.allocate( 16 ).putInt( 0 ).flip() );
+ writeFuture.join();
+ Assert.assertTrue( writeFuture.isWritten() );
+
future.getSession().close();
for( int i = 0; i < 30; i ++ )
Added: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?view=auto&rev=442541
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (added)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Sep 12 02:28:42 2006
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExpiringSessionRecycler;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * Tests if datagram sessions are recycled properly.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 436993 $, $Date: 2006-08-26 07:36:56 +0900 (í , 26 8ì 2006) $
+ */
+public class DatagramRecyclerTest extends TestCase
+{
+ private final IoAcceptor acceptor = new DatagramAcceptor();
+ private final IoConnector connector = new DatagramConnector();
+
+ public DatagramRecyclerTest()
+ {
+ }
+
+ public void testDatagramRecycler() throws Exception
+ {
+ int port = AvailablePortFinder.getNextAvailable( 1024 );
+ DatagramAcceptorConfig config = new DatagramAcceptorConfig();
+ ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1000, 500 );
+ config.setSessionRecycler( recycler );
+
+ MockHandler acceptorHandler = new MockHandler();
+ MockHandler connectorHandler = new MockHandler();
+
+ acceptor.bind( new InetSocketAddress( port ), acceptorHandler, config );
+
+ try
+ {
+ ConnectFuture future = connector.connect(
+ new InetSocketAddress( "localhost", port ), connectorHandler, config );
+ future.join();
+ Assert.assertTrue( future.isConnected() );
+
+ // Write whatever to trigger the acceptor.
+ future.getSession().write( ByteBuffer.allocate(1) ).join();
+
+ // Wait until the connection is closed.
+ future.getSession().getCloseFuture().join( 3000 );
+ Assert.assertTrue( future.getSession().getCloseFuture().isClosed() );
+ acceptorHandler.session.getCloseFuture().join( 3000 );
+ Assert.assertTrue( acceptorHandler.session.getCloseFuture().isClosed() );
+
+ Thread.sleep( 500 );
+
+ Assert.assertEquals( "CROPSECL", connectorHandler.result );
+ Assert.assertEquals( "CROPRECL", acceptorHandler.result );
+ }
+ finally
+ {
+ acceptor.unbind( new InetSocketAddress( port ) );
+ }
+ }
+
+ private class MockHandler extends IoHandlerAdapter
+ {
+ public IoSession session;
+ public String result = "";
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ this.session = session;
+ result += "CA";
+ }
+
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ this.session = session;
+ result += "RE";
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception {
+ this.session = session;
+ result += "SE";
+ }
+
+ public void sessionClosed(IoSession session) throws Exception {
+ this.session = session;
+ result += "CL";
+ }
+
+ public void sessionCreated(IoSession session) throws Exception {
+ this.session = session;
+ result += "CR";
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+ this.session = session;
+ result += "ID";
+ }
+
+ public void sessionOpened(IoSession session) throws Exception {
+ this.session = session;
+ result += "OP";
+ }
+
+ }
+}