You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/06/12 15:11:30 UTC

svn commit: r190270 - in /directory/network/trunk/src: java/org/apache/mina/common/ java/org/apache/mina/filter/ java/org/apache/mina/registry/ test/org/apache/mina/common/

Author: trustin
Date: Sun Jun 12 06:11:30 2005
New Revision: 190270

URL: http://svn.apache.org/viewcvs?rev=190270&view=rev
Log:
Resolved: DIRMINA-40 Filter API needs callback for enabled notification

Modified:
    directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
    directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
    directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
    directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
    directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java
    directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java

Modified: directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java Sun Jun 12 06:11:30 2005
@@ -51,6 +51,40 @@
  */
 public abstract class AbstractIoFilterChain implements IoFilterChain
 {
+    private static final Map filterRefCntMap = new IdentityHashMap();
+    
+    private static synchronized void increaseRefCnt( IoFilter filter )
+    {
+        Integer refCnt = ( Integer ) filterRefCntMap.get( filter );
+        if( refCnt == null ) // first registration
+        {
+            filter.init();
+            refCnt = new Integer( 1 );
+            filterRefCntMap.put( filter, refCnt );
+        }
+        else
+        {
+            refCnt = new Integer( refCnt.intValue() + 1 );
+            filterRefCntMap.put( filter, refCnt );
+        }
+        
+    }
+    
+    private static synchronized void decreaseRefCnt( IoFilter filter )
+    {
+        Integer refCnt = ( Integer ) filterRefCntMap.get( filter );
+        if( refCnt.intValue() == 1 ) // last remove
+        {
+            filter.destroy();
+            filterRefCntMap.remove( filter );
+        }
+        else
+        {
+            refCnt = new Integer( refCnt.intValue() - 1 );
+            filterRefCntMap.put( filter, refCnt );
+        }
+    }
+    
     private final Map name2entry = new HashMap();
 
     private final Map filter2entry = new IdentityHashMap();
@@ -73,6 +107,14 @@
     {
         return new IoFilter()
         {
+            public void init()
+            {
+            }
+            
+            public void destroy()
+            {
+            }
+
             public void sessionOpened( NextFilter nextFilter, IoSession session )
             {
                 nextFilter.sessionOpened( session );
@@ -132,6 +174,14 @@
     {
         return new IoFilter()
         {
+            public void init()
+            {
+            }
+            
+            public void destroy()
+            {
+            }
+           
             public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
             {
                 session.getHandler().sessionOpened( session );
@@ -254,6 +304,8 @@
     public synchronized void remove( String name )
     {
         Entry entry = checkOldName( name );
+        decreaseRefCnt( entry.filter );
+
         Entry prevEntry = entry.prevEntry;
         Entry nextEntry = entry.nextEntry;
         prevEntry.nextEntry = nextEntry;
@@ -279,6 +331,8 @@
 
     private void register( Entry prevEntry, String name, IoFilter filter )
     {
+        increaseRefCnt( filter );
+        
         Entry newEntry = new Entry( prevEntry, prevEntry.nextEntry, name, filter );
         prevEntry.nextEntry.prevEntry = newEntry;
         prevEntry.nextEntry = newEntry;
@@ -478,6 +532,12 @@
         return list;
     }
     
+    protected void finalize() throws Throwable
+    {
+        this.clear();
+        super.finalize();
+    }
+
     protected abstract void doWrite( IoSession session, WriteRequest writeRequest );
 
     private class Entry

Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java Sun Jun 12 06:11:30 2005
@@ -44,6 +44,22 @@
 public interface IoFilter
 {
     /**
+     * Invoked when this filter is added to {@link IoFilterChain}.
+     * This method is invoked only once even if you add a single filter to
+     * multiple filter chains because MINA maintains its reference count
+     * internally.
+     */
+    void init();
+    
+    /**
+     * Invoked when this filter is removed from {@link IoFilterChain}.
+     * This method is invoked only once even if you remove a single filter from
+     * multiple filter chains because MINA maintains its reference count
+     * internally.
+     */
+    void destroy();
+    
+    /**
      * Filters {@link IoHandler#sessionOpened(IoSession)} event.
      */
     void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception;

Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java Sun Jun 12 06:11:30 2005
@@ -29,6 +29,14 @@
  */
 public class IoFilterAdapter implements IoFilter
 {
+    public void init()
+    {
+    }
+    
+    public void destroy()
+    {
+    }
+
     public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
     {
         nextFilter.sessionOpened( session );

Modified: directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java Sun Jun 12 06:11:30 2005
@@ -48,6 +48,14 @@
     {
         return new IoFilter()
         {
+            public void init()
+            {
+            }
+            
+            public void destroy()
+            {
+            }
+
             public void sessionOpened( NextFilter nextFilter, IoSession session )
             {
                 ( ( IoSessionFilterChain ) session.getFilterChain() ).sessionOpened( session );

Modified: directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java Sun Jun 12 06:11:30 2005
@@ -75,6 +75,14 @@
         this.defaultLevel = defaultLevel;
     }
     
+    public void init()
+    {
+    }
+    
+    public void destroy()
+    {
+    }
+    
     public void sessionOpened( NextFilter nextFilter, IoSession session )
     {
         SessionLog.log( defaultLevel, session, "OPENED" );

Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Sun Jun 12 06:11:30 2005
@@ -18,13 +18,21 @@
  */
 package org.apache.mina.filter;
 
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.BlockingSet;
 import org.apache.mina.util.ByteBufferUtil;
-import org.apache.mina.util.EventType;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
 import org.apache.mina.util.ThreadPool;
 
 /**
@@ -37,17 +45,468 @@
  * @see ThreadPool
  * @see BaseThreadPool
  */
-public class ThreadPoolFilter extends BaseThreadPool implements ThreadPool, IoFilter
+public class ThreadPoolFilter implements IoFilter
 {
+    /**
+     * Default maximum size of thread pool (2G).
+     */
+    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+    /**
+     * Default keep-alive time of thread pool (1 min).
+     */
+    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+    private static volatile int threadId = 0;
+
+    private final Map buffers = new IdentityHashMap();
+
+    private final Stack followers = new Stack();
+
+    private final BlockingSet readySessionBuffers = new BlockingSet();
+
+    private final Set busySessionBuffers = new HashSet();
+
+    private Worker leader;
+
+    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+
+    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+    private boolean started;
+
+    private boolean shuttingDown;
+
+    private int poolSize;
+
+    private final Object poolSizeLock = new Object();
 
     /**
      * Creates a new instanceof this filter with default thread pool settings.
-     * You'll have to invoke {@link #start()} method to start threads actually.
      */
     public ThreadPoolFilter()
     {
     }
 
+    public int getPoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            return poolSize;
+        }
+    }
+
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    public int getKeepAliveTime()
+    {
+        return keepAliveTime;
+    }
+
+    public void setMaximumPoolSize( int maximumPoolSize )
+    {
+        if( maximumPoolSize <= 0 )
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public void setKeepAliveTime( int keepAliveTime )
+    {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public synchronized void init()
+    {
+        if( started )
+            return;
+
+        shuttingDown = false;
+
+        leader = new Worker();
+        leader.start();
+        leader.lead();
+
+        started = true;
+    }
+
+    public synchronized void destroy()
+    {
+        if( !started )
+            return;
+
+        shuttingDown = true;
+        Worker lastLeader = null;
+        for( ;; )
+        {
+            Worker leader = this.leader;
+            if( lastLeader == leader )
+                break;
+
+            while( leader.isAlive() )
+            {
+                leader.interrupt();
+                try
+                {
+                    leader.join();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+
+            lastLeader = leader;
+        }
+
+        started = false;
+    }
+
+    private void increasePoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+        }
+    }
+
+    private void decreasePoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+        }
+    }
+
+    protected void fireEvent( NextFilter nextFilter, IoSession session,
+                              EventType type, Object data )
+    {
+        final BlockingSet readySessionBuffers = this.readySessionBuffers;
+        final Set busySessionBuffers = this.busySessionBuffers;
+        final SessionBuffer buf = getSessionBuffer( session );
+        final Queue eventQueue = buf.eventQueue;
+        final Event event = new Event( type, nextFilter, data );
+
+        synchronized( buf )
+        {
+            eventQueue.push( event );
+        }
+
+        synchronized( readySessionBuffers )
+        {
+            if( !busySessionBuffers.contains( buf ) )
+            {
+                busySessionBuffers.add( buf );
+                readySessionBuffers.add( buf );
+            }
+        }
+    }
+
+    private SessionBuffer getSessionBuffer( IoSession session )
+    {
+        final Map buffers = this.buffers;
+        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+        if( buf == null )
+        {
+            synchronized( buffers )
+            {
+                buf = ( SessionBuffer ) buffers.get( session );
+                if( buf == null )
+                {
+                    buf = new SessionBuffer( session );
+                    buffers.put( session, buf );
+                }
+            }
+        }
+        return buf;
+    }
+
+    private void removeSessionBuffer( SessionBuffer buf )
+    {
+        final Map buffers = this.buffers;
+        final IoSession session = buf.session;
+        synchronized( buffers )
+        {
+            buffers.remove( session );
+        }
+    }
+
+    private static class SessionBuffer
+    {
+        private final IoSession session;
+
+        private final Queue eventQueue = new Queue();
+
+        private SessionBuffer( IoSession session )
+        {
+            this.session = session;
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        private final Object promotionLock = new Object();
+
+        private Worker()
+        {
+            super( "IoThreadPool-" + ( threadId++ ) );
+            increasePoolSize();
+        }
+
+        public void lead()
+        {
+            final Object promotionLock = this.promotionLock;
+            synchronized( promotionLock )
+            {
+                leader = this;
+                promotionLock.notify();
+            }
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                if( !waitForPromotion() )
+                    break;
+
+                SessionBuffer buf = fetchBuffer();
+                giveUpLead();
+
+                if( buf == null )
+                {
+                    break;
+                }
+
+                processEvents( buf );
+                follow();
+                releaseBuffer( buf );
+            }
+
+            decreasePoolSize();
+        }
+
+        private SessionBuffer fetchBuffer()
+        {
+            SessionBuffer buf = null;
+            BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
+            synchronized( readySessionBuffers )
+            {
+                do
+                {
+                    buf = null;
+                    try
+                    {
+                        readySessionBuffers.waitForNewItem();
+                    }
+                    catch( InterruptedException e )
+                    {
+                        break;
+                    }
+
+                    Iterator it = readySessionBuffers.iterator();
+                    if( !it.hasNext() )
+                    {
+                        // exceeded keepAliveTime
+                        break;
+                    }
+
+                    do
+                    {
+                        buf = null;
+                        buf = ( SessionBuffer ) it.next();
+                        it.remove();
+                    }
+                    while( buf != null && buf.eventQueue.isEmpty()
+                           && it.hasNext() );
+                }
+                while( buf != null && buf.eventQueue.isEmpty() );
+            }
+
+            return buf;
+        }
+
+        private void processEvents( SessionBuffer buf )
+        {
+            final IoSession session = buf.session;
+            final Queue eventQueue = buf.eventQueue;
+            for( ;; )
+            {
+                Event event;
+                synchronized( buf )
+                {
+                    event = ( Event ) eventQueue.pop();
+                    if( event == null )
+                        break;
+                }
+                processEvent( event.getNextFilter(), session,
+                              event.getType(), event.getData() );
+            }
+        }
+
+        private void follow()
+        {
+            final Object promotionLock = this.promotionLock;
+            final Stack followers = ThreadPoolFilter.this.followers;
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    synchronized( followers )
+                    {
+                        followers.push( this );
+                    }
+                }
+            }
+        }
+
+        private void releaseBuffer( SessionBuffer buf )
+        {
+            final BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
+            final Set busySessionBuffers = ThreadPoolFilter.this.busySessionBuffers;
+            final Queue eventQueue = buf.eventQueue;
+
+            synchronized( readySessionBuffers )
+            {
+                busySessionBuffers.remove( buf );
+                if( eventQueue.isEmpty() )
+                {
+                    removeSessionBuffer( buf );
+                }
+                else
+                {
+                    readySessionBuffers.add( buf );
+                }
+            }
+        }
+
+        private boolean waitForPromotion()
+        {
+            final Object promotionLock = this.promotionLock;
+
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    try
+                    {
+                        int keepAliveTime = getKeepAliveTime();
+                        if( keepAliveTime > 0 )
+                        {
+                            promotionLock.wait( keepAliveTime );
+                        }
+                        else
+                        {
+                            promotionLock.wait();
+                        }
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+
+                boolean timeToLead = this == leader;
+
+                if( !timeToLead )
+                {
+                    // time to die
+                    synchronized( followers )
+                    {
+                        followers.remove( this );
+                    }
+                }
+
+                return timeToLead;
+            }
+        }
+
+        private void giveUpLead()
+        {
+            final Stack followers = ThreadPoolFilter.this.followers;
+            Worker worker;
+            synchronized( followers )
+            {
+                worker = ( Worker ) followers.pop();
+            }
+
+            if( worker != null )
+            {
+                worker.lead();
+            }
+            else
+            {
+                if( !shuttingDown )
+                {
+                    synchronized( ThreadPoolFilter.this )
+                    {
+                        if( !shuttingDown
+                            && getPoolSize() < getMaximumPoolSize() )
+                        {
+                            worker = new Worker();
+                            worker.start();
+                            worker.lead();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static class EventType
+    {
+        public static final EventType OPENED = new EventType();
+
+        public static final EventType CLOSED = new EventType();
+
+        public static final EventType READ = new EventType();
+
+        public static final EventType WRITTEN = new EventType();
+
+        public static final EventType RECEIVED = new EventType();
+
+        public static final EventType SENT = new EventType();
+
+        public static final EventType IDLE = new EventType();
+
+        public static final EventType EXCEPTION = new EventType();
+
+        private EventType()
+        {
+        }
+    }
+    
+    private static class Event
+    {
+        private final EventType type;
+        private final NextFilter nextFilter;
+        private final Object data;
+        
+        public Event( EventType type, NextFilter nextFilter, Object data )
+        {
+            this.type = type;
+            this.nextFilter = nextFilter;
+            this.data = data;
+        }
+
+        public Object getData()
+        {
+            return data;
+        }
+        
+
+        public NextFilter getNextFilter()
+        {
+            return nextFilter;
+        }
+        
+
+        public EventType getType()
+        {
+            return type;
+        }
+    }
+    
     public void sessionOpened( NextFilter nextFilter,
                               IoSession session )
     {

Modified: directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java Sun Jun 12 06:11:30 2005
@@ -42,9 +42,6 @@
 
     public SimpleServiceRegistry()
     {
-        socketIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
-        datagramIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
-        vmPipeAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
     }
 
     public void bind( Service service, IoHandler ioHandler ) throws IOException
@@ -68,15 +65,6 @@
             // ignore
         }
         
-        try
-        {
-            acceptor.unbind( service.getAddress() );
-        }
-        catch( Exception e )
-        {
-            // ignore
-        }
-
         services.remove( service );
         stopThreadPools();
     }
@@ -178,7 +166,9 @@
         if( !services.isEmpty() )
             return;
 
-        threadPoolFilter.start();
+        socketIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
+        datagramIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
+        vmPipeAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
     }
 
     private void stopThreadPools()
@@ -186,6 +176,8 @@
         if( !services.isEmpty() )
             return;
 
-        threadPoolFilter.stop();
+        socketIoAcceptor.getFilterChain().remove( "threadPool" );
+        datagramIoAcceptor.getFilterChain().remove( "threadPool" );
+        vmPipeAcceptor.getFilterChain().remove( "threadPool" );
     }
 }

Modified: directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java Sun Jun 12 06:11:30 2005
@@ -55,8 +55,8 @@
     
     public void testChained()
     {
-        chain.addLast( "A", new TestFilter( 'A' ) );
-        chain.addLast( "B", new TestFilter( 'B' ) );
+        chain.addLast( "A", new EventOrderTestFilter( 'A' ) );
+        chain.addLast( "B", new EventOrderTestFilter( 'B' ) );
         run( "ASO BSO HSO" +
              "AMR BMR HMR" +
              "BFW AFW AMS BMS HMS" +
@@ -65,6 +65,30 @@
              "ASC BSC HSC" );
     }
     
+    public void testInitDestroy()
+    {
+        IoFilterChainImpl chain2 = new IoFilterChainImpl();
+        IoFilter filter = new InitDestroyTestFilter();
+
+        chain.addFirst( "A", filter );
+        assertEquals( "INIT", result );
+        
+        chain2.addFirst( "B", filter );
+        assertEquals( "INIT", result );
+
+        chain2.addFirst( "C", filter );
+        assertEquals( "INIT", result );
+
+        chain.remove( "A" );
+        assertEquals( "INIT", result );
+
+        chain2.remove( "B" );
+        assertEquals( "INIT", result );
+
+        chain2.remove( "C" );
+        assertEquals( formatResult( "INIT DESTROY" ), formatResult( result ) );
+    }
+    
     private void run( String expectedResult )
     {
         chain.sessionOpened( session );
@@ -169,15 +193,23 @@
         }
     }
 
-    private class TestFilter implements IoFilter
+    private class EventOrderTestFilter implements IoFilter
     {
         private final char id;
 
-        private TestFilter( char id )
+        private EventOrderTestFilter( char id )
         {
             this.id = id;
         }
         
+        public void init()
+        {
+        }
+        
+        public void destroy()
+        {
+        }
+        
         public void sessionOpened(NextFilter nextFilter, IoSession session) {
             result += id + "SO";
             nextFilter.sessionOpened( session );
@@ -211,6 +243,19 @@
         public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
             result += id + "MS";
             nextFilter.messageSent( session, message );
+        }
+    }
+
+    private class InitDestroyTestFilter extends IoFilterAdapter
+    {
+        public void init()
+        {
+            result += "INIT";
+        }
+        
+        public void destroy()
+        {
+            result += "DESTROY";
         }
     }