You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/12/01 06:19:07 UTC

svn commit: r350169 [8/16] - in /directory/network: branches/chain_refactor/src/java/org/apache/mina/common/ trunk/src/examples/org/apache/mina/examples/echoserver/ trunk/src/examples/org/apache/mina/examples/httpserver/ trunk/src/examples/org/apache/m...

Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Wed Nov 30 21:17:41 2005
@@ -1,708 +1,700 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.filter;
-
-import java.util.ArrayList;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.util.BlockingQueue;
-import org.apache.mina.util.ByteBufferUtil;
-import org.apache.mina.util.IdentityHashSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
-
-/**
- * A Thread-pooling filter.  This filter forwards {@link IoHandler} events
- * to its thread pool.
- * <p>
- * This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class ThreadPoolFilter implements IoFilter
-{
-    /**
-     * Default maximum size of thread pool (2G).
-     */
-    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
-    /**
-     * Default keep-alive time of thread pool (1 min).
-     */
-    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
-    /**
-     * A queue which contains {@link Integer}s which represents reusable
-     * thread IDs.  {@link Worker} first checks this queue and then
-     * uses {@link #threadId} when no reusable thread ID is available.
-     */
-    private static final Queue threadIdReuseQueue = new Queue();
-    private static int threadId = 0;
-    
-    private static int acquireThreadId()
-    {
-        synchronized( threadIdReuseQueue )
-        {
-            Integer id = ( Integer ) threadIdReuseQueue.pop();
-            if( id == null )
-            {
-                return ++ threadId;
-            }
-            else
-            {
-                return id.intValue();
-            }
-        }
-    }
-    
-    private static void releaseThreadId( int id )
-    {
-        synchronized( threadIdReuseQueue )
-        {
-            threadIdReuseQueue.push( new Integer( id ) );
-        }
-    }
-
-    private final String threadNamePrefix;
-    private final Map parents = new IdentityHashMap(); 
-    private final Map buffers = new IdentityHashMap();
-    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
-    private final Set allSessionBuffers = new IdentityHashSet();
-
-    private Worker leader;
-    private final Stack followers = new Stack();
-    private final Set allWorkers = new IdentityHashSet();
-
-    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
-    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
-    private boolean shuttingDown;
-
-    private int poolSize;
-    private final Object poolSizeLock = new Object();
-
-    /**
-     * Creates a new instance of this filter with default thread pool settings.
-     */
-    public ThreadPoolFilter()
-    {
-        this( "IoThreadPool" );
-    }
-    
-    /**
-     * Creates a new instance of this filter with the specified thread name prefix
-     * and other default settings.
-     * @param threadNamePrefix the prefix of the thread names this pool will create.
-     */
-    public ThreadPoolFilter( String threadNamePrefix )
-    {
-        if( threadNamePrefix == null )
-        {
-            throw new NullPointerException( "threadNamePrefix" );
-        }
-        threadNamePrefix = threadNamePrefix.trim();
-        if( threadNamePrefix.length() == 0 )
-        {
-            throw new IllegalArgumentException( "threadNamePrefix is empty." );
-        }
-        this.threadNamePrefix = threadNamePrefix;
-    }
-    
-    public String getThreadNamePrefix()
-    {
-        return threadNamePrefix;
-    }
-
-    public int getPoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            return poolSize;
-        }
-    }
-
-    public int getMaximumPoolSize()
-    {
-        return maximumPoolSize;
-    }
-
-    public int getKeepAliveTime()
-    {
-        return keepAliveTime;
-    }
-
-    public void setMaximumPoolSize( int maximumPoolSize )
-    {
-        if( maximumPoolSize <= 0 )
-            throw new IllegalArgumentException();
-        this.maximumPoolSize = maximumPoolSize;
-    }
-
-    public void setKeepAliveTime( int keepAliveTime )
-    {
-        this.keepAliveTime = keepAliveTime;
-    }
-
-    private void increasePoolSize( Worker worker )
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize++;
-            allWorkers.add( worker );
-        }
-    }
-
-    private void decreasePoolSize( Worker worker )
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize--;
-            allWorkers.remove( worker );
-        }
-    }
-
-    private void fireEvent( NextFilter nextFilter, IoSession session,
-                              EventType type, Object data )
-    {
-        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
-        final Set allSessionBuffers = this.allSessionBuffers;
-        final Event event = new Event( type, nextFilter, data );
-
-        synchronized( unfetchedSessionBuffers )
-        {
-            final SessionBuffer buf = getSessionBuffer( session );
-            final Queue eventQueue = buf.eventQueue;
-
-            synchronized( buf )
-            {
-                eventQueue.push( event );
-            }
-
-            if( !allSessionBuffers.contains( buf ) )
-            {
-                allSessionBuffers.add( buf );
-                unfetchedSessionBuffers.push( buf );
-            }
-        }
-    }
-    
-    /**
-     * Implement this method to fetch (or pop) a {@link SessionBuffer} from
-     * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
-     * simply pops the buffer from it.  You could prioritize the fetch order.
-     * 
-     * @return A non-null {@link SessionBuffer}
-     */
-    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
-    {
-        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
-    }
-
-    private SessionBuffer getSessionBuffer( IoSession session )
-    {
-        final Map buffers = this.buffers;
-        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
-        if( buf == null )
-        {
-            synchronized( buffers )
-            {
-                buf = ( SessionBuffer ) buffers.get( session );
-                if( buf == null )
-                {
-                    buf = new SessionBuffer( session );
-                    buffers.put( session, buf );
-                }
-            }
-        }
-        return buf;
-    }
-
-    private void removeSessionBuffer( SessionBuffer buf )
-    {
-        final Map buffers = this.buffers;
-        final IoSession session = buf.session;
-        synchronized( buffers )
-        {
-            buffers.remove( session );
-        }
-    }
-
-    protected static class SessionBuffer
-    {
-        private final IoSession session;
-
-        private final Queue eventQueue = new Queue();
-
-        private SessionBuffer( IoSession session )
-        {
-            this.session = session;
-        }
-        
-        public IoSession getSession()
-        {
-            return session;
-        }
-        
-        public Queue getEventQueue()
-        {
-            return eventQueue;
-        }
-    }
-
-    private class Worker extends Thread
-    {
-        private final int id;
-        private final Object promotionLock = new Object();
-        private boolean dead;
-
-        private Worker()
-        {
-            int id = acquireThreadId();
-            this.id = id;
-            this.setName( threadNamePrefix + '-' + id );
-            increasePoolSize( this );
-        }
-
-        public boolean lead()
-        {
-            final Object promotionLock = this.promotionLock;
-            synchronized( promotionLock )
-            {
-                if( dead )
-                {
-                    return false;
-                }
-
-                leader = this;
-                promotionLock.notify();
-            }
-            
-            return true;
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                if( !waitForPromotion() )
-                    break;
-
-                SessionBuffer buf = fetchBuffer();
-                giveUpLead();
-                if( buf == null )
-                {
-                    break;
-                }
-
-                processEvents( buf );
-                follow();
-                releaseBuffer( buf );
-            }
-
-            decreasePoolSize( this );
-            releaseThreadId( id );
-        }
-
-        private SessionBuffer fetchBuffer()
-        {
-            BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
-            synchronized( unfetchedSessionBuffers )
-            {
-                while( !shuttingDown )
-                {
-                    try
-                    {
-                        unfetchedSessionBuffers.waitForNewItem();
-                    }
-                    catch( InterruptedException e )
-                    {
-                        continue;
-                    }
-
-                    return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers );
-                }
-            }
-
-            return null;
-        }
-
-        private void processEvents( SessionBuffer buf )
-        {
-            final IoSession session = buf.session;
-            final Queue eventQueue = buf.eventQueue;
-            for( ;; )
-            {
-                Event event;
-                synchronized( buf )
-                {
-                    event = ( Event ) eventQueue.pop();
-                    if( event == null )
-                        break;
-                }
-                processEvent( event.getNextFilter(), session,
-                              event.getType(), event.getData() );
-            }
-        }
-
-        private void follow()
-        {
-            final Object promotionLock = this.promotionLock;
-            final Stack followers = ThreadPoolFilter.this.followers;
-            synchronized( promotionLock )
-            {
-                if( this != leader )
-                {
-                    synchronized( followers )
-                    {
-                        followers.push( this );
-                    }
-                }
-            }
-        }
-
-        private void releaseBuffer( SessionBuffer buf )
-        {
-            final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
-            final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
-            final Queue eventQueue = buf.eventQueue;
-
-            synchronized( unfetchedSessionBuffers )
-            {
-                if( eventQueue.isEmpty() )
-                {
-                    allSessionBuffers.remove( buf );
-                    removeSessionBuffer( buf );
-                }
-                else
-                {
-                    unfetchedSessionBuffers.push( buf );
-                }
-            }
-        }
-
-        private boolean waitForPromotion()
-        {
-            final Object promotionLock = this.promotionLock;
-
-            long startTime = System.currentTimeMillis();
-            long currentTime = System.currentTimeMillis();
-            
-            synchronized( promotionLock )
-            {
-                while( this != leader && !shuttingDown )
-                {
-                    // Calculate remaining keep-alive time
-                    int keepAliveTime = getKeepAliveTime();
-                    if( keepAliveTime > 0 )
-                    {
-                        keepAliveTime -= ( currentTime - startTime );
-                    }
-                    else
-                    {
-                        keepAliveTime = Integer.MAX_VALUE;
-                    }
-                    
-                    // Break the loop if there's no remaining keep-alive time.
-                    if( keepAliveTime <= 0 )
-                    {
-                        break;
-                    }
-
-                    // Wait for promotion
-                    try
-                    {
-                        promotionLock.wait( keepAliveTime );
-                    }
-                    catch( InterruptedException e )
-                    {
-                    }
-
-                    // Update currentTime for the next iteration
-                    currentTime = System.currentTimeMillis();
-                }
-
-                boolean timeToLead = this == leader && !shuttingDown;
-
-                if( !timeToLead )
-                {
-                    // time to die
-                    synchronized( followers )
-                    {
-                        followers.remove( this );
-                    }
-
-                    // Mark as dead explicitly when we've got promotionLock.
-                    dead = true;
-                }
-
-                return timeToLead;
-            }
-        }
-
-        private void giveUpLead()
-        {
-            final Stack followers = ThreadPoolFilter.this.followers;
-            Worker worker;
-            do
-            {
-                synchronized( followers )
-                {
-                    worker = ( Worker ) followers.pop();
-                }
-
-                if( worker == null )
-                {
-                    // Increase the number of threads if we
-                    // are not shutting down and we can increase the number.
-                    if( !shuttingDown
-                        && getPoolSize() < getMaximumPoolSize() )
-                    {
-                        worker = new Worker();
-                        worker.lead();
-                        worker.start();
-                    }
-
-                    // This loop should end because:
-                    // 1) lead() is called already,
-                    // 2) or it is shutting down and there's no more threads left.
-                    break;
-                }
-            }
-            while( !worker.lead() );
-        }
-    }
-
-    protected static class EventType
-    {
-        public static final EventType OPENED = new EventType( "OPENED" );
-
-        public static final EventType CLOSED = new EventType( "CLOSED" );
-
-        public static final EventType READ = new EventType( "READ" );
-
-        public static final EventType WRITTEN = new EventType( "WRITTEN" );
-
-        public static final EventType RECEIVED = new EventType( "RECEIVED" );
-
-        public static final EventType SENT = new EventType( "SENT" );
-
-        public static final EventType IDLE = new EventType( "IDLE" );
-
-        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
-
-        private final String value;
-        
-        private EventType( String value )
-        {
-            this.value = value;
-        }
-        
-        public String toString()
-        {
-            return value;
-        }
-    }
-    
-    protected static class Event
-    {
-        private final EventType type;
-        private final NextFilter nextFilter;
-        private final Object data;
-        
-        public Event( EventType type, NextFilter nextFilter, Object data )
-        {
-            this.type = type;
-            this.nextFilter = nextFilter;
-            this.data = data;
-        }
-
-        public Object getData()
-        {
-            return data;
-        }
-        
-
-        public NextFilter getNextFilter()
-        {
-            return nextFilter;
-        }
-        
-
-        public EventType getType()
-        {
-            return type;
-        }
-    }
-    
-    public void sessionCreated( NextFilter nextFilter, IoSession session )
-    {
-        nextFilter.sessionCreated( session );
-    }
-    
-    public void sessionOpened( NextFilter nextFilter,
-                              IoSession session )
-    {
-        fireEvent( nextFilter, session, EventType.OPENED, null );
-    }
-
-    public void sessionClosed( NextFilter nextFilter,
-                              IoSession session )
-    {
-        fireEvent( nextFilter, session, EventType.CLOSED, null );
-    }
-
-    public void sessionIdle( NextFilter nextFilter,
-                            IoSession session, IdleStatus status )
-    {
-        fireEvent( nextFilter, session, EventType.IDLE, status );
-    }
-
-    public void exceptionCaught( NextFilter nextFilter,
-                                IoSession session, Throwable cause )
-    {
-        fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
-    }
-
-    public void messageReceived( NextFilter nextFilter,
-                                 IoSession session, Object message )
-    {
-        ByteBufferUtil.acquireIfPossible( message );
-        fireEvent( nextFilter, session, EventType.RECEIVED, message );
-    }
-
-    public void messageSent( NextFilter nextFilter,
-                             IoSession session, Object message )
-    {
-        ByteBufferUtil.acquireIfPossible( message );
-        fireEvent( nextFilter, session, EventType.SENT, message );
-    }
-
-    protected void processEvent( NextFilter nextFilter,
-                                 IoSession session, EventType type,
-                                 Object data )
-    {
-        if( type == EventType.RECEIVED )
-        {
-            nextFilter.messageReceived( session, data );
-            ByteBufferUtil.releaseIfPossible( data );
-        }
-        else if( type == EventType.SENT )
-        {
-            nextFilter.messageSent( session, data );
-            ByteBufferUtil.releaseIfPossible( data );
-        }
-        else if( type == EventType.EXCEPTION )
-        {
-            nextFilter.exceptionCaught( session, ( Throwable ) data );
-        }
-        else if( type == EventType.IDLE )
-        {
-            nextFilter.sessionIdle( session, ( IdleStatus ) data );
-        }
-        else if( type == EventType.OPENED )
-        {
-            nextFilter.sessionOpened( session );
-        }
-        else if( type == EventType.CLOSED )
-        {
-            nextFilter.sessionClosed( session );
-        }
-    }
-
-    public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
-    {
-        nextFilter.filterWrite( session, writeRequest );
-    }
-
-    public void filterClose( NextFilter nextFilter, IoSession session, CloseFuture closeFuture ) throws Exception
-    {
-        nextFilter.filterClose( session, closeFuture );
-    }
-
-    public synchronized void init( IoFilterChain parent, NextFilter nextFilter )
-    {
-        if( parents.size() > 0 )
-        {
-            return;
-        }
-
-        parents.put( parent, Boolean.TRUE );
-
-        shuttingDown = false;
-        leader = new Worker();
-        leader.start();
-        leader.lead();
-    }
-
-    public synchronized void destroy( IoFilterChain parent, NextFilter nextFilter )
-    {
-        parents.remove( parent );
-        if( parents.size() > 0 )
-        {
-            return;
-        }
-
-        shuttingDown = true;
-        while( getPoolSize() != 0 )
-        {
-            List allWorkers;
-            synchronized( poolSizeLock )
-            {
-                allWorkers = new ArrayList( this.allWorkers );
-            }
-            
-            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
-            {
-                Worker worker = ( Worker ) i.next();
-                while( worker.isAlive() )
-                {
-                    worker.interrupt();
-                    try
-                    {
-                        // This timeout will help us from 
-                        // infinite lock-up and interrupt workers again.
-                        worker.join( 100 );
-                    }
-                    catch( InterruptedException e )
-                    {
-                    }
-                }
-            }
-        }
-        
-        this.allSessionBuffers.clear();
-        this.unfetchedSessionBuffers.clear();
-        this.buffers.clear();
-        this.followers.clear();
-        this.leader = null;
-    }
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.BlockingQueue;
+import org.apache.mina.util.ByteBufferUtil;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
+
+/**
+ * A thread-pooling filter.  This filter forwards {@link IoHandler} events
+ * to its thread pool.
+ * <p>
+ * This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ThreadPoolFilter extends IoFilterAdapter
+{
+    /**
+     * Default maximum size of the thread pool ({@link Integer#MAX_VALUE}).
+     */
+    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+    /**
+     * Default keep-alive time of the thread pool in milliseconds (1 minute).
+     */
+    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+    /**
+     * A queue of {@link Integer}s that represent reusable thread IDs.
+     * {@link Worker} first checks this queue and falls back to
+     * {@link #threadId} when no reusable thread ID is available.
+     */
+    private static final Queue threadIdReuseQueue = new Queue();
+    private static int threadId = 0;
+    
+    private static int acquireThreadId()
+    {
+        synchronized( threadIdReuseQueue )
+        {
+            Integer id = ( Integer ) threadIdReuseQueue.pop();
+            if( id == null )
+            {
+                return ++ threadId;
+            }
+            else
+            {
+                return id.intValue();
+            }
+        }
+    }
+    
+    private static void releaseThreadId( int id )
+    {
+        synchronized( threadIdReuseQueue )
+        {
+            threadIdReuseQueue.push( new Integer( id ) );
+        }
+    }
+
+    private final String threadNamePrefix;
+    private final Map buffers = new IdentityHashMap();
+    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
+    private final Set allSessionBuffers = new IdentityHashSet();
+
+    private Worker leader;
+    private final Stack followers = new Stack();
+    private final Set allWorkers = new IdentityHashSet();
+
+    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+    private boolean shuttingDown;
+
+    private int poolSize;
+    private final Object poolSizeLock = new Object();
+
+    /**
+     * Creates a new instance of this filter with default thread pool settings.
+     */
+    public ThreadPoolFilter()
+    {
+        this( "IoThreadPool" );
+    }
+    
+    /**
+     * Creates a new instance of this filter with the specified thread name prefix
+     * and other default settings.
+     * @param threadNamePrefix the prefix of the thread names this pool will create.
+     */
+    public ThreadPoolFilter( String threadNamePrefix )
+    {
+        if( threadNamePrefix == null )
+        {
+            throw new NullPointerException( "threadNamePrefix" );
+        }
+        threadNamePrefix = threadNamePrefix.trim();
+        if( threadNamePrefix.length() == 0 )
+        {
+            throw new IllegalArgumentException( "threadNamePrefix is empty." );
+        }
+        this.threadNamePrefix = threadNamePrefix;
+    }
+    
+    public String getThreadNamePrefix()
+    {
+        return threadNamePrefix;
+    }
+
+    public int getPoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            return poolSize;
+        }
+    }
+
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    public int getKeepAliveTime()
+    {
+        return keepAliveTime;
+    }
+
+    public void setMaximumPoolSize( int maximumPoolSize )
+    {
+        if( maximumPoolSize <= 0 )
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public void setKeepAliveTime( int keepAliveTime )
+    {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public void init()
+    {
+        shuttingDown = false;
+        leader = new Worker();
+        leader.start();
+        leader.lead();
+    }
+
+    public void destroy()
+    {
+        shuttingDown = true;
+        int expectedPoolSize = 0;
+        while( getPoolSize() != expectedPoolSize )
+        {
+            List allWorkers;
+            synchronized( poolSizeLock )
+            {
+                allWorkers = new ArrayList( this.allWorkers );
+            }
+            
+            // Never interrupt or join the current thread; it may itself be a worker.
+            if( allWorkers.remove( Thread.currentThread() ) )
+            {
+                expectedPoolSize = 1;
+            }
+            
+            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+            {
+                Worker worker = ( Worker ) i.next();
+                while( worker.isAlive() )
+                {
+                    worker.interrupt();
+                    try
+                    {
+                        // This timeout keeps us from locking up forever
+                        // and lets us interrupt the worker again.
+                        worker.join( 100 );
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+            }
+        }
+        
+        this.allSessionBuffers.clear();
+        this.unfetchedSessionBuffers.clear();
+        this.buffers.clear();
+        this.followers.clear();
+        this.leader = null;
+    }
+
+    private void increasePoolSize( Worker worker )
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+            allWorkers.add( worker );
+        }
+    }
+
+    private void decreasePoolSize( Worker worker )
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+            allWorkers.remove( worker );
+        }
+    }
+
+    private void fireEvent( NextFilter nextFilter, IoSession session,
+                              EventType type, Object data )
+    {
+        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+        final Set allSessionBuffers = this.allSessionBuffers;
+        final Event event = new Event( type, nextFilter, data );
+
+        synchronized( unfetchedSessionBuffers )
+        {
+            final SessionBuffer buf = getSessionBuffer( session );
+            final Queue eventQueue = buf.eventQueue;
+
+            synchronized( buf )
+            {
+                eventQueue.push( event );
+            }
+
+            if( !allSessionBuffers.contains( buf ) )
+            {
+                allSessionBuffers.add( buf );
+                unfetchedSessionBuffers.push( buf );
+            }
+        }
+    }
+    
+    /**
+     * Fetches (pops) a {@link SessionBuffer} from the given
+     * <tt>unfetchedSessionBuffers</tt>.  The default implementation simply
+     * pops the buffer from it.  Override this to prioritize the fetch order.
+     * 
+     * @return A non-null {@link SessionBuffer}
+     */
+    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+    {
+        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+    }
+
+    private SessionBuffer getSessionBuffer( IoSession session )
+    {
+        final Map buffers = this.buffers;
+        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+        if( buf == null )
+        {
+            synchronized( buffers )
+            {
+                buf = ( SessionBuffer ) buffers.get( session );
+                if( buf == null )
+                {
+                    buf = new SessionBuffer( session );
+                    buffers.put( session, buf );
+                }
+            }
+        }
+        return buf;
+    }
+
+    private void removeSessionBuffer( SessionBuffer buf )
+    {
+        final Map buffers = this.buffers;
+        final IoSession session = buf.session;
+        synchronized( buffers )
+        {
+            buffers.remove( session );
+        }
+    }
+
+    protected static class SessionBuffer
+    {
+        private final IoSession session;
+
+        private final Queue eventQueue = new Queue();
+
+        private SessionBuffer( IoSession session )
+        {
+            this.session = session;
+        }
+        
+        public IoSession getSession()
+        {
+            return session;
+        }
+        
+        public Queue getEventQueue()
+        {
+            return eventQueue;
+        }
+    }
+
+    private class Worker extends Thread // Pool thread: waits to become leader, fetches one SessionBuffer, passes leadership on, then processes that buffer.
+    {
+        private final int id; // Obtained from acquireThreadId(); handed back via releaseThreadId() when run() exits.
+        private final Object promotionLock = new Object(); // Monitor used for the promotion wait/notify; 'dead' is read and written under it.
+        private boolean dead; // Set in waitForPromotion() once this worker decides to exit; makes lead() return false.
+
+        private Worker() // Names the thread after its id and registers it through increasePoolSize().
+        {
+            int id = acquireThreadId();
+            this.id = id;
+            this.setName( threadNamePrefix + '-' + id );
+            increasePoolSize( this );
+        }
+
+        public boolean lead() // Tries to make this worker the leader; returns false if it is already marked dead.
+        {
+            final Object promotionLock = this.promotionLock;
+            synchronized( promotionLock )
+            {
+                if( dead )
+                {
+                    return false;
+                }
+
+                leader = this;
+                promotionLock.notify(); // Wakes this worker if it is blocked in waitForPromotion().
+            }
+            
+            return true;
+        }
+
+        public void run() // Main loop: wait for promotion, fetch a buffer, hand leadership on, process it, rejoin the followers.
+        {
+            for( ;; )
+            {
+                if( !waitForPromotion() ) // Not promoted (keep-alive expired or shutting down): leave the loop.
+                    break;
+
+                SessionBuffer buf = fetchBuffer();
+                giveUpLead(); // Leadership is passed on before this thread starts processing events.
+                if( buf == null ) // null is what fetchBuffer() returns once it observes shuttingDown.
+                {
+                    break;
+                }
+
+                processEvents( buf );
+                follow();
+                releaseBuffer( buf );
+            }
+
+            decreasePoolSize( this );
+            releaseThreadId( id );
+        }
+
+        private SessionBuffer fetchBuffer() // Waits for a queued SessionBuffer and fetches it; returns null once shuttingDown is set.
+        {
+            BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+            synchronized( unfetchedSessionBuffers )
+            {
+                while( !shuttingDown )
+                {
+                    try
+                    {
+                        unfetchedSessionBuffers.waitForNewItem(); // presumably blocks until the queue has an item -- TODO confirm in BlockingQueue
+                    }
+                    catch( InterruptedException e ) // Interrupted: loop again so that shuttingDown is re-checked.
+                    {
+                        continue;
+                    }
+
+                    return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers ); // Overridable hook picks the buffer to take.
+                }
+            }
+
+            return null;
+        }
+
+        private void processEvents( SessionBuffer buf ) // Pops events from buf's queue one by one and dispatches each via processEvent().
+        {
+            final IoSession session = buf.session;
+            final Queue eventQueue = buf.eventQueue;
+            for( ;; )
+            {
+                Event event;
+                synchronized( buf ) // Only the pop happens under the buffer's monitor; dispatch runs outside it.
+                {
+                    event = ( Event ) eventQueue.pop();
+                    if( event == null ) // A null pop is treated as "queue drained".
+                        break;
+                }
+                processEvent( event.getNextFilter(), session,
+                              event.getType(), event.getData() );
+            }
+        }
+
+        private void follow() // Pushes this worker onto the followers stack unless it is currently the leader.
+        {
+            final Object promotionLock = this.promotionLock;
+            final Stack followers = ThreadPoolFilter.this.followers;
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    synchronized( followers )
+                    {
+                        followers.push( this );
+                    }
+                }
+            }
+        }
+
+        private void releaseBuffer( SessionBuffer buf ) // After processing: forget buf if its queue is empty, otherwise queue it again for another fetch.
+        {
+            final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+            final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
+            final Queue eventQueue = buf.eventQueue;
+
+            synchronized( unfetchedSessionBuffers ) // Same monitor that fetchBuffer() holds while waiting and fetching.
+            {
+                if( eventQueue.isEmpty() )
+                {
+                    allSessionBuffers.remove( buf );
+                    removeSessionBuffer( buf );
+                }
+                else
+                {
+                    unfetchedSessionBuffers.push( buf );
+                }
+            }
+        }
+
+        private boolean waitForPromotion() // Blocks until promotion, keep-alive expiry or shutdown; returns true only if this worker should lead now.
+        {
+            final Object promotionLock = this.promotionLock;
+
+            long startTime = System.currentTimeMillis();
+            long currentTime = System.currentTimeMillis();
+            
+            synchronized( promotionLock )
+            {
+                while( this != leader && !shuttingDown )
+                {
+                    // Calculate remaining keep-alive time
+                    int keepAliveTime = getKeepAliveTime(); // Used as milliseconds: currentTimeMillis() deltas are subtracted and the result is passed to wait().
+                    if( keepAliveTime > 0 )
+                    {
+                        keepAliveTime -= ( currentTime - startTime );
+                    }
+                    else
+                    {
+                        keepAliveTime = Integer.MAX_VALUE; // Non-positive setting: wait with the largest int timeout instead.
+                    }
+                    
+                    // Break the loop if there's no remaining keep-alive time.
+                    if( keepAliveTime <= 0 )
+                    {
+                        break;
+                    }
+
+                    // Wait for promotion
+                    try
+                    {
+                        promotionLock.wait( keepAliveTime );
+                    }
+                    catch( InterruptedException e ) // Ignored; the loop condition is simply evaluated again.
+                    {
+                    }
+
+                    // Update currentTime for the next iteration
+                    currentTime = System.currentTimeMillis();
+                }
+
+                boolean timeToLead = this == leader && !shuttingDown;
+
+                if( !timeToLead )
+                {
+                    // time to die
+                    synchronized( followers )
+                    {
+                        followers.remove( this );
+                    }
+
+                    // Mark as dead explicitly when we've got promotionLock.
+                    dead = true;
+                }
+
+                return timeToLead;
+            }
+        }
+
+        private void giveUpLead() // Hands leadership to a follower, or to a newly created worker if no follower is left and the pool may still grow.
+        {
+            final Stack followers = ThreadPoolFilter.this.followers;
+            Worker worker;
+            do
+            {
+                synchronized( followers )
+                {
+                    worker = ( Worker ) followers.pop();
+                }
+
+                if( worker == null ) // No follower available.
+                {
+                    // Increase the number of threads if we
+                    // are not shutting down and we can increase the number.
+                    if( !shuttingDown
+                        && getPoolSize() < getMaximumPoolSize() )
+                    {
+                        worker = new Worker();
+                        worker.lead(); // Made leader before start(), so its first waitForPromotion() need not wait.
+                        worker.start();
+                    }
+
+                    // This loop should end because:
+                    // 1) lead() is called already,
+                    // 2) or it is shutting down and there's no more threads left.
+                    break;
+                }
+            }
+            while( !worker.lead() ); // lead() fails for a follower already marked dead; try the next one.
+        }
+    }
+
+    protected static class EventType // Fixed set of event-kind constants; processEvent() compares them by identity (==).
+    {
+        public static final EventType OPENED = new EventType( "OPENED" );
+
+        public static final EventType CLOSED = new EventType( "CLOSED" );
+
+        public static final EventType READ = new EventType( "READ" ); // processEvent() in this class has no branch for this constant.
+
+        public static final EventType WRITTEN = new EventType( "WRITTEN" ); // processEvent() in this class has no branch for this constant.
+
+        public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+        public static final EventType SENT = new EventType( "SENT" );
+
+        public static final EventType IDLE = new EventType( "IDLE" );
+
+        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+        private final String value; // Name returned by toString().
+        
+        private EventType( String value ) // Private: only the constants declared above can be instantiated.
+        {
+            this.value = value;
+        }
+        
+        public String toString() // Returns the constant's name, e.g. "OPENED".
+        {
+            return value;
+        }
+    }
+    
+    protected static class Event // Immutable record of one queued event: its type, the filter to forward it to, and its payload.
+    {
+        private final EventType type; // Kind of event.
+        private final NextFilter nextFilter; // Filter that will receive the event when it is processed.
+        private final Object data; // Payload; its meaning depends on the type (see the casts in processEvent()).
+        
+        public Event( EventType type, NextFilter nextFilter, Object data ) // Stores the three values as given; no validation is done.
+        {
+            this.type = type;
+            this.nextFilter = nextFilter;
+            this.data = data;
+        }
+
+        public Object getData() // Returns the payload passed to the constructor.
+        {
+            return data;
+        }
+        
+
+        public NextFilter getNextFilter() // Returns the filter passed to the constructor.
+        {
+            return nextFilter;
+        }
+        
+
+        public EventType getType() // Returns the event type passed to the constructor.
+        {
+            return type;
+        }
+    }
+    
+    public void sessionCreated( NextFilter nextFilter, IoSession session ) // Forwarded directly on the calling thread; unlike the other session events it is not passed to fireEvent().
+    {
+        nextFilter.sessionCreated( session );
+    }
+    
+    public void sessionOpened( NextFilter nextFilter,
+                              IoSession session )
+    { // Hands an OPENED event (no payload) to fireEvent(); processEvent() later forwards it to nextFilter.
+        fireEvent( nextFilter, session, EventType.OPENED, null );
+    }
+
+    public void sessionClosed( NextFilter nextFilter,
+                              IoSession session )
+    { // Hands a CLOSED event (no payload) to fireEvent(); processEvent() later forwards it to nextFilter.
+        fireEvent( nextFilter, session, EventType.CLOSED, null );
+    }
+
+    public void sessionIdle( NextFilter nextFilter,
+                            IoSession session, IdleStatus status )
+    { // Hands an IDLE event to fireEvent() with the IdleStatus as payload.
+        fireEvent( nextFilter, session, EventType.IDLE, status );
+    }
+
+    public void exceptionCaught( NextFilter nextFilter,
+                                IoSession session, Throwable cause )
+    { // Hands an EXCEPTION event to fireEvent() with the Throwable as payload.
+        fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
+    }
+
+    public void messageReceived( NextFilter nextFilter,
+                                 IoSession session, Object message )
+    { // Hands a RECEIVED event to fireEvent() with the message as payload.
+        ByteBufferUtil.acquireIfPossible( message ); // Paired with releaseIfPossible() in processEvent(); presumably keeps a pooled buffer alive until then -- confirm in ByteBufferUtil.
+        fireEvent( nextFilter, session, EventType.RECEIVED, message );
+    }
+
+    public void messageSent( NextFilter nextFilter,
+                             IoSession session, Object message )
+    { // Hands a SENT event to fireEvent() with the message as payload.
+        ByteBufferUtil.acquireIfPossible( message ); // Paired with releaseIfPossible() in processEvent(); presumably keeps a pooled buffer alive until then -- confirm in ByteBufferUtil.
+        fireEvent( nextFilter, session, EventType.SENT, message );
+    }
+
+    /**
+     * Forwards a previously queued event to the specified
+     * <tt>nextFilter</tt>, choosing the callback from the event
+     * <tt>type</tt>.  Types without a branch here (for example
+     * {@link EventType#READ} and {@link EventType#WRITTEN}) are ignored.
+     * <p>
+     * For <tt>RECEIVED</tt> and <tt>SENT</tt> events the message was passed
+     * to <tt>ByteBufferUtil.acquireIfPossible()</tt> when it was queued, so
+     * it is passed to <tt>ByteBufferUtil.releaseIfPossible()</tt> here in a
+     * <tt>finally</tt> block.  This way the release is not skipped when the
+     * next filter throws.
+     *
+     * @param nextFilter the filter that receives the event
+     * @param session    the session the event belongs to
+     * @param type       the kind of event to dispatch
+     * @param data       the event payload: the message for
+     *                   <tt>RECEIVED</tt>/<tt>SENT</tt>, a {@link Throwable}
+     *                   for <tt>EXCEPTION</tt>, an {@link IdleStatus} for
+     *                   <tt>IDLE</tt>; unused for the other types
+     */
+    protected void processEvent( NextFilter nextFilter,
+                                 IoSession session, EventType type,
+                                 Object data )
+    {
+        if( type == EventType.RECEIVED )
+        {
+            try
+            {
+                nextFilter.messageReceived( session, data );
+            }
+            finally
+            {
+                // Balance the acquire done in messageReceived() of this filter.
+                ByteBufferUtil.releaseIfPossible( data );
+            }
+        }
+        else if( type == EventType.SENT )
+        {
+            try
+            {
+                nextFilter.messageSent( session, data );
+            }
+            finally
+            {
+                // Balance the acquire done in messageSent() of this filter.
+                ByteBufferUtil.releaseIfPossible( data );
+            }
+        }
+        else if( type == EventType.EXCEPTION )
+        {
+            nextFilter.exceptionCaught( session, ( Throwable ) data );
+        }
+        else if( type == EventType.IDLE )
+        {
+            nextFilter.sessionIdle( session, ( IdleStatus ) data );
+        }
+        else if( type == EventType.OPENED )
+        {
+            nextFilter.sessionOpened( session );
+        }
+        else if( type == EventType.CLOSED )
+        {
+            nextFilter.sessionClosed( session );
+        }
+    }
+
+    public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) // Write requests are forwarded unchanged on the calling thread.
+    {
+        nextFilter.filterWrite( session, writeRequest );
+    }
+
+    public void filterClose( NextFilter nextFilter, IoSession session, CloseFuture closeFuture ) throws Exception // Close requests are forwarded unchanged on the calling thread.
+    {
+        nextFilter.filterClose( session, closeFuture );
+    }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java Wed Nov 30 21:17:41 2005
@@ -1,171 +1,171 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-
-/**
- * A {@link ProtocolDecoder} that cumulates the content of received
- * buffers to a <em>cumulative buffer</em> to help users implement decoders.
- * <p>
- * If the received {@link ByteBuffer} is only a part of a message.
- * decoders should cumulate received buffers to make a message complete or
- * to postpone decoding until more buffers arrive.
- * <p>
- * Here is an example decoder that decodes a list of integers:
- * <pre>
- * public class IntegerDecoder extends CumulativeProtocolDecoder {
- * 
- *     public IntegerDecoder() {
- *         super(4);
- *     }
- * 
- *     protected boolean doDecode(IoSession session, ByteBuffer in,
- *                                ProtocolDecoderOutput out) throws ProtocolViolationException {
- *         if (in.remaining() < 4) {
- *             return false; // Cumulate remainder to decode later.
- *         }
- *         
- *         out.write(new Integer(in.getInt()));
- * 
- *         // Decoded one integer; CumulativeProtocolDecoder will call me again,
- *         // so I can decode as many integers as possible.
- *         return true;
- *     }
- * }
- * </pre>
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
-    
-    private static final String BUFFER = CumulativeProtocolDecoder.class.getName() + ".Buffer";
-    
-    private final int initialCapacity;
-    
-    /**
-     * Creates a new instance with the 16 bytes initial capacity of
-     * cumulative buffer.  Please note that the capacity increases
-     * automatically.
-     */
-    protected CumulativeProtocolDecoder()
-    {
-        this( 16 );
-    }
-    
-    /**
-     * Creates a new instance with the specified initial capacity of
-     * cumulative buffer.  Please note that the capacity increases
-     * automatically.
-     */
-    protected CumulativeProtocolDecoder( int initialCapacity )
-    {
-        if( initialCapacity < 0 )
-        {
-            throw new IllegalArgumentException( "initialCapacity: " + initialCapacity );
-        }
-        
-        this.initialCapacity = initialCapacity;
-    }
-    
-    /**
-     * Cumulates content of <tt>in</tt> into internal buffer and forwards
-     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
-     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
-     * and the cumulative buffer is compacted after decoding ends.
-     * 
-     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
-     *                               <tt>true</tt> not consuming the cumulative buffer.
-     */
-    public void decode( IoSession session, ByteBuffer in,
-                        ProtocolDecoderOutput out ) throws Exception
-    {
-        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
-        if( buf == null )
-        {
-            buf = ByteBuffer.allocate( initialCapacity );
-            buf.setAutoExpand( true );
-            session.setAttribute( BUFFER, buf );
-        }
-        
-        buf.put( in );
-        buf.flip();
-
-        try
-        {
-            for( ;; )
-            {
-                int oldPos = buf.position();
-                boolean decoded = doDecode( session, buf, out );
-                if( decoded )
-                {
-                    if( buf.position() == oldPos )
-                    {
-                        throw new IllegalStateException(
-                                "doDecode() can't return true when buffer is not consumed." );
-                    }
-                    
-                    if( !buf.hasRemaining() )
-                    {
-                        break;
-                    }
-                }
-                else
-                {
-                    break;
-                }
-            }
-        }
-        finally
-        {
-            buf.compact();
-        }
-    }
-    
-    /**
-     * Implement this method to consume the specified cumulative buffer and
-     * decode its content into message(s). 
-     *  
-     * @param in the cumulative buffer
-     * @return <tt>true</tt> if and only if there's more to decode in the buffer
-     *         and you want to have <tt>doDecode</tt> method invoked again.
-     *         Return <tt>false</tt> if remaining data is not enough to decode,
-     *         then this method will be invoked again when more data is cumulated.
-     * @throws Exception if cannot decode <tt>in</tt>.
-     */
-    protected abstract boolean doDecode( IoSession session, ByteBuffer in,
-                                         ProtocolDecoderOutput out ) throws Exception;
-
-    /**
-     * Releases the cumulative buffer used by the specified <tt>session</tt>.
-     * Please don't forget to call <tt>super.dispose( session )</tt> when
-     * you override this method.
-     */
-    public void dispose( IoSession session ) throws Exception
-    {
-        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
-        if( buf != null )
-        {
-            buf.release();
-            session.removeAttribute( BUFFER );
-        }
-    }
-}
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message,
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes a list of integers:
+ * <pre>
+ * public class IntegerDecoder extends CumulativeProtocolDecoder {
+ * 
+ *     public IntegerDecoder() {
+ *         super(4);
+ *     }
+ * 
+ *     protected boolean doDecode(IoSession session, ByteBuffer in,
+ *                                ProtocolDecoderOutput out) throws ProtocolViolationException {
+ *         if (in.remaining() < 4) {
+ *             return false; // Cumulate remainder to decode later.
+ *         }
+ *         
+ *         out.write(new Integer(in.getInt()));
+ * 
+ *         // Decoded one integer; CumulativeProtocolDecoder will call me again,
+ *         // so I can decode as many integers as possible.
+ *         return true;
+ *     }
+ * }
+ * </pre>
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+    
+    /** Session attribute key under which the cumulative buffer is stored. */
+    private static final String BUFFER = CumulativeProtocolDecoder.class.getName() + ".Buffer";
+    
+    /** Capacity the cumulative buffer is allocated with. */
+    private final int initialCapacity;
+    
+    /**
+     * Creates a decoder whose cumulative buffer starts at 16 bytes.
+     * The buffer grows automatically when more room is needed.
+     */
+    protected CumulativeProtocolDecoder()
+    {
+        this( 16 );
+    }
+    
+    /**
+     * Creates a decoder whose cumulative buffer starts at the given
+     * capacity.  The buffer grows automatically when more room is needed.
+     *
+     * @param initialCapacity the starting capacity in bytes
+     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is negative
+     */
+    protected CumulativeProtocolDecoder( int initialCapacity )
+    {
+        if( initialCapacity < 0 )
+        {
+            throw new IllegalArgumentException( "initialCapacity: " + initialCapacity );
+        }
+        
+        this.initialCapacity = initialCapacity;
+    }
+    
+    /**
+     * Appends <tt>in</tt> to the session's cumulative buffer and then calls
+     * {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)} on it
+     * again and again for as long as it returns <tt>true</tt> and data
+     * remains.  The cumulative buffer is compacted afterwards, also when
+     * decoding fails with an exception.
+     * 
+     * @throws IllegalStateException if <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> without consuming anything
+     *                               from the cumulative buffer.
+     */
+    public void decode( IoSession session, ByteBuffer in,
+                        ProtocolDecoderOutput out ) throws Exception
+    {
+        // Fetch the per-session cumulative buffer, creating it on first use.
+        ByteBuffer cumulation = ( ByteBuffer ) session.getAttribute( BUFFER );
+        if( cumulation == null )
+        {
+            cumulation = ByteBuffer.allocate( initialCapacity );
+            cumulation.setAutoExpand( true );
+            session.setAttribute( BUFFER, cumulation );
+        }
+        
+        cumulation.put( in );
+        cumulation.flip();
+
+        try
+        {
+            boolean keepDecoding;
+            do
+            {
+                final int startPos = cumulation.position();
+                keepDecoding = doDecode( session, cumulation, out );
+                if( keepDecoding && cumulation.position() == startPos )
+                {
+                    throw new IllegalStateException(
+                            "doDecode() can't return true when buffer is not consumed." );
+                }
+            }
+            while( keepDecoding && cumulation.hasRemaining() );
+        }
+        finally
+        {
+            // Keep the undecoded remainder for the next call.
+            cumulation.compact();
+        }
+    }
+    
+    /**
+     * Consumes data from the cumulative buffer and turns it into
+     * message(s) written to <tt>out</tt>.
+     *  
+     * @param in the cumulative buffer
+     * @return <tt>true</tt> if and only if the buffer may hold more to decode
+     *         and <tt>doDecode</tt> should be invoked again.
+     *         Return <tt>false</tt> if the remaining data is not enough to
+     *         decode; the method is invoked again once more data has been
+     *         cumulated.
+     * @throws Exception if <tt>in</tt> cannot be decoded.
+     */
+    protected abstract boolean doDecode( IoSession session, ByteBuffer in,
+                                         ProtocolDecoderOutput out ) throws Exception;
+
+    /**
+     * Releases the cumulative buffer of the specified <tt>session</tt>, if
+     * there is one, and removes it from the session attributes.
+     * Overriding methods should call <tt>super.dispose( session )</tt>.
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+        final ByteBuffer cumulation = ( ByteBuffer ) session.getAttribute( BUFFER );
+        if( cumulation == null )
+        {
+            return;
+        }
+
+        cumulation.release();
+        session.removeAttribute( BUFFER );
+    }
+}

Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java Wed Nov 30 21:17:41 2005
@@ -1,64 +1,64 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-/**
- * An exception that is thrown when {@link ProtocolEncoder} or
- * {@link ProtocolDecoder} cannot understand or failed to validate
- * data to process.
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class ProtocolCodecException extends Exception
-{
-    private static final long serialVersionUID = 5939878548186330695L;
-
-    /**
-     * Constructs a new instance.
-     */
-    public ProtocolCodecException()
-    {
-    }
-
-    /**
-     * Constructs a new instance with the specified message.
-     */
-    public ProtocolCodecException( String message )
-    {
-        super( message );
-    }
-
-    /**
-     * Constructs a new instance with the specified cause.
-     */
-    public ProtocolCodecException( Throwable cause )
-    {
-        super( cause );
-    }
-
-    /**
-     * Constructs a new instance with the specified message and the specified
-     * cause.
-     */
-    public ProtocolCodecException( String message, Throwable cause )
-    {
-        super( message, cause );
-    }
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+/**
+ * An exception that is thrown when {@link ProtocolEncoder} or
+ * {@link ProtocolDecoder} cannot understand or failed to validate
+ * data to process.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ProtocolCodecException extends Exception
+{
+    private static final long serialVersionUID = 5939878548186330695L;
+
+    /**
+     * Constructs a new instance without a message or a cause.
+     */
+    public ProtocolCodecException()
+    {
+    }
+
+    /**
+     * Constructs a new instance with the specified message.
+     *
+     * @param message the detail message passed to {@link Exception}
+     */
+    public ProtocolCodecException( String message )
+    {
+        super( message );
+    }
+
+    /**
+     * Constructs a new instance with the specified cause.
+     *
+     * @param cause the underlying cause passed to {@link Exception}
+     */
+    public ProtocolCodecException( Throwable cause )
+    {
+        super( cause );
+    }
+
+    /**
+     * Constructs a new instance with the specified message and the specified
+     * cause.
+     *
+     * @param message the detail message passed to {@link Exception}
+     * @param cause   the underlying cause passed to {@link Exception}
+     */
+    public ProtocolCodecException( String message, Throwable cause )
+    {
+        super( message, cause );
+    }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java Wed Nov 30 21:17:41 2005
@@ -1,45 +1,45 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-/**
- * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
- * binary or protocol specific data into message object and vice versa.
- * <p>
- * Please refer to
- * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
- * example.
- *  
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public interface ProtocolCodecFactory
-{
-    /**
-     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
-     * encodes message objects into binary or protocol-specific data.
-     */
-    ProtocolEncoder getEncoder();
-
-    /**
-     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
-     * decodes binary or protocol-specific data into message objects.
-     */
-    ProtocolDecoder getDecoder();
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+/**
+ * Provides a {@link ProtocolEncoder} and a {@link ProtocolDecoder} which translate
+ * binary or protocol-specific data into message objects and vice versa.
+ * <p>
+ * Please refer to
+ * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
+ * example.
+ *  
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ProtocolCodecFactory
+{
+    /**
+     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
+     * encodes message objects into binary or protocol-specific data.
+     *
+     * @return the encoder to use
+     */
+    ProtocolEncoder getEncoder();
+
+    /**
+     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
+     * decodes binary or protocol-specific data into message objects.
+     *
+     * @return the decoder to use
+     */
+    ProtocolDecoder getDecoder();
 }

Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Wed Nov 30 21:17:41 2005
@@ -1,289 +1,298 @@
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ByteBufferProxy;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
-import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.SessionLog;
-
-public class ProtocolCodecFilter extends IoFilterAdapter
-{
-    public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
-    public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
-    public static final String ENCODER_OUT = ProtocolCodecFilter.class.getName() + ".encoderOutput";
-    public static final String DECODER_OUT = ProtocolCodecFilter.class.getName() + ".decoderOutput";
-    
-    private final ProtocolCodecFactory factory;
-    
-    public ProtocolCodecFilter( ProtocolCodecFactory factory )
-    {
-        if( factory == null )
-        {
-            throw new NullPointerException( "factory" );
-        }
-        this.factory = factory;
-    }
-    
-    public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
-    {
-        if( !( message instanceof ByteBuffer ) )
-        {
-            nextFilter.messageReceived( session, message );
-            return;
-        }
-
-        ByteBuffer in = ( ByteBuffer ) message;
-        ProtocolDecoder decoder = getDecoder( session );
-        SimpleProtocolDecoderOutput decoderOut = getDecoderOut( session );
-        
-        try
-        {
-            synchronized( decoder )
-            {
-                decoder.decode( session, in, decoderOut );
-            }
-        }
-        catch( Throwable t )
-        {
-            ProtocolDecoderException pde;
-            if( t instanceof ProtocolDecoderException )
-            {
-                pde = ( ProtocolDecoderException ) t;
-            }
-            else
-            {
-                pde = new ProtocolDecoderException( t );
-            }
-            pde.setHexdump( in.getHexDump() );
-            throw pde;
-        }
-        finally
-        {
-            // Dispose the decoder if this session is connectionless.
-            if( session.getTransportType().isConnectionless() )
-            {
-                disposeDecoder( session );
-            }
-
-            // Release the read buffer.
-            in.release();
-
-            Queue queue = decoderOut.getMessageQueue();
-            synchronized( queue )
-            {
-                if( !queue.isEmpty() )
-                {
-                    do
-                    {
-                        nextFilter.messageReceived( session, queue.pop() );
-                    }
-                    while( !queue.isEmpty() );
-                }
-            }
-        }
-    }
-
-    public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
-    {
-        if( ! ( message instanceof MessageByteBuffer ) )
-        {
-            nextFilter.messageSent( session, message );
-            return;
-        }
-
-        MessageByteBuffer buf = ( MessageByteBuffer ) message;
-        try
-        {
-            buf.release();
-        }
-        finally
-        {
-            nextFilter.messageSent( session, buf.message );
-        }
-    }
-    
-    public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
-    {
-        Object message = writeRequest.getMessage();
-        if( message instanceof ByteBuffer )
-        {
-            nextFilter.filterWrite( session, writeRequest );
-            return;
-        }
-
-        ProtocolEncoder encoder = getEncoder( session );
-        ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session );
-        encoderOut.nextFilter = nextFilter;
-        
-        try
-        {
-            encoder.encode( session, message, encoderOut );
-        }
-        catch( Throwable t )
-        {
-            ProtocolEncoderException pee;
-            if( t instanceof ProtocolEncoderException )
-            {
-                pee = ( ProtocolEncoderException ) t;
-            }
-            else
-            {
-                pee = new ProtocolEncoderException( t );
-            }
-            throw pee;
-        }
-        finally
-        {
-            // Dispose the encoder if this session is connectionless.
-            if( session.getTransportType().isConnectionless() )
-            {
-                disposeEncoder( session );
-            }
-        }
-
-        encoderOut.writeRequest = writeRequest;
-        encoderOut.flush();
-    }
-    
-    public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
-    {
-        disposeEncoder( session );
-        disposeDecoder( session );
-        nextFilter.sessionClosed( session );
-    }
-
-    private ProtocolEncoder getEncoder( IoSession session )
-    {
-        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
-        if( encoder == null )
-        {
-            encoder = factory.getEncoder();
-            session.setAttribute( ENCODER, encoder );
-        }
-        return encoder;
-    }
-    
-    private ProtocolEncoderOutputImpl getEncoderOut( IoSession session )
-    {
-        ProtocolEncoderOutputImpl out = ( ProtocolEncoderOutputImpl ) session.getAttribute( ENCODER_OUT );
-        if( out == null )
-        {
-            out = new ProtocolEncoderOutputImpl( session );
-            session.setAttribute( ENCODER_OUT, out );
-        }
-        return out;
-    }
-    
-    private ProtocolDecoder getDecoder( IoSession session )
-    {
-        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
-        if( decoder == null )
-        {
-            decoder = factory.getDecoder();
-            session.setAttribute( DECODER, decoder );
-        }
-        return decoder;
-    }
-    
-    private SimpleProtocolDecoderOutput getDecoderOut( IoSession session )
-    {
-        SimpleProtocolDecoderOutput out = ( SimpleProtocolDecoderOutput ) session.getAttribute( DECODER_OUT );
-        if( out == null )
-        {
-            out = new SimpleProtocolDecoderOutput();
-            session.setAttribute( DECODER_OUT, out );
-        }
-        return out;
-    }
-    
-    private void disposeEncoder( IoSession session )
-    {
-        session.removeAttribute( ENCODER_OUT );
-        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
-        if( encoder == null )
-        {
-            return;
-        }
-
-        try
-        {
-            encoder.dispose( session );
-        }
-        catch( Throwable t )
-        {
-            SessionLog.warn(
-                    session,
-                    "Failed to dispose: " + encoder.getClass().getName() +
-                    " (" + encoder + ')' );
-        }
-    }
-
-    private void disposeDecoder( IoSession session )
-    {
-        session.removeAttribute( DECODER_OUT );
-        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
-        if( decoder == null )
-        {
-            return;
-        }
-
-        try
-        {
-            decoder.dispose( session );
-        }
-        catch( Throwable t )
-        {
-            SessionLog.warn(
-                    session,
-                    "Falied to dispose: " + decoder.getClass().getName() +
-                    " (" + decoder + ')' );
-        }
-    }
-
-    private static class MessageByteBuffer extends ByteBufferProxy
-    {
-        private final Object message;
-        
-        private MessageByteBuffer( ByteBuffer buf, Object message )
-        {
-            super( buf );
-            this.message = message;
-        }
-    }
-    
-    private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
-    {
-        private final IoSession session;
-        private NextFilter nextFilter;
-        private WriteRequest writeRequest;
-        
-        public ProtocolEncoderOutputImpl( IoSession session )
-        {
-            this.session = session;
-        }
-
-        protected WriteFuture doFlush( ByteBuffer buf )
-        {
-            WriteFuture future;
-            if( writeRequest != null )
-            {
-                future = writeRequest.getFuture();
-                nextFilter.filterWrite(
-                        session,
-                        new WriteRequest(
-                                new MessageByteBuffer(
-                                        buf, writeRequest.getMessage() ), future ) );
-            }
-            else
-            {
-                future = new WriteFuture();
-                nextFilter.filterWrite( session, new WriteRequest( buf, future ) );
-            }
-            return future;
-        }
-    }
-}
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ByteBufferProxy;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.SessionLog;
+
+public class ProtocolCodecFilter extends IoFilterAdapter
+{
+    public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
+    public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
+    public static final String ENCODER_OUT = ProtocolCodecFilter.class.getName() + ".encoderOutput";
+    public static final String DECODER_OUT = ProtocolCodecFilter.class.getName() + ".decoderOutput";
+    
+    private final ProtocolCodecFactory factory;
+    
+    public ProtocolCodecFilter( ProtocolCodecFactory factory )
+    {
+        if( factory == null )
+        {
+            throw new NullPointerException( "factory" );
+        }
+        this.factory = factory;
+    }
+    
+    public void onPostAdd( IoFilterChain parent, String name, NextFilter nextFilter ) throws Exception
+    {
+        if( parent.contains( ProtocolCodecFilter.class ) )
+        {
+            throw new IllegalStateException( "A filter chain cannot contain more than one ProtocolCodecFilter." );
+        }
+    }
+
+    public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        if( !( message instanceof ByteBuffer ) )
+        {
+            nextFilter.messageReceived( session, message );
+            return;
+        }
+
+        ByteBuffer in = ( ByteBuffer ) message;
+        ProtocolDecoder decoder = getDecoder( session );
+        SimpleProtocolDecoderOutput decoderOut = getDecoderOut( session );
+        
+        try
+        {
+            synchronized( decoder )
+            {
+                decoder.decode( session, in, decoderOut );
+            }
+        }
+        catch( Throwable t )
+        {
+            ProtocolDecoderException pde;
+            if( t instanceof ProtocolDecoderException )
+            {
+                pde = ( ProtocolDecoderException ) t;
+            }
+            else
+            {
+                pde = new ProtocolDecoderException( t );
+            }
+            pde.setHexdump( in.getHexDump() );
+            throw pde;
+        }
+        finally
+        {
+            // Dispose the decoder if this session is connectionless.
+            if( session.getTransportType().isConnectionless() )
+            {
+                disposeDecoder( session );
+            }
+
+            // Release the read buffer.
+            in.release();
+
+            Queue queue = decoderOut.getMessageQueue();
+            synchronized( queue )
+            {
+                if( !queue.isEmpty() )
+                {
+                    do
+                    {
+                        nextFilter.messageReceived( session, queue.pop() );
+                    }
+                    while( !queue.isEmpty() );
+                }
+            }
+        }
+    }
+
+    public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        if( ! ( message instanceof MessageByteBuffer ) )
+        {
+            nextFilter.messageSent( session, message );
+            return;
+        }
+
+        MessageByteBuffer buf = ( MessageByteBuffer ) message;
+        try
+        {
+            buf.release();
+        }
+        finally
+        {
+            nextFilter.messageSent( session, buf.message );
+        }
+    }
+    
+    public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
+    {
+        Object message = writeRequest.getMessage();
+        if( message instanceof ByteBuffer )
+        {
+            nextFilter.filterWrite( session, writeRequest );
+            return;
+        }
+
+        ProtocolEncoder encoder = getEncoder( session );
+        ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session );
+        encoderOut.nextFilter = nextFilter;
+        
+        try
+        {
+            encoder.encode( session, message, encoderOut );
+        }
+        catch( Throwable t )
+        {
+            ProtocolEncoderException pee;
+            if( t instanceof ProtocolEncoderException )
+            {
+                pee = ( ProtocolEncoderException ) t;
+            }
+            else
+            {
+                pee = new ProtocolEncoderException( t );
+            }
+            throw pee;
+        }
+        finally
+        {
+            // Dispose the encoder if this session is connectionless.
+            if( session.getTransportType().isConnectionless() )
+            {
+                disposeEncoder( session );
+            }
+        }
+
+        encoderOut.writeRequest = writeRequest;
+        encoderOut.flush();
+    }
+    
+    public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
+    {
+        disposeEncoder( session );
+        disposeDecoder( session );
+        nextFilter.sessionClosed( session );
+    }
+
+    private ProtocolEncoder getEncoder( IoSession session )
+    {
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+        if( encoder == null )
+        {
+            encoder = factory.getEncoder();
+            session.setAttribute( ENCODER, encoder );
+        }
+        return encoder;
+    }
+    
+    private ProtocolEncoderOutputImpl getEncoderOut( IoSession session )
+    {
+        ProtocolEncoderOutputImpl out = ( ProtocolEncoderOutputImpl ) session.getAttribute( ENCODER_OUT );
+        if( out == null )
+        {
+            out = new ProtocolEncoderOutputImpl( session );
+            session.setAttribute( ENCODER_OUT, out );
+        }
+        return out;
+    }
+    
+    private ProtocolDecoder getDecoder( IoSession session )
+    {
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+        if( decoder == null )
+        {
+            decoder = factory.getDecoder();
+            session.setAttribute( DECODER, decoder );
+        }
+        return decoder;
+    }
+    
+    private SimpleProtocolDecoderOutput getDecoderOut( IoSession session )
+    {
+        SimpleProtocolDecoderOutput out = ( SimpleProtocolDecoderOutput ) session.getAttribute( DECODER_OUT );
+        if( out == null )
+        {
+            out = new SimpleProtocolDecoderOutput();
+            session.setAttribute( DECODER_OUT, out );
+        }
+        return out;
+    }
+    
+    private void disposeEncoder( IoSession session )
+    {
+        session.removeAttribute( ENCODER_OUT );
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+        if( encoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            encoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            SessionLog.warn(
+                    session,
+                    "Failed to dispose: " + encoder.getClass().getName() +
+                    " (" + encoder + ')' );
+        }
+    }
+
+    private void disposeDecoder( IoSession session )
+    {
+        session.removeAttribute( DECODER_OUT );
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+        if( decoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            decoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            SessionLog.warn(
+                    session,
+                    "Falied to dispose: " + decoder.getClass().getName() +
+                    " (" + decoder + ')' );
+        }
+    }
+
+    private static class MessageByteBuffer extends ByteBufferProxy
+    {
+        private final Object message;
+        
+        private MessageByteBuffer( ByteBuffer buf, Object message )
+        {
+            super( buf );
+            this.message = message;
+        }
+    }
+    
+    private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
+    {
+        private final IoSession session;
+        private NextFilter nextFilter;
+        private WriteRequest writeRequest;
+        
+        public ProtocolEncoderOutputImpl( IoSession session )
+        {
+            this.session = session;
+        }
+
+        protected WriteFuture doFlush( ByteBuffer buf )
+        {
+            WriteFuture future;
+            if( writeRequest != null )
+            {
+                future = writeRequest.getFuture();
+                nextFilter.filterWrite(
+                        session,
+                        new WriteRequest(
+                                new MessageByteBuffer(
+                                        buf, writeRequest.getMessage() ), future ) );
+            }
+            else
+            {
+                future = new WriteFuture();
+                nextFilter.filterWrite( session, new WriteRequest( buf, future ) );
+            }
+            return future;
+        }
+    }
+}

Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java Wed Nov 30 21:17:41 2005
@@ -1,57 +1,57 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-
-/**
- * Decodes binary or protocol-specific data into higher-level message objects.
- * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
- * method with read data, and then the decoder implementation puts decoded
- * messages into {@link ProtocolDecoderOutput} by calling
- * {@link ProtocolDecoderOutput#write(Object)}.
- * <p>
- * Please refer to
- * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineDecoder.html"><code>TextLineDecoder</code></a>
- * example. 
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public interface ProtocolDecoder
-{
-    /**
-     * Decodes binary or protocol-specific content into higher-level message objects.
-     * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
-     * method with read data, and then the decoder implementation puts decoded
-     * messages into {@link ProtocolDecoderOutput}.
-     * 
-     * @throws Exception if the read data violated protocol specification
-     */
-    void decode( IoSession session, ByteBuffer in,
-                 ProtocolDecoderOutput out ) throws Exception;
-    
-    /**
-     * Releases all resources related with this decoder.
-     * 
-     * @throws Exception if failed to dispose all resources
-     */
-    void dispose( IoSession session ) throws Exception;
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Decodes binary or protocol-specific data into higher-level message objects.
+ * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
+ * method with read data, and then the decoder implementation puts decoded
+ * messages into {@link ProtocolDecoderOutput} by calling
+ * {@link ProtocolDecoderOutput#write(Object)}.
+ * <p>
+ * Please refer to
+ * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineDecoder.html"><code>TextLineDecoder</code></a>
+ * example. 
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ProtocolDecoder
+{
+    /**
+     * Decodes binary or protocol-specific content into higher-level message objects.
+     * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
+     * method with read data, and then the decoder implementation puts decoded
+     * messages into {@link ProtocolDecoderOutput}.
+     * 
+     * @throws Exception if the read data violates the protocol specification
+     */
+    void decode( IoSession session, ByteBuffer in,
+                 ProtocolDecoderOutput out ) throws Exception;
+    
+    /**
+     * Releases all resources associated with this decoder.
+     * 
+     * @throws Exception if the resources cannot be released
+     */
+    void dispose( IoSession session ) throws Exception;
 }