You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2006/09/20 04:28:08 UTC

svn commit: r448033 - in /directory/trunks/mina/core/src: main/java/org/apache/mina/common/ main/java/org/apache/mina/util/ test/java/org/apache/mina/transport/socket/nio/

Author: trustin
Date: Tue Sep 19 19:28:07 2006
New Revision: 448033

URL: http://svn.apache.org/viewvc?view=rev&rev=448033
Log:
Applied Greg's patch on DIRMINA-162 (datagram session management fails) with small modification:
* Changed the type of TTL and expiration interval to integer (seconds).
* ExpirationSessionRecycler doesn't implement ExpirationListener directly. I made an inner class instead.
* Removed the static getInstance() method in ExpiringSessionRecycler because we are already using a static global expirer by default. We need some documentation on creating many recyclers though. 

Modified:
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
    directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/ExpiringSessionRecycler.java Tue Sep 19 19:28:07 2006
@@ -29,36 +29,41 @@
 /**
  * An {@link IoSessionRecycler} with sessions that time out on inactivity.
  * 
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * TODO Document me.
  * 
- * TODO Change time unit to 'seconds'.
- * TODO Make thread-safe.
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
  */
-public class ExpiringSessionRecycler implements IoSessionRecycler, ExpirationListener
+public class ExpiringSessionRecycler implements IoSessionRecycler
 {
     private ExpiringMap sessionMap;
-
+    
+    private ExpiringMap.Expirer mapExpirer;
+    
     public ExpiringSessionRecycler()
     {
-        this( ExpiringMap.DEFAULT_EXPIRATION_TIME, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+        this( ExpiringMap.DEFAULT_TIME_TO_LIVE );
     }
-
-    public ExpiringSessionRecycler( long expirationTimeMillis )
+    
+    public ExpiringSessionRecycler( int timeToLive )
     {
-        this( expirationTimeMillis, ExpiringMap.DEFAULT_EXPIRER_DELAY );
+        this( timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL );
     }
-
-    public ExpiringSessionRecycler( long expirationTimeMillis, long expirerDelay )
+    
+    public ExpiringSessionRecycler( int timeToLive, int expirationInterval )
     {
-        // FIXME Use IdentityHashMap if possible.
-        sessionMap = new ExpiringMap( expirationTimeMillis, expirerDelay );
-        sessionMap.addExpirationListener( this );
+        sessionMap = new ExpiringMap( timeToLive, expirationInterval );
+        mapExpirer = sessionMap.getExpirer();
+        sessionMap.addExpirationListener( new DefaultExpirationListener() );
     }
 
     public void put( IoSession session )
     {
+        mapExpirer.startExpiringIfNotStarted();
+        
         Object key = generateKey( session );
-        if ( !sessionMap.containsKey( key ) )
+
+        if( !sessionMap.containsKey( key ) )
         {
             sessionMap.put( key, session );
         }
@@ -66,20 +71,37 @@
 
     public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
     {
-        Object key = generateKey( localAddress, remoteAddress );
-        return ( IoSession ) sessionMap.get( key );
+        return ( IoSession ) sessionMap.get( generateKey( localAddress, remoteAddress ) );
     }
 
     public void remove( IoSession session )
     {
-        Object key = generateKey( session );
-        sessionMap.remove( key );
+        sessionMap.remove( generateKey( session ) );
+    }
+
+    public void stopExpiring()
+    {
+        mapExpirer.stopExpiring();
     }
 
-    public void expired( Object expiredObject )
+    public int getExpirationInterval()
     {
-        IoSession expiredSession = ( IoSession ) expiredObject;
-        expiredSession.close();
+        return sessionMap.getExpirationInterval();
+    }
+
+    public int getTimeToLive()
+    {
+        return sessionMap.getTimeToLive();
+    }
+
+    public void setExpirationInterval( int expirationInterval )
+    {
+        sessionMap.setExpirationInterval( expirationInterval );
+    }
+
+    public void setTimeToLive( int timeToLive )
+    {
+        sessionMap.setTimeToLive( timeToLive );
     }
 
     private Object generateKey( IoSession session )
@@ -93,5 +115,15 @@
         key.add( remoteAddress );
         key.add( localAddress );
         return key;
+    }
+    
+    private class DefaultExpirationListener implements ExpirationListener
+    {
+        public void expired( Object expiredObject )
+        {
+            IoSession expiredSession = ( IoSession ) expiredObject;
+            
+            expiredSession.close();
+        }
     }
 }

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java Tue Sep 19 19:28:07 2006
@@ -31,6 +31,23 @@
 public interface IoSessionRecycler
 {
     /**
+     * A dummy recycler that doesn't recycle any sessions.  Using this recycler will
+     * make all session lifecycle events to be fired for every I/O for all connectionless
+     * sessions.
+     */
+    static IoSessionRecycler NOOP = new IoSessionRecycler()
+    {
+        public void put( IoSession session ) {}
+        public IoSession recycle( SocketAddress localAddress, SocketAddress remoteAddress )
+        {
+            return null;
+        }
+        public void remove( IoSession session )
+        {
+        }
+    };
+
+    /**
      * Called when the underlying transport creates or writes a new {@link IoSession}.
      * 
      * @param session

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringMap.java Tue Sep 19 19:28:07 2006
@@ -20,122 +20,109 @@
 package org.apache.mina.util;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReadWriteLock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * A map with expiration.
  * 
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * TODO Change time unit to 'seconds'.
- * TODO Make thread-safe.
  */
 public class ExpiringMap implements Map
 {
-    public static final long DEFAULT_EXPIRATION_TIME = 60000;
+    public static final int DEFAULT_TIME_TO_LIVE = 60;
 
-    public static final long DEFAULT_EXPIRER_DELAY = 1000;
+    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
 
     private static volatile int expirerCount = 1;
 
-    private long expirationTimeMillis;
-
-    private long expirerDelay;
-
-    private HashMap delegate;
+    private final ConcurrentHashMap delegate;
 
-    private HashMap expirationInfos;
+    private final CopyOnWriteArrayList expirationListeners;
 
-    private List expirationListeners;
-
-    private Expirer expirer;
+    private final Expirer expirer;
 
     public ExpiringMap()
     {
-        this( new HashMap(), new HashMap(), new LinkedList(), DEFAULT_EXPIRATION_TIME, DEFAULT_EXPIRER_DELAY );
+        this( DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL );
     }
 
-    public ExpiringMap( long expirationTimeMillis )
+    public ExpiringMap( int timeToLive )
     {
-        this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, DEFAULT_EXPIRER_DELAY );
+        this( timeToLive, DEFAULT_EXPIRATION_INTERVAL );
     }
 
-    public ExpiringMap( long expirationTimeMillis, long expirerDelay )
+    public ExpiringMap( int timeToLive, int expirationInterval )
     {
-        this( new HashMap(), new HashMap(), new LinkedList(), expirationTimeMillis, expirerDelay );
+        this( new ConcurrentHashMap(), new CopyOnWriteArrayList(), timeToLive, expirationInterval );
     }
 
-    private ExpiringMap( HashMap delegate, HashMap accessTimes, List expirationListeners, long expirationTimeMillis,
-            long expirerDelay )
+    private ExpiringMap(
+            ConcurrentHashMap delegate, CopyOnWriteArrayList expirationListeners,
+            int timeToLive, int expirationInterval )
     {
         this.delegate = delegate;
-        this.expirationInfos = accessTimes;
-        this.expirationTimeMillis = expirationTimeMillis;
         this.expirationListeners = expirationListeners;
-        this.expirerDelay = expirerDelay;
 
         this.expirer = new Expirer();
-        this.expirer.start();
+        expirer.setTimeToLive( timeToLive );
+        expirer.setExpirationInterval( expirationInterval );
     }
 
-    /**
-     * @see java.util.Map#clear()
-     */
-    public void clear()
+    public Object put( Object key, Object value )
     {
-        expirationInfos.clear();
-        delegate.clear();
+        return delegate.put( key, new ExpiringObject( key, value, System.currentTimeMillis() ) );
+    }
+
+    public Object get( Object key )
+    {
+        Object object = delegate.get( key );
+
+        if( object != null )
+        {
+            ExpiringObject expObject = ( ExpiringObject ) object;
+            expObject.setLastAccessTime( System.currentTimeMillis() );
+
+            return expObject.getValue();
+        }
+
+        return object;
+    }
+
+    public Object remove( Object key )
+    {
+        return delegate.remove( key );
     }
 
-    /**
-     * @see java.util.Map#containsKey(java.lang.Object)
-     */
     public boolean containsKey( Object key )
     {
         return delegate.containsKey( key );
     }
 
-    /**
-     * @see java.util.Map#containsValue(java.lang.Object)
-     */
     public boolean containsValue( Object value )
     {
         return delegate.containsValue( value );
     }
 
-    /**
-     * @see java.util.Map#entrySet()
-     */
-    public Set entrySet()
+    public int size()
     {
-        return delegate.entrySet();
+        return delegate.size();
     }
 
-    /**
-     * @see java.util.Map#equals(java.util.Map)
-     */
-    public boolean equals( Object o )
+    public boolean isEmpty()
     {
-        return delegate.equals( o );
+        return delegate.isEmpty();
     }
 
-    /**
-     * @see java.util.Map#get(java.lang.Object)
-     */
-    public Object get( Object key )
+    public void clear()
     {
-        Object object = delegate.get( key );
-
-        if ( object != null )
-        {
-            updateAccessTime( object );
-        }
-
-        return object;
+        delegate.clear();
     }
 
     public int hashCode()
@@ -143,178 +130,180 @@
         return delegate.hashCode();
     }
 
-    public boolean isEmpty()
-    {
-        return delegate.isEmpty();
-    }
-
     public Set keySet()
     {
         return delegate.keySet();
     }
 
-    public Object put( Object key, Object value )
+    public boolean equals( Object obj )
     {
-        if ( value != null )
-        {
-            addAccessTime( key, value );
-        }
-
-        return delegate.put( key, value );
+        return delegate.equals( obj );
     }
 
-    public void putAll( Map map )
+    public void putAll( Map inMap )
     {
-        Iterator mapKeyIterator = map.keySet().iterator();
-
-        while ( mapKeyIterator.hasNext() )
+        synchronized( inMap )
         {
-            Object key = mapKeyIterator.next();
-            Object value = map.get( key );
+            Iterator inMapKeysIt = inMap.keySet().iterator();
 
-            if ( value != null )
+            while( inMapKeysIt.hasNext() )
             {
-                addAccessTime( key, value );
+                Object key = inMapKeysIt.next();
+                Object value = inMap.get( key );
+
+                if( value instanceof ExpiringObject )
+                {
+                    delegate.put( key, value );
+                }
             }
         }
-
-        delegate.putAll( map );
     }
 
-    public Object remove( Object key )
+    public Collection values()
     {
-        Object object = delegate.remove( key );
-
-        if ( object != null )
-        {
-            expirationInfos.remove( object );
-        }
+        return delegate.values();
+    }
 
-        return object;
+    public Set entrySet()
+    {
+        return delegate.entrySet();
     }
 
-    public int size()
+    public void addExpirationListener( ExpirationListener listener )
     {
-        return delegate.size();
+        expirationListeners.add( listener );
     }
 
-    public Collection values()
+    public void removeExpirationListener( ExpirationListener listener )
     {
-        return delegate.values();
+        expirationListeners.remove( listener );
+    }
+    
+    public Expirer getExpirer()
+    {
+        return expirer;
     }
 
-    private void addAccessTime( Object key, Object value )
+    public int getExpirationInterval()
     {
-        ExpirationInfo info = new ExpirationInfo();
-        info.key = key;
-        info.accesstime = System.currentTimeMillis();
+        return expirer.getExpirationInterval();
+    }
 
-        expirationInfos.put( value, info );
+    public int getTimeToLive()
+    {
+        return expirer.getTimeToLive();
     }
 
-    public void updateAccessTime( Object object )
+    public void setExpirationInterval( int expirationInterval )
     {
-        Object infoObject = expirationInfos.get( object );
+        expirer.setExpirationInterval( expirationInterval );
+    }
 
-        if ( infoObject != null )
-        {
-            ExpirationInfo info = ( ExpirationInfo ) infoObject;
-            info.accesstime = System.currentTimeMillis();
-        }
+    public void setTimeToLive( int timeToLive )
+    {
+        expirer.setTimeToLive( timeToLive );
     }
 
-    private ExpirationInfo getExpirationInfo( Object object )
+    private class ExpiringObject
     {
-        Object infoObject = expirationInfos.get( object );
+        private Object key;
+
+        private Object value;
+
+        private long lastAccessTime;
 
-        if ( infoObject != null )
+        private ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
+
+        public ExpiringObject( Object key, Object value, long lastAccessTime )
         {
-            ExpirationInfo info = ( ExpirationInfo ) infoObject;
+            if( value == null )
+            {
+                throw new IllegalArgumentException( "An expiring object cannot be null." );
+            }
 
-            return info;
+            this.key = key;
+            this.value = value;
+            this.lastAccessTime = lastAccessTime;
         }
 
-        return null;
-    }
+        public long getLastAccessTime()
+        {
+            lastAccessTimeLock.readLock().lock();
 
-    public void addExpirationListener( ExpirationListener listener )
-    {
-        synchronized ( expirationListeners )
+            try
+            {
+                return lastAccessTime;
+            }
+            finally
+            {
+                lastAccessTimeLock.readLock().unlock();
+            }
+        }
+
+        public void setLastAccessTime( long lastAccessTime )
         {
-            expirationListeners.add( listener );
+            lastAccessTimeLock.writeLock().lock();
+
+            try
+            {
+                this.lastAccessTime = lastAccessTime;
+            }
+            finally
+            {
+                lastAccessTimeLock.writeLock().unlock();
+            }
         }
-    }
 
-    public void removeExpirationListener( ExpirationListener listener )
-    {
-        synchronized ( expirationListeners )
+        public Object getKey()
         {
-            expirationListeners.remove( listener );
+            return key;
         }
-    }
 
-    public Object[] findMappedObjects()
-    {
-        Object results[] = null;
-        synchronized ( delegate )
+        public Object getValue()
         {
-            results = new Object[delegate.size()];
-            results = delegate.values().toArray( results );
+            return value;
         }
-        return ( results );
-    }
 
-    public void startExpirer()
-    {
-        synchronized ( expirer )
+        public boolean equals( Object obj )
         {
-            if ( !expirer.isRunning() )
-            {
-                expirer.setRunning( true );
-                expirer.interrupt();
-            }
+            return value.equals( obj );
         }
-    }
 
-    public void stopExpirer()
-    {
-        synchronized ( expirer )
+        public int hashCode()
         {
-            if ( expirer.isRunning() )
-            {
-                expirer.setRunning( false );
-                expirer.interrupt();
-            }
+            return value.hashCode();
         }
     }
 
-    private class ExpirationInfo
+    public class Expirer implements Runnable
     {
-        public Object key;
+        private ReadWriteLock stateLock = new ReentrantReadWriteLock();
 
-        public long accesstime;
-    }
+        private long timeToLiveMillis;
 
-    private class Expirer extends Thread
-    {
-        private boolean running = true;
+        private long expirationIntervalMillis;
+
+        private boolean running = false;
+
+        private final Thread expirerThread;
 
         public Expirer()
         {
-            super( "MapExpirer-" + expirerCount++ );
+            expirerThread = new Thread( this, "ExpiringMapExpirer-" + ( expirerCount++ ) );
+            expirerThread.setDaemon( true );
         }
 
         public void run()
         {
-            while ( running )
+            while( running )
             {
                 processExpires();
 
                 try
                 {
-                    Thread.sleep( expirerDelay );
+                    Thread.sleep( expirationIntervalMillis );
                 }
-                catch ( InterruptedException e )
+                catch( InterruptedException e )
                 {
                 }
             }
@@ -323,49 +312,162 @@
         private void processExpires()
         {
             long timeNow = System.currentTimeMillis();
-            Object mappedObjects[] = findMappedObjects();
 
-            for ( int i = 0; i < mappedObjects.length; i++ )
+            Iterator expiringObjectsIterator = delegate.values().iterator();
+
+            while( expiringObjectsIterator.hasNext() )
             {
-                Object mappedObject = mappedObjects[i];
+                ExpiringObject expObject = ( ExpiringObject ) expiringObjectsIterator.next();
 
-                ExpirationInfo info = getExpirationInfo( mappedObject );
+                if( timeToLiveMillis <= 0 )
+                    continue;
 
-                if ( info != null )
+                long timeIdle = timeNow - expObject.getLastAccessTime();
+
+                if( timeIdle >= timeToLiveMillis )
                 {
-                    if ( expirationTimeMillis < 0 )
-                        continue;
-                    long timeIdle = timeNow - info.accesstime;
+                    delegate.remove( expObject.getKey() );
+
+                    Iterator listenerIterator = expirationListeners.iterator();
 
-                    if ( timeIdle >= expirationTimeMillis )
+                    while( listenerIterator.hasNext() )
                     {
-                        delegate.remove( info.key );
-                        expirationInfos.remove( info.key );
+                        ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
 
-                        synchronized ( expirationListeners )
-                        {
-                            Iterator listenerIterator = expirationListeners.iterator();
-
-                            while ( listenerIterator.hasNext() )
-                            {
-                                ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
-
-                                listener.expired( mappedObject );
-                            }
-                        }
+                        listener.expired( expObject.getValue() );
                     }
                 }
             }
         }
 
-        public void setRunning( boolean running )
+        public void startExpiring()
+        {
+            stateLock.writeLock().lock();
+
+            try
+            {
+                if( !running )
+                {
+                    running = true;
+                    expirerThread.start();
+                }
+            }
+            finally
+            {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public void startExpiringIfNotStarted()
+        {
+            stateLock.readLock().lock();
+            if( running )
+            {
+                stateLock.readLock().unlock();
+                return;
+            }
+            
+            stateLock.writeLock().lock();
+            try
+            {
+                if( !running )
+                {
+                    running = true;
+                    expirerThread.start();
+                }
+            }
+            finally
+            {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public void stopExpiring()
         {
-            this.running = running;
+            stateLock.writeLock().lock();
+
+            try
+            {
+                if( running )
+                {
+                    running = false;
+                    expirerThread.interrupt();
+                }
+            }
+            finally
+            {
+                stateLock.writeLock().unlock();
+            }
         }
 
         public boolean isRunning()
         {
-            return running;
+            stateLock.readLock().lock();
+
+            try
+            {
+                return running;
+            }
+            finally
+            {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public int getTimeToLive()
+        {
+            stateLock.readLock().lock();
+
+            try
+            {
+                return ( int ) timeToLiveMillis / 1000;
+            }
+            finally
+            {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public void setTimeToLive( long timeToLive )
+        {
+            stateLock.writeLock().lock();
+
+            try
+            {
+                this.timeToLiveMillis = timeToLive * 1000;
+            }
+            finally
+            {
+                stateLock.writeLock().unlock();
+            }
+        }
+
+        public int getExpirationInterval()
+        {
+            stateLock.readLock().lock();
+
+            try
+            {
+                return ( int ) expirationIntervalMillis / 1000;
+            }
+            finally
+            {
+                stateLock.readLock().unlock();
+            }
+        }
+
+        public void setExpirationInterval( long expirationInterval )
+        {
+            stateLock.writeLock().lock();
+
+            try
+            {
+                this.expirationIntervalMillis = expirationInterval * 1000;
+            }
+            finally
+            {
+                stateLock.writeLock().unlock();
+            }
         }
     }
 }

Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?view=diff&rev=448033&r1=448032&r2=448033
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Sep 19 19:28:07 2006
@@ -53,7 +53,7 @@
     {
         int port = AvailablePortFinder.getNextAvailable( 1024 );
         DatagramAcceptorConfig config = new DatagramAcceptorConfig();
-        ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1000, 500 );
+        ExpiringSessionRecycler recycler = new ExpiringSessionRecycler( 1, 1 );
         config.setSessionRecycler( recycler );
         
         MockHandler acceptorHandler = new MockHandler();
@@ -66,7 +66,6 @@
             ConnectFuture future = connector.connect(
                     new InetSocketAddress( "localhost", port ), connectorHandler, config );
             future.join();
-            Assert.assertTrue( future.isConnected() );
             
             // Write whatever to trigger the acceptor.
             future.getSession().write( ByteBuffer.allocate(1) ).join();
@@ -77,7 +76,7 @@
             acceptorHandler.session.getCloseFuture().join( 3000 );
             Assert.assertTrue( acceptorHandler.session.getCloseFuture().isClosed() );
             
-            Thread.sleep( 500 );
+            Thread.sleep( 1000 );
 
             Assert.assertEquals( "CROPSECL", connectorHandler.result );
             Assert.assertEquals( "CROPRECL", acceptorHandler.result );