You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2006/09/20 04:28:08 UTC
svn commit: r448033 - in /directory/trunks/mina/core/src:
main/java/org/apache/mina/common/ main/java/org/apache/mina/util/
test/java/org/apache/mina/transport/socket/nio/
Author: trustin
Date: Tue Sep 19 19:28:07 2006
New Revision: 448033
URL: http://svn.apache.org/viewvc?view=rev&rev=448033
Log:
Applied Greg's patch on DIRMINA-162 (datagram session management fails) with small modification:
* Changed the type of TTL and expiration interval to integer (seconds).
* ExpirationSessionRecycler doesn't implement ExpirationListener directly. I made an inner class instead.
* Removed the static getInstance() method in ExpiringSessionRecycler because we are already using a static global expirer by default. We need some documentation on creating many recyclers though.
Modified:
directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java Tue Sep 19 19:28:07 2006
@@ -29,36 +29,41 @@
/**
* An {@link IoSessionRecycler} with sessions that time out on inactivity.
*
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Document me.
*
- * TODO Change time unit to 'seconds'.
- * TODO Make thread-safe.
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
*/
-public class ExpiringSessionRecycler implements IoSessionRecycler, ExpirationListener
+public class ExpiringSessionRecycler implements IoSessionRecycler
{
private ExpiringMap sessionMap;
-
+
+ private ExpiringMap.Expirer mapExpirer;
+
public ExpiringSessionRecycler()
{
- this( ExpiringMap.DEFAULT_EXPIRATION_TIME, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+ this( ExpiringMap.DEFAULT_TIME_TO_LIVE );
}
-
- public ExpiringSessionRecycler( long expirationTimeMillis )
+
+ public ExpiringSessionRecycler( int timeToLive )
{
- this( expirationTimeMillis, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+ this( timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL );
}
-
- public ExpiringSessionRecycler( long expirationTimeMillis, long expirerDelay )
+
+ public ExpiringSessionRecycler( int timeToLive, int expirationInterval )
{
- // FIXME Use IdentityHashMap if possible.
- sessionMap = new ExpiringMap( expirationTimeMillis, expirerDelay );
- sessionMap.addExpirationListener( this );
+ sessionMap = new ExpiringMap( timeToLive, expirationInterval );
+ mapExpirer = sessionMap.getExpirer();
+ sessionMap.addExpirationListener( new DefaultExpirationListener() );
}
public void put( IoSession session )
{
+ mapExpirer.startExpiringIfNotStarted();
+
Object key = generateKey( session );
- if ( !sessionMap.containsKey( key ) )
+
+ if( !sessionMap.containsKey( key ) )
{
sessionMap.put( key, session );
}
@@ -66,20 +71,37 @@
public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
{
- Object key = generateKey( localAddress, remoteAddress );
- return ( IoSession ) sessionMap.get( key );
+ return ( IoSession ) sessionMap.get( generateKey( localAddress, remoteAddress ) );
}
public void remove( IoSession session )
{
- Object key = generateKey( session );
- sessionMap.remove( key );
+ sessionMap.remove( generateKey( session ) );
+ }
+
+ public void stopExpiring()
+ {
+ mapExpirer.stopExpiring();
}
- public void expired( Object expiredObject )
+ public int getExpirationInterval()
{
- IoSession expiredSession = ( IoSession ) expiredObject;
- expiredSession.close();
+ return sessionMap.getExpirationInterval();
+ }
+
+ public int getTimeToLive()
+ {
+ return sessionMap.getTimeToLive();
+ }
+
+ public void setExpirationInterval( int expirationInterval )
+ {
+ sessionMap.setExpirationInterval( expirationInterval );
+ }
+
+ public void setTimeToLive( int timeToLive )
+ {
+ sessionMap.setTimeToLive( timeToLive );
}
private Object generateKey( IoSession session )
@@ -93,5 +115,15 @@
key.add( remoteAddress );
key.add( localAddress );
return key;
+ }
+
+ private class DefaultExpirationListener implements ExpirationListener
+ {
+ public void expired( Object expiredObject )
+ {
+ IoSession expiredSession = ( IoSession ) expiredObject;
+
+ expiredSession.close();
+ }
}
}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java Tue Sep 19 19:28:07 2006
@@ -31,6 +31,23 @@
public interface IoSessionRecycler
{
/**
+ * A dummy recycler that doesn't recycle any sessions. Using this recycler will
+ * make all session lifecycle events to be fired for every I/O for all connectionless
+ * sessions.
+ */
+ static IoSessionRecycler NOOP = new IoSessionRecycler()
+ {
+ public void put( IoSession session ) {}
+ public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
+ {
+ return null;
+ }
+ public void remove( IoSession session )
+ {
+ }
+ };
+
+ /**
* Called when the underlying transport creates or writes a new {@link IoSession}.
*
* @param session
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java Tue Sep 19 19:28:07 2006
@@ -20,122 +20,109 @@
package org.apache.mina.util;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReadWriteLock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* A map with expiration.
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
- * TODO Change time unit to 'seconds'.
- * TODO Make thread-safe.
*/
public class ExpiringMap implements Map
{
- public static final long DEFAULT_EXPIRATION_TIME = 60000;
+ public static final int DEFAULT_TIME_TO_LIVE = 60;
- public static final long DEFAULT_EXPIRER_DELAY = 1000;
+ public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
private static volatile int expirerCount = 1;
- private long expirationTimeMillis;
-
- private long expirerDelay;
-
- private HashMap delegate;
+ private final ConcurrentHashMap delegate;
- private HashMap expirationInfos;
+ private final CopyOnWriteArrayList expirationListeners;
- private List expirationListeners;
-
- private Expirer expirer;
+ private final Expirer expirer;
public ExpiringMap()
{
- this( new HashMap(), new HashMap(), new LinkedList(), DEFAULT_EXPIRATION_TIME, DEFAULT_EXPIRER_DELAY );
+ this( DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL );
}
- public ExpiringMap( long expirationTimeMillis )
+ public ExpiringMap( int timeToLive )
{
- this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, DEFAULT_EXPIRER_DELAY );
+ this( timeToLive, DEFAULT_EXPIRATION_INTERVAL );
}
- public ExpiringMap( long expirationTimeMillis, long expirerDelay )
+ public ExpiringMap( int timeToLive, int expirationInterval )
{
- this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, expirerDelay );
+ this( new ConcurrentHashMap(), new CopyOnWriteArrayList(), timeToLive, expirationInterval );
}
- private ExpiringMap( HashMap delegate, HashMap accessTimes, List expirationListeners, long expirationTimeMillis,
- long expirerDelay )
+ private ExpiringMap(
+ ConcurrentHashMap delegate, CopyOnWriteArrayList expirationListeners,
+ int timeToLive, int expirationInterval )
{
this.delegate = delegate;
- this.expirationInfos = accessTimes;
- this.expirationTimeMillis = expirationTimeMillis;
this.expirationListeners = expirationListeners;
- this.expirerDelay = expirerDelay;
this.expirer = new Expirer();
- this.expirer.start();
+ expirer.setTimeToLive( timeToLive );
+ expirer.setExpirationInterval( expirationInterval );
}
- /**
- * @see java.util.Map#clear()
- */
- public void clear()
+ public Object put( Object key, Object value )
{
- expirationInfos.clear();
- delegate.clear();
+ return delegate.put( key, new ExpiringObject( key, value, System.currentTimeMillis() ) );
+ }
+
+ public Object get( Object key )
+ {
+ Object object = delegate.get( key );
+
+ if( object != null )
+ {
+ ExpiringObject expObject = ( ExpiringObject ) object;
+ expObject.setLastAccessTime( System.currentTimeMillis() );
+
+ return expObject.getValue();
+ }
+
+ return object;
+ }
+
+ public Object remove( Object key )
+ {
+ return delegate.remove( key );
}
- /**
- * @see java.util.Map#containsKey(java.lang.Object)
- */
public boolean containsKey( Object key )
{
return delegate.containsKey( key );
}
- /**
- * @see java.util.Map#containsValue(java.lang.Object)
- */
public boolean containsValue( Object value )
{
return delegate.containsValue( value );
}
- /**
- * @see java.util.Map#entrySet()
- */
- public Set entrySet()
+ public int size()
{
- return delegate.entrySet();
+ return delegate.size();
}
- /**
- * @see java.util.Map#equals(java.util.Map)
- */
- public boolean equals( Object o )
+ public boolean isEmpty()
{
- return delegate.equals( o );
+ return delegate.isEmpty();
}
- /**
- * @see java.util.Map#get(java.lang.Object)
- */
- public Object get( Object key )
+ public void clear()
{
- Object object = delegate.get( key );
-
- if ( object != null )
- {
- updateAccessTime( object );
- }
-
- return object;
+ delegate.clear();
}
public int hashCode()
@@ -143,178 +130,180 @@
return delegate.hashCode();
}
- public boolean isEmpty()
- {
- return delegate.isEmpty();
- }
-
public Set keySet()
{
return delegate.keySet();
}
- public Object put( Object key, Object value )
+ public boolean equals( Object obj )
{
- if ( value != null )
- {
- addAccessTime( key, value );
- }
-
- return delegate.put( key, value );
+ return delegate.equals( obj );
}
- public void putAll( Map map )
+ public void putAll( Map inMap )
{
- Iterator mapKeyIterator = map.keySet().iterator();
-
- while ( mapKeyIterator.hasNext() )
+ synchronized( inMap )
{
- Object key = mapKeyIterator.next();
- Object value = map.get( key );
+ Iterator inMapKeysIt = inMap.keySet().iterator();
- if ( value != null )
+ while( inMapKeysIt.hasNext() )
{
- addAccessTime( key, value );
+ Object key = inMapKeysIt.next();
+ Object value = inMap.get( key );
+
+ if( value instanceof ExpiringObject )
+ {
+ delegate.put( key, value );
+ }
}
}
-
- delegate.putAll( map );
}
- public Object remove( Object key )
+ public Collection values()
{
- Object object = delegate.remove( key );
-
- if ( object != null )
- {
- expirationInfos.remove( object );
- }
+ return delegate.values();
+ }
- return object;
+ public Set entrySet()
+ {
+ return delegate.entrySet();
}
- public int size()
+ public void addExpirationListener( ExpirationListener listener )
{
- return delegate.size();
+ expirationListeners.add( listener );
}
- public Collection values()
+ public void removeExpirationListener( ExpirationListener listener )
{
- return delegate.values();
+ expirationListeners.remove( listener );
+ }
+
+ public Expirer getExpirer()
+ {
+ return expirer;
}
- private void addAccessTime( Object key, Object value )
+ public int getExpirationInterval()
{
- ExpirationInfo info = new ExpirationInfo();
- info.key = key;
- info.accesstime = System.currentTimeMillis();
+ return expirer.getExpirationInterval();
+ }
- expirationInfos.put( value, info );
+ public int getTimeToLive()
+ {
+ return expirer.getTimeToLive();
}
- public void updateAccessTime( Object object )
+ public void setExpirationInterval( int expirationInterval )
{
- Object infoObject = expirationInfos.get( object );
+ expirer.setExpirationInterval( expirationInterval );
+ }
- if ( infoObject != null )
- {
- ExpirationInfo info = ( ExpirationInfo ) infoObject;
- info.accesstime = System.currentTimeMillis();
- }
+ public void setTimeToLive( int timeToLive )
+ {
+ expirer.setTimeToLive( timeToLive );
}
- private ExpirationInfo getExpirationInfo( Object object )
+ private class ExpiringObject
{
- Object infoObject = expirationInfos.get( object );
+ private Object key;
+
+ private Object value;
+
+ private long lastAccessTime;
- if ( infoObject != null )
+ private ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
+
+ public ExpiringObject( Object key, Object value, long lastAccessTime )
{
- ExpirationInfo info = ( ExpirationInfo ) infoObject;
+ if( value == null )
+ {
+ throw new IllegalArgumentException( "An expiring object cannot be null." );
+ }
- return info;
+ this.key = key;
+ this.value = value;
+ this.lastAccessTime = lastAccessTime;
}
- return null;
- }
+ public long getLastAccessTime()
+ {
+ lastAccessTimeLock.readLock().lock();
- public void addExpirationListener( ExpirationListener listener )
- {
- synchronized ( expirationListeners )
+ try
+ {
+ return lastAccessTime;
+ }
+ finally
+ {
+ lastAccessTimeLock.readLock().unlock();
+ }
+ }
+
+ public void setLastAccessTime( long lastAccessTime )
{
- expirationListeners.add( listener );
+ lastAccessTimeLock.writeLock().lock();
+
+ try
+ {
+ this.lastAccessTime = lastAccessTime;
+ }
+ finally
+ {
+ lastAccessTimeLock.writeLock().unlock();
+ }
}
- }
- public void removeExpirationListener( ExpirationListener listener )
- {
- synchronized ( expirationListeners )
+ public Object getKey()
{
- expirationListeners.remove( listener );
+ return key;
}
- }
- public Object[] findMappedObjects()
- {
- Object results[] = null;
- synchronized ( delegate )
+ public Object getValue()
{
- results = new Object[delegate.size()];
- results = delegate.values().toArray( results );
+ return value;
}
- return ( results );
- }
- public void startExpirer()
- {
- synchronized ( expirer )
+ public boolean equals( Object obj )
{
- if ( !expirer.isRunning() )
- {
- expirer.setRunning( true );
- expirer.interrupt();
- }
+ return value.equals( obj );
}
- }
- public void stopExpirer()
- {
- synchronized ( expirer )
+ public int hashCode()
{
- if ( expirer.isRunning() )
- {
- expirer.setRunning( false );
- expirer.interrupt();
- }
+ return value.hashCode();
}
}
- private class ExpirationInfo
+ public class Expirer implements Runnable
{
- public Object key;
+ private ReadWriteLock stateLock = new ReentrantReadWriteLock();
- public long accesstime;
- }
+ private long timeToLiveMillis;
- private class Expirer extends Thread
- {
- private boolean running = true;
+ private long expirationIntervalMillis;
+
+ private boolean running = false;
+
+ private final Thread expirerThread;
public Expirer()
{
- super( "MapExpirer-" + expirerCount++ );
+ expirerThread = new Thread( this, "ExpiringMapExpirer-" + ( expirerCount++ ) );
+ expirerThread.setDaemon( true );
}
public void run()
{
- while ( running )
+ while( running )
{
processExpires();
try
{
- Thread.sleep( expirerDelay );
+ Thread.sleep( expirationIntervalMillis );
}
- catch ( InterruptedException e )
+ catch( InterruptedException e )
{
}
}
@@ -323,49 +312,162 @@
private void processExpires()
{
long timeNow = System.currentTimeMillis();
- Object mappedObjects[] = findMappedObjects();
- for ( int i = 0; i < mappedObjects.length; i++ )
+ Iterator expiringObjectsIterator = delegate.values().iterator();
+
+ while( expiringObjectsIterator.hasNext() )
{
- Object mappedObject = mappedObjects[i];
+ ExpiringObject expObject = ( ExpiringObject ) expiringObjectsIterator.next();
- ExpirationInfo info = getExpirationInfo( mappedObject );
+ if( timeToLiveMillis <= 0 )
+ continue;
- if ( info != null )
+ long timeIdle = timeNow - expObject.getLastAccessTime();
+
+ if( timeIdle >= timeToLiveMillis )
{
- if ( expirationTimeMillis < 0 )
- continue;
- long timeIdle = timeNow - info.accesstime;
+ delegate.remove( expObject.getKey() );
+
+ Iterator listenerIterator = expirationListeners.iterator();
- if ( timeIdle >= expirationTimeMillis )
+ while( listenerIterator.hasNext() )
{
- delegate.remove( info.key );
- expirationInfos.remove( info.key );
+ ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
- synchronized ( expirationListeners )
- {
- Iterator listenerIterator = expirationListeners.iterator();
-
- while ( listenerIterator.hasNext() )
- {
- ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
-
- listener.expired( mappedObject );
- }
- }
+ listener.expired( expObject.getValue() );
}
}
}
}
- public void setRunning( boolean running )
+ public void startExpiring()
+ {
+ stateLock.writeLock().lock();
+
+ try
+ {
+ if( !running )
+ {
+ running = true;
+ expirerThread.start();
+ }
+ }
+ finally
+ {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public void startExpiringIfNotStarted()
+ {
+ stateLock.readLock().lock();
+ if( running )
+ {
+ stateLock.readLock().unlock();
+ return;
+ }
+
+ stateLock.writeLock().lock();
+ try
+ {
+ if( !running )
+ {
+ running = true;
+ expirerThread.start();
+ }
+ }
+ finally
+ {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public void stopExpiring()
{
- this.running = running;
+ stateLock.writeLock().lock();
+
+ try
+ {
+ if( running )
+ {
+ running = false;
+ expirerThread.interrupt();
+ }
+ }
+ finally
+ {
+ stateLock.writeLock().unlock();
+ }
}
public boolean isRunning()
{
- return running;
+ stateLock.readLock().lock();
+
+ try
+ {
+ return running;
+ }
+ finally
+ {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ public int getTimeToLive()
+ {
+ stateLock.readLock().lock();
+
+ try
+ {
+ return ( int ) timeToLiveMillis / 1000;
+ }
+ finally
+ {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ public void setTimeToLive( long timeToLive )
+ {
+ stateLock.writeLock().lock();
+
+ try
+ {
+ this.timeToLiveMillis = timeToLive * 1000;
+ }
+ finally
+ {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ public int getExpirationInterval()
+ {
+ stateLock.readLock().lock();
+
+ try
+ {
+ return ( int ) expirationIntervalMillis / 1000;
+ }
+ finally
+ {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ public void setExpirationInterval( long expirationInterval )
+ {
+ stateLock.writeLock().lock();
+
+ try
+ {
+ this.expirationIntervalMillis = expirationInterval * 1000;
+ }
+ finally
+ {
+ stateLock.writeLock().unlock();
+ }
}
}
}
Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Sep 19 19:28:07 2006
@@ -53,7 +53,7 @@
{
int port = AvailablePortFinder.getNextAvailable( 1024 );
DatagramAcceptorConfig config = new DatagramAcceptorConfig();
- ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1000, 500 );
+ ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1, 1 );
config.setSessionRecycler( recycler );
MockHandler acceptorHandler = new MockHandler();
@@ -66,7 +66,6 @@
ConnectFuture future = connector.connect(
new InetSocketAddress( "localhost", port ), connectorHandler, config );
future.join();
- Assert.assertTrue( future.isConnected() );
// Write whatever to trigger the acceptor.
future.getSession().write( ByteBuffer.allocate(1) ).join();
@@ -77,7 +76,7 @@
acceptorHandler.session.getCloseFuture().join( 3000 );
Assert.assertTrue( acceptorHandler.session.getCloseFuture().isClosed() );
- Thread.sleep( 500 );
+ Thread.sleep( 1000 );
Assert.assertEquals( "CROPSECL", connectorHandler.result );
Assert.assertEquals( "CROPRECL", acceptorHandler.result );