You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/12/01 06:19:07 UTC
svn commit: r350169 [8/16] - in /directory/network:
branches/chain_refactor/src/java/org/apache/mina/common/
trunk/src/examples/org/apache/mina/examples/echoserver/
trunk/src/examples/org/apache/mina/examples/httpserver/
trunk/src/examples/org/apache/m...
Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Wed Nov 30 21:17:41 2005
@@ -1,708 +1,700 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.filter;
-
-import java.util.ArrayList;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.util.BlockingQueue;
-import org.apache.mina.util.ByteBufferUtil;
-import org.apache.mina.util.IdentityHashSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
-
-/**
- * A Thread-pooling filter. This filter forwards {@link IoHandler} events
- * to its thread pool.
- * <p>
- * This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class ThreadPoolFilter implements IoFilter
-{
- /**
- * Default maximum size of thread pool (2G).
- */
- public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
- /**
- * Default keep-alive time of thread pool (1 min).
- */
- public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
- /**
- * A queue which contains {@link Integer}s which represents reusable
- * thread IDs. {@link Worker} first checks this queue and then
- * uses {@link #threadId} when no reusable thread ID is available.
- */
- private static final Queue threadIdReuseQueue = new Queue();
- private static int threadId = 0;
-
- private static int acquireThreadId()
- {
- synchronized( threadIdReuseQueue )
- {
- Integer id = ( Integer ) threadIdReuseQueue.pop();
- if( id == null )
- {
- return ++ threadId;
- }
- else
- {
- return id.intValue();
- }
- }
- }
-
- private static void releaseThreadId( int id )
- {
- synchronized( threadIdReuseQueue )
- {
- threadIdReuseQueue.push( new Integer( id ) );
- }
- }
-
- private final String threadNamePrefix;
- private final Map parents = new IdentityHashMap();
- private final Map buffers = new IdentityHashMap();
- private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
- private final Set allSessionBuffers = new IdentityHashSet();
-
- private Worker leader;
- private final Stack followers = new Stack();
- private final Set allWorkers = new IdentityHashSet();
-
- private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
- private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
- private boolean shuttingDown;
-
- private int poolSize;
- private final Object poolSizeLock = new Object();
-
- /**
- * Creates a new instance of this filter with default thread pool settings.
- */
- public ThreadPoolFilter()
- {
- this( "IoThreadPool" );
- }
-
- /**
- * Creates a new instance of this filter with the specified thread name prefix
- * and other default settings.
- * @param threadNamePrefix the prefix of the thread names this pool will create.
- */
- public ThreadPoolFilter( String threadNamePrefix )
- {
- if( threadNamePrefix == null )
- {
- throw new NullPointerException( "threadNamePrefix" );
- }
- threadNamePrefix = threadNamePrefix.trim();
- if( threadNamePrefix.length() == 0 )
- {
- throw new IllegalArgumentException( "threadNamePrefix is empty." );
- }
- this.threadNamePrefix = threadNamePrefix;
- }
-
- public String getThreadNamePrefix()
- {
- return threadNamePrefix;
- }
-
- public int getPoolSize()
- {
- synchronized( poolSizeLock )
- {
- return poolSize;
- }
- }
-
- public int getMaximumPoolSize()
- {
- return maximumPoolSize;
- }
-
- public int getKeepAliveTime()
- {
- return keepAliveTime;
- }
-
- public void setMaximumPoolSize( int maximumPoolSize )
- {
- if( maximumPoolSize <= 0 )
- throw new IllegalArgumentException();
- this.maximumPoolSize = maximumPoolSize;
- }
-
- public void setKeepAliveTime( int keepAliveTime )
- {
- this.keepAliveTime = keepAliveTime;
- }
-
- private void increasePoolSize( Worker worker )
- {
- synchronized( poolSizeLock )
- {
- poolSize++;
- allWorkers.add( worker );
- }
- }
-
- private void decreasePoolSize( Worker worker )
- {
- synchronized( poolSizeLock )
- {
- poolSize--;
- allWorkers.remove( worker );
- }
- }
-
- private void fireEvent( NextFilter nextFilter, IoSession session,
- EventType type, Object data )
- {
- final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
- final Set allSessionBuffers = this.allSessionBuffers;
- final Event event = new Event( type, nextFilter, data );
-
- synchronized( unfetchedSessionBuffers )
- {
- final SessionBuffer buf = getSessionBuffer( session );
- final Queue eventQueue = buf.eventQueue;
-
- synchronized( buf )
- {
- eventQueue.push( event );
- }
-
- if( !allSessionBuffers.contains( buf ) )
- {
- allSessionBuffers.add( buf );
- unfetchedSessionBuffers.push( buf );
- }
- }
- }
-
- /**
- * Implement this method to fetch (or pop) a {@link SessionBuffer} from
- * the given <tt>unfetchedSessionBuffers</tt>. The default implementation
- * simply pops the buffer from it. You could prioritize the fetch order.
- *
- * @return A non-null {@link SessionBuffer}
- */
- protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
- {
- return ( SessionBuffer ) unfetchedSessionBuffers.pop();
- }
-
- private SessionBuffer getSessionBuffer( IoSession session )
- {
- final Map buffers = this.buffers;
- SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- synchronized( buffers )
- {
- buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- buf = new SessionBuffer( session );
- buffers.put( session, buf );
- }
- }
- }
- return buf;
- }
-
- private void removeSessionBuffer( SessionBuffer buf )
- {
- final Map buffers = this.buffers;
- final IoSession session = buf.session;
- synchronized( buffers )
- {
- buffers.remove( session );
- }
- }
-
- protected static class SessionBuffer
- {
- private final IoSession session;
-
- private final Queue eventQueue = new Queue();
-
- private SessionBuffer( IoSession session )
- {
- this.session = session;
- }
-
- public IoSession getSession()
- {
- return session;
- }
-
- public Queue getEventQueue()
- {
- return eventQueue;
- }
- }
-
- private class Worker extends Thread
- {
- private final int id;
- private final Object promotionLock = new Object();
- private boolean dead;
-
- private Worker()
- {
- int id = acquireThreadId();
- this.id = id;
- this.setName( threadNamePrefix + '-' + id );
- increasePoolSize( this );
- }
-
- public boolean lead()
- {
- final Object promotionLock = this.promotionLock;
- synchronized( promotionLock )
- {
- if( dead )
- {
- return false;
- }
-
- leader = this;
- promotionLock.notify();
- }
-
- return true;
- }
-
- public void run()
- {
- for( ;; )
- {
- if( !waitForPromotion() )
- break;
-
- SessionBuffer buf = fetchBuffer();
- giveUpLead();
- if( buf == null )
- {
- break;
- }
-
- processEvents( buf );
- follow();
- releaseBuffer( buf );
- }
-
- decreasePoolSize( this );
- releaseThreadId( id );
- }
-
- private SessionBuffer fetchBuffer()
- {
- BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
- synchronized( unfetchedSessionBuffers )
- {
- while( !shuttingDown )
- {
- try
- {
- unfetchedSessionBuffers.waitForNewItem();
- }
- catch( InterruptedException e )
- {
- continue;
- }
-
- return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers );
- }
- }
-
- return null;
- }
-
- private void processEvents( SessionBuffer buf )
- {
- final IoSession session = buf.session;
- final Queue eventQueue = buf.eventQueue;
- for( ;; )
- {
- Event event;
- synchronized( buf )
- {
- event = ( Event ) eventQueue.pop();
- if( event == null )
- break;
- }
- processEvent( event.getNextFilter(), session,
- event.getType(), event.getData() );
- }
- }
-
- private void follow()
- {
- final Object promotionLock = this.promotionLock;
- final Stack followers = ThreadPoolFilter.this.followers;
- synchronized( promotionLock )
- {
- if( this != leader )
- {
- synchronized( followers )
- {
- followers.push( this );
- }
- }
- }
- }
-
- private void releaseBuffer( SessionBuffer buf )
- {
- final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
- final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
- final Queue eventQueue = buf.eventQueue;
-
- synchronized( unfetchedSessionBuffers )
- {
- if( eventQueue.isEmpty() )
- {
- allSessionBuffers.remove( buf );
- removeSessionBuffer( buf );
- }
- else
- {
- unfetchedSessionBuffers.push( buf );
- }
- }
- }
-
- private boolean waitForPromotion()
- {
- final Object promotionLock = this.promotionLock;
-
- long startTime = System.currentTimeMillis();
- long currentTime = System.currentTimeMillis();
-
- synchronized( promotionLock )
- {
- while( this != leader && !shuttingDown )
- {
- // Calculate remaining keep-alive time
- int keepAliveTime = getKeepAliveTime();
- if( keepAliveTime > 0 )
- {
- keepAliveTime -= ( currentTime - startTime );
- }
- else
- {
- keepAliveTime = Integer.MAX_VALUE;
- }
-
- // Break the loop if there's no remaining keep-alive time.
- if( keepAliveTime <= 0 )
- {
- break;
- }
-
- // Wait for promotion
- try
- {
- promotionLock.wait( keepAliveTime );
- }
- catch( InterruptedException e )
- {
- }
-
- // Update currentTime for the next iteration
- currentTime = System.currentTimeMillis();
- }
-
- boolean timeToLead = this == leader && !shuttingDown;
-
- if( !timeToLead )
- {
- // time to die
- synchronized( followers )
- {
- followers.remove( this );
- }
-
- // Mark as dead explicitly when we've got promotionLock.
- dead = true;
- }
-
- return timeToLead;
- }
- }
-
- private void giveUpLead()
- {
- final Stack followers = ThreadPoolFilter.this.followers;
- Worker worker;
- do
- {
- synchronized( followers )
- {
- worker = ( Worker ) followers.pop();
- }
-
- if( worker == null )
- {
- // Increase the number of threads if we
- // are not shutting down and we can increase the number.
- if( !shuttingDown
- && getPoolSize() < getMaximumPoolSize() )
- {
- worker = new Worker();
- worker.lead();
- worker.start();
- }
-
- // This loop should end because:
- // 1) lead() is called already,
- // 2) or it is shutting down and there's no more threads left.
- break;
- }
- }
- while( !worker.lead() );
- }
- }
-
- protected static class EventType
- {
- public static final EventType OPENED = new EventType( "OPENED" );
-
- public static final EventType CLOSED = new EventType( "CLOSED" );
-
- public static final EventType READ = new EventType( "READ" );
-
- public static final EventType WRITTEN = new EventType( "WRITTEN" );
-
- public static final EventType RECEIVED = new EventType( "RECEIVED" );
-
- public static final EventType SENT = new EventType( "SENT" );
-
- public static final EventType IDLE = new EventType( "IDLE" );
-
- public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
-
- private final String value;
-
- private EventType( String value )
- {
- this.value = value;
- }
-
- public String toString()
- {
- return value;
- }
- }
-
- protected static class Event
- {
- private final EventType type;
- private final NextFilter nextFilter;
- private final Object data;
-
- public Event( EventType type, NextFilter nextFilter, Object data )
- {
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- }
-
- public Object getData()
- {
- return data;
- }
-
-
- public NextFilter getNextFilter()
- {
- return nextFilter;
- }
-
-
- public EventType getType()
- {
- return type;
- }
- }
-
- public void sessionCreated( NextFilter nextFilter, IoSession session )
- {
- nextFilter.sessionCreated( session );
- }
-
- public void sessionOpened( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.OPENED, null );
- }
-
- public void sessionClosed( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.CLOSED, null );
- }
-
- public void sessionIdle( NextFilter nextFilter,
- IoSession session, IdleStatus status )
- {
- fireEvent( nextFilter, session, EventType.IDLE, status );
- }
-
- public void exceptionCaught( NextFilter nextFilter,
- IoSession session, Throwable cause )
- {
- fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
- }
-
- public void messageReceived( NextFilter nextFilter,
- IoSession session, Object message )
- {
- ByteBufferUtil.acquireIfPossible( message );
- fireEvent( nextFilter, session, EventType.RECEIVED, message );
- }
-
- public void messageSent( NextFilter nextFilter,
- IoSession session, Object message )
- {
- ByteBufferUtil.acquireIfPossible( message );
- fireEvent( nextFilter, session, EventType.SENT, message );
- }
-
- protected void processEvent( NextFilter nextFilter,
- IoSession session, EventType type,
- Object data )
- {
- if( type == EventType.RECEIVED )
- {
- nextFilter.messageReceived( session, data );
- ByteBufferUtil.releaseIfPossible( data );
- }
- else if( type == EventType.SENT )
- {
- nextFilter.messageSent( session, data );
- ByteBufferUtil.releaseIfPossible( data );
- }
- else if( type == EventType.EXCEPTION )
- {
- nextFilter.exceptionCaught( session, ( Throwable ) data );
- }
- else if( type == EventType.IDLE )
- {
- nextFilter.sessionIdle( session, ( IdleStatus ) data );
- }
- else if( type == EventType.OPENED )
- {
- nextFilter.sessionOpened( session );
- }
- else if( type == EventType.CLOSED )
- {
- nextFilter.sessionClosed( session );
- }
- }
-
- public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
- {
- nextFilter.filterWrite( session, writeRequest );
- }
-
- public void filterClose( NextFilter nextFilter, IoSession session, CloseFuture closeFuture ) throws Exception
- {
- nextFilter.filterClose( session, closeFuture );
- }
-
- public synchronized void init( IoFilterChain parent, NextFilter nextFilter )
- {
- if( parents.size() > 0 )
- {
- return;
- }
-
- parents.put( parent, Boolean.TRUE );
-
- shuttingDown = false;
- leader = new Worker();
- leader.start();
- leader.lead();
- }
-
- public synchronized void destroy( IoFilterChain parent, NextFilter nextFilter )
- {
- parents.remove( parent );
- if( parents.size() > 0 )
- {
- return;
- }
-
- shuttingDown = true;
- while( getPoolSize() != 0 )
- {
- List allWorkers;
- synchronized( poolSizeLock )
- {
- allWorkers = new ArrayList( this.allWorkers );
- }
-
- for( Iterator i = allWorkers.iterator(); i.hasNext(); )
- {
- Worker worker = ( Worker ) i.next();
- while( worker.isAlive() )
- {
- worker.interrupt();
- try
- {
- // This timeout will help us from
- // infinite lock-up and interrupt workers again.
- worker.join( 100 );
- }
- catch( InterruptedException e )
- {
- }
- }
- }
- }
-
- this.allSessionBuffers.clear();
- this.unfetchedSessionBuffers.clear();
- this.buffers.clear();
- this.followers.clear();
- this.leader = null;
- }
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.BlockingQueue;
+import org.apache.mina.util.ByteBufferUtil;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
+
+/**
+ * A Thread-pooling filter. This filter forwards {@link IoHandler} events
+ * to its thread pool.
+ * <p>
+ * This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ThreadPoolFilter extends IoFilterAdapter
+{
+ /**
+ * Default maximum size of thread pool (2G).
+ */
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+ /**
+ * Default keep-alive time of thread pool (1 min).
+ */
+ public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+ /**
+ * A queue that contains {@link Integer}s representing reusable
+ * thread IDs. {@link Worker} first checks this queue and then
+ * uses {@link #threadId} when no reusable thread ID is available.
+ */
+ private static final Queue threadIdReuseQueue = new Queue();
+ private static int threadId = 0;
+
+ private static int acquireThreadId()
+ {
+ synchronized( threadIdReuseQueue )
+ {
+ Integer id = ( Integer ) threadIdReuseQueue.pop();
+ if( id == null )
+ {
+ return ++ threadId;
+ }
+ else
+ {
+ return id.intValue();
+ }
+ }
+ }
+
+ private static void releaseThreadId( int id )
+ {
+ synchronized( threadIdReuseQueue )
+ {
+ threadIdReuseQueue.push( new Integer( id ) );
+ }
+ }
+
+ private final String threadNamePrefix;
+ private final Map buffers = new IdentityHashMap();
+ private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
+ private final Set allSessionBuffers = new IdentityHashSet();
+
+ private Worker leader;
+ private final Stack followers = new Stack();
+ private final Set allWorkers = new IdentityHashSet();
+
+ private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+ private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+ private boolean shuttingDown;
+
+ private int poolSize;
+ private final Object poolSizeLock = new Object();
+
+ /**
+ * Creates a new instance of this filter with default thread pool settings.
+ */
+ public ThreadPoolFilter()
+ {
+ this( "IoThreadPool" );
+ }
+
+ /**
+ * Creates a new instance of this filter with the specified thread name prefix
+ * and other default settings.
+ * @param threadNamePrefix the prefix of the thread names this pool will create.
+ */
+ public ThreadPoolFilter( String threadNamePrefix )
+ {
+ if( threadNamePrefix == null )
+ {
+ throw new NullPointerException( "threadNamePrefix" );
+ }
+ threadNamePrefix = threadNamePrefix.trim();
+ if( threadNamePrefix.length() == 0 )
+ {
+ throw new IllegalArgumentException( "threadNamePrefix is empty." );
+ }
+ this.threadNamePrefix = threadNamePrefix;
+ }
+
+ public String getThreadNamePrefix()
+ {
+ return threadNamePrefix;
+ }
+
+ public int getPoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ return poolSize;
+ }
+ }
+
+ public int getMaximumPoolSize()
+ {
+ return maximumPoolSize;
+ }
+
+ public int getKeepAliveTime()
+ {
+ return keepAliveTime;
+ }
+
+ public void setMaximumPoolSize( int maximumPoolSize )
+ {
+ if( maximumPoolSize <= 0 )
+ throw new IllegalArgumentException();
+ this.maximumPoolSize = maximumPoolSize;
+ }
+
+ public void setKeepAliveTime( int keepAliveTime )
+ {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public void init()
+ {
+ shuttingDown = false;
+ leader = new Worker();
+ leader.start();
+ leader.lead();
+ }
+
+ public void destroy()
+ {
+ shuttingDown = true;
+ int expectedPoolSize = 0;
+ while( getPoolSize() != expectedPoolSize )
+ {
+ List allWorkers;
+ synchronized( poolSizeLock )
+ {
+ allWorkers = new ArrayList( this.allWorkers );
+ }
+
+ // Skip the current thread: a worker must not interrupt and join itself.
+ if( allWorkers.remove( Thread.currentThread() ) )
+ {
+ expectedPoolSize = 1;
+ }
+
+ for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+ {
+ Worker worker = ( Worker ) i.next();
+ while( worker.isAlive() )
+ {
+ worker.interrupt();
+ try
+ {
+ // The join timeout prevents an infinite lock-up and
+ // lets us interrupt the worker again if it is still alive.
+ worker.join( 100 );
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+
+ this.allSessionBuffers.clear();
+ this.unfetchedSessionBuffers.clear();
+ this.buffers.clear();
+ this.followers.clear();
+ this.leader = null;
+ }
+
+ private void increasePoolSize( Worker worker )
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize++;
+ allWorkers.add( worker );
+ }
+ }
+
+ private void decreasePoolSize( Worker worker )
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize--;
+ allWorkers.remove( worker );
+ }
+ }
+
+ private void fireEvent( NextFilter nextFilter, IoSession session,
+ EventType type, Object data )
+ {
+ final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+ final Set allSessionBuffers = this.allSessionBuffers;
+ final Event event = new Event( type, nextFilter, data );
+
+ synchronized( unfetchedSessionBuffers )
+ {
+ final SessionBuffer buf = getSessionBuffer( session );
+ final Queue eventQueue = buf.eventQueue;
+
+ synchronized( buf )
+ {
+ eventQueue.push( event );
+ }
+
+ if( !allSessionBuffers.contains( buf ) )
+ {
+ allSessionBuffers.add( buf );
+ unfetchedSessionBuffers.push( buf );
+ }
+ }
+ }
+
+ /**
+ * Implement this method to fetch (or pop) a {@link SessionBuffer} from
+ * the given <tt>unfetchedSessionBuffers</tt>. The default implementation
+ * simply pops the buffer from it. You could prioritize the fetch order.
+ *
+ * @return A non-null {@link SessionBuffer}
+ */
+ protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+ {
+ return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+ }
+
+ private SessionBuffer getSessionBuffer( IoSession session )
+ {
+ final Map buffers = this.buffers;
+ SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ synchronized( buffers )
+ {
+ buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ buf = new SessionBuffer( session );
+ buffers.put( session, buf );
+ }
+ }
+ }
+ return buf;
+ }
+
+ private void removeSessionBuffer( SessionBuffer buf )
+ {
+ final Map buffers = this.buffers;
+ final IoSession session = buf.session;
+ synchronized( buffers )
+ {
+ buffers.remove( session );
+ }
+ }
+
+ protected static class SessionBuffer
+ {
+ private final IoSession session;
+
+ private final Queue eventQueue = new Queue();
+
+ private SessionBuffer( IoSession session )
+ {
+ this.session = session;
+ }
+
+ public IoSession getSession()
+ {
+ return session;
+ }
+
+ public Queue getEventQueue()
+ {
+ return eventQueue;
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ private final int id;
+ private final Object promotionLock = new Object();
+ private boolean dead;
+
+ private Worker()
+ {
+ int id = acquireThreadId();
+ this.id = id;
+ this.setName( threadNamePrefix + '-' + id );
+ increasePoolSize( this );
+ }
+
+ public boolean lead()
+ {
+ final Object promotionLock = this.promotionLock;
+ synchronized( promotionLock )
+ {
+ if( dead )
+ {
+ return false;
+ }
+
+ leader = this;
+ promotionLock.notify();
+ }
+
+ return true;
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ if( !waitForPromotion() )
+ break;
+
+ SessionBuffer buf = fetchBuffer();
+ giveUpLead();
+ if( buf == null )
+ {
+ break;
+ }
+
+ processEvents( buf );
+ follow();
+ releaseBuffer( buf );
+ }
+
+ decreasePoolSize( this );
+ releaseThreadId( id );
+ }
+
+ private SessionBuffer fetchBuffer()
+ {
+ BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+ synchronized( unfetchedSessionBuffers )
+ {
+ while( !shuttingDown )
+ {
+ try
+ {
+ unfetchedSessionBuffers.waitForNewItem();
+ }
+ catch( InterruptedException e )
+ {
+ continue;
+ }
+
+ return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers );
+ }
+ }
+
+ return null;
+ }
+
+ private void processEvents( SessionBuffer buf )
+ {
+ final IoSession session = buf.session;
+ final Queue eventQueue = buf.eventQueue;
+ for( ;; )
+ {
+ Event event;
+ synchronized( buf )
+ {
+ event = ( Event ) eventQueue.pop();
+ if( event == null )
+ break;
+ }
+ processEvent( event.getNextFilter(), session,
+ event.getType(), event.getData() );
+ }
+ }
+
+ private void follow()
+ {
+ final Object promotionLock = this.promotionLock;
+ final Stack followers = ThreadPoolFilter.this.followers;
+ synchronized( promotionLock )
+ {
+ if( this != leader )
+ {
+ synchronized( followers )
+ {
+ followers.push( this );
+ }
+ }
+ }
+ }
+
+ private void releaseBuffer( SessionBuffer buf )
+ {
+ final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+ final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
+ final Queue eventQueue = buf.eventQueue;
+
+ synchronized( unfetchedSessionBuffers )
+ {
+ if( eventQueue.isEmpty() )
+ {
+ allSessionBuffers.remove( buf );
+ removeSessionBuffer( buf );
+ }
+ else
+ {
+ unfetchedSessionBuffers.push( buf );
+ }
+ }
+ }
+
+ private boolean waitForPromotion()
+ {
+ final Object promotionLock = this.promotionLock;
+
+ long startTime = System.currentTimeMillis();
+ long currentTime = System.currentTimeMillis();
+
+ synchronized( promotionLock )
+ {
+ while( this != leader && !shuttingDown )
+ {
+ // Calculate remaining keep-alive time
+ int keepAliveTime = getKeepAliveTime();
+ if( keepAliveTime > 0 )
+ {
+ keepAliveTime -= ( currentTime - startTime );
+ }
+ else
+ {
+ keepAliveTime = Integer.MAX_VALUE;
+ }
+
+ // Break the loop if there's no remaining keep-alive time.
+ if( keepAliveTime <= 0 )
+ {
+ break;
+ }
+
+ // Wait for promotion
+ try
+ {
+ promotionLock.wait( keepAliveTime );
+ }
+ catch( InterruptedException e )
+ {
+ }
+
+ // Update currentTime for the next iteration
+ currentTime = System.currentTimeMillis();
+ }
+
+ boolean timeToLead = this == leader && !shuttingDown;
+
+ if( !timeToLead )
+ {
+ // time to die
+ synchronized( followers )
+ {
+ followers.remove( this );
+ }
+
+ // Mark as dead explicitly when we've got promotionLock.
+ dead = true;
+ }
+
+ return timeToLead;
+ }
+ }
+
+ private void giveUpLead()
+ {
+ final Stack followers = ThreadPoolFilter.this.followers;
+ Worker worker;
+ do
+ {
+ synchronized( followers )
+ {
+ worker = ( Worker ) followers.pop();
+ }
+
+ if( worker == null )
+ {
+ // Increase the number of threads if we
+ // are not shutting down and we can increase the number.
+ if( !shuttingDown
+ && getPoolSize() < getMaximumPoolSize() )
+ {
+ worker = new Worker();
+ worker.lead();
+ worker.start();
+ }
+
+ // This loop should end because:
+ // 1) lead() is called already,
+ // 2) or it is shutting down and there are no more threads left.
+ break;
+ }
+ }
+ while( !worker.lead() );
+ }
+ }
+
+ protected static class EventType
+ {
+ public static final EventType OPENED = new EventType( "OPENED" );
+
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+
+ public static final EventType READ = new EventType( "READ" );
+
+ public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+ public static final EventType SENT = new EventType( "SENT" );
+
+ public static final EventType IDLE = new EventType( "IDLE" );
+
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+ private final String value;
+
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+ }
+
+ protected static class Event
+ {
+ private final EventType type;
+ private final NextFilter nextFilter;
+ private final Object data;
+
+ public Event( EventType type, NextFilter nextFilter, Object data )
+ {
+ this.type = type;
+ this.nextFilter = nextFilter;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+
+ public NextFilter getNextFilter()
+ {
+ return nextFilter;
+ }
+
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ nextFilter.sessionCreated( session );
+ }
+
+ public void sessionOpened( NextFilter nextFilter,
+ IoSession session )
+ {
+ fireEvent( nextFilter, session, EventType.OPENED, null );
+ }
+
+ public void sessionClosed( NextFilter nextFilter,
+ IoSession session )
+ {
+ fireEvent( nextFilter, session, EventType.CLOSED, null );
+ }
+
+ public void sessionIdle( NextFilter nextFilter,
+ IoSession session, IdleStatus status )
+ {
+ fireEvent( nextFilter, session, EventType.IDLE, status );
+ }
+
+ public void exceptionCaught( NextFilter nextFilter,
+ IoSession session, Throwable cause )
+ {
+ fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
+ }
+
+ public void messageReceived( NextFilter nextFilter,
+ IoSession session, Object message )
+ {
+ ByteBufferUtil.acquireIfPossible( message );
+ fireEvent( nextFilter, session, EventType.RECEIVED, message );
+ }
+
+ public void messageSent( NextFilter nextFilter,
+ IoSession session, Object message )
+ {
+ ByteBufferUtil.acquireIfPossible( message );
+ fireEvent( nextFilter, session, EventType.SENT, message );
+ }
+
+ protected void processEvent( NextFilter nextFilter,
+ IoSession session, EventType type,
+ Object data )
+ {
+ if( type == EventType.RECEIVED )
+ {
+ nextFilter.messageReceived( session, data );
+ ByteBufferUtil.releaseIfPossible( data );
+ }
+ else if( type == EventType.SENT )
+ {
+ nextFilter.messageSent( session, data );
+ ByteBufferUtil.releaseIfPossible( data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ nextFilter.exceptionCaught( session, ( Throwable ) data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ nextFilter.sessionIdle( session, ( IdleStatus ) data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ nextFilter.sessionOpened( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ nextFilter.sessionClosed( session );
+ }
+ }
+
+ public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ }
+
+ public void filterClose( NextFilter nextFilter, IoSession session, CloseFuture closeFuture ) throws Exception
+ {
+ nextFilter.filterClose( session, closeFuture );
+ }
}
Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/CumulativeProtocolDecoder.java Wed Nov 30 21:17:41 2005
@@ -1,171 +1,171 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-
-/**
- * A {@link ProtocolDecoder} that cumulates the content of received
- * buffers to a <em>cumulative buffer</em> to help users implement decoders.
- * <p>
- * If the received {@link ByteBuffer} is only a part of a message.
- * decoders should cumulate received buffers to make a message complete or
- * to postpone decoding until more buffers arrive.
- * <p>
- * Here is an example decoder that decodes a list of integers:
- * <pre>
- * public class IntegerDecoder extends CumulativeProtocolDecoder {
- *
- * public IntegerDecoder() {
- * super(4);
- * }
- *
- * protected boolean doDecode(IoSession session, ByteBuffer in,
- * ProtocolDecoderOutput out) throws ProtocolViolationException {
- * if (in.remaining() < 4) {
- * return false; // Cumulate remainder to decode later.
- * }
- *
- * out.write(new Integer(in.getInt()));
- *
- * // Decoded one integer; CumulativeProtocolDecoder will call me again,
- * // so I can decode as many integers as possible.
- * return true;
- * }
- * }
- * </pre>
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
-
- private static final String BUFFER = CumulativeProtocolDecoder.class.getName() + ".Buffer";
-
- private final int initialCapacity;
-
- /**
- * Creates a new instance with the 16 bytes initial capacity of
- * cumulative buffer. Please note that the capacity increases
- * automatically.
- */
- protected CumulativeProtocolDecoder()
- {
- this( 16 );
- }
-
- /**
- * Creates a new instance with the specified initial capacity of
- * cumulative buffer. Please note that the capacity increases
- * automatically.
- */
- protected CumulativeProtocolDecoder( int initialCapacity )
- {
- if( initialCapacity < 0 )
- {
- throw new IllegalArgumentException( "initialCapacity: " + initialCapacity );
- }
-
- this.initialCapacity = initialCapacity;
- }
-
- /**
- * Cumulates content of <tt>in</tt> into internal buffer and forwards
- * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- * and the cumulative buffer is compacted after decoding ends.
- *
- * @throws IllegalStateException if your <tt>doDecode()</tt> returned
- * <tt>true</tt> not consuming the cumulative buffer.
- */
- public void decode( IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out ) throws Exception
- {
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- if( buf == null )
- {
- buf = ByteBuffer.allocate( initialCapacity );
- buf.setAutoExpand( true );
- session.setAttribute( BUFFER, buf );
- }
-
- buf.put( in );
- buf.flip();
-
- try
- {
- for( ;; )
- {
- int oldPos = buf.position();
- boolean decoded = doDecode( session, buf, out );
- if( decoded )
- {
- if( buf.position() == oldPos )
- {
- throw new IllegalStateException(
- "doDecode() can't return true when buffer is not consumed." );
- }
-
- if( !buf.hasRemaining() )
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
- finally
- {
- buf.compact();
- }
- }
-
- /**
- * Implement this method to consume the specified cumulative buffer and
- * decode its content into message(s).
- *
- * @param in the cumulative buffer
- * @return <tt>true</tt> if and only if there's more to decode in the buffer
- * and you want to have <tt>doDecode</tt> method invoked again.
- * Return <tt>false</tt> if remaining data is not enough to decode,
- * then this method will be invoked again when more data is cumulated.
- * @throws Exception if cannot decode <tt>in</tt>.
- */
- protected abstract boolean doDecode( IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out ) throws Exception;
-
- /**
- * Releases the cumulative buffer used by the specified <tt>session</tt>.
- * Please don't forget to call <tt>super.dispose( session )</tt> when
- * you override this method.
- */
- public void dispose( IoSession session ) throws Exception
- {
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- if( buf != null )
- {
- buf.release();
- session.removeAttribute( BUFFER );
- }
- }
-}
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes a list of integers:
+ * <pre>
+ * public class IntegerDecoder extends CumulativeProtocolDecoder {
+ *
+ * public IntegerDecoder() {
+ * super(4);
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out) throws ProtocolViolationException {
+ * if (in.remaining() < 4) {
+ * return false; // Cumulate remainder to decode later.
+ * }
+ *
+ * out.write(new Integer(in.getInt()));
+ *
+ * // Decoded one integer; CumulativeProtocolDecoder will call me again,
+ * // so I can decode as many integers as possible.
+ * return true;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = CumulativeProtocolDecoder.class.getName() + ".Buffer";
+
+ private final int initialCapacity;
+
+ /**
+ * Creates a new instance with the 16 bytes initial capacity of
+ * cumulative buffer. Please note that the capacity increases
+ * automatically.
+ */
+ protected CumulativeProtocolDecoder()
+ {
+ this( 16 );
+ }
+
+ /**
+ * Creates a new instance with the specified initial capacity of
+ * cumulative buffer. Please note that the capacity increases
+ * automatically.
+ */
+ protected CumulativeProtocolDecoder( int initialCapacity )
+ {
+ if( initialCapacity < 0 )
+ {
+ throw new IllegalArgumentException( "initialCapacity: " + initialCapacity );
+ }
+
+ this.initialCapacity = initialCapacity;
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf == null )
+ {
+ buf = ByteBuffer.allocate( initialCapacity );
+ buf.setAutoExpand( true );
+ session.setAttribute( BUFFER, buf );
+ }
+
+ buf.put( in );
+ buf.flip();
+
+ try
+ {
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
+ {
+ if( buf.position() == oldPos )
+ {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
+ }
+
+ if( !buf.hasRemaining() )
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ finally
+ {
+ buf.compact();
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
+ {
+ buf.release();
+ session.removeAttribute( BUFFER );
+ }
+ }
+}
Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecException.java Wed Nov 30 21:17:41 2005
@@ -1,64 +1,64 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-/**
- * An exception that is thrown when {@link ProtocolEncoder} or
- * {@link ProtocolDecoder} cannot understand or failed to validate
- * data to process.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class ProtocolCodecException extends Exception
-{
- private static final long serialVersionUID = 5939878548186330695L;
-
- /**
- * Constructs a new instance.
- */
- public ProtocolCodecException()
- {
- }
-
- /**
- * Constructs a new instance with the specified message.
- */
- public ProtocolCodecException( String message )
- {
- super( message );
- }
-
- /**
- * Constructs a new instance with the specified cause.
- */
- public ProtocolCodecException( Throwable cause )
- {
- super( cause );
- }
-
- /**
- * Constructs a new instance with the specified message and the specified
- * cause.
- */
- public ProtocolCodecException( String message, Throwable cause )
- {
- super( message, cause );
- }
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+/**
+ * An exception that is thrown when {@link ProtocolEncoder} or
+ * {@link ProtocolDecoder} cannot understand or failed to validate
+ * data to process.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ProtocolCodecException extends Exception
+{
+ private static final long serialVersionUID = 5939878548186330695L;
+
+ /**
+ * Constructs a new instance.
+ */
+ public ProtocolCodecException()
+ {
+ }
+
+ /**
+ * Constructs a new instance with the specified message.
+ */
+ public ProtocolCodecException( String message )
+ {
+ super( message );
+ }
+
+ /**
+ * Constructs a new instance with the specified cause.
+ */
+ public ProtocolCodecException( Throwable cause )
+ {
+ super( cause );
+ }
+
+ /**
+ * Constructs a new instance with the specified message and the specified
+ * cause.
+ */
+ public ProtocolCodecException( String message, Throwable cause )
+ {
+ super( message, cause );
+ }
}
Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java Wed Nov 30 21:17:41 2005
@@ -1,45 +1,45 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-/**
- * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
- * binary or protocol specific data into message object and vice versa.
- * <p>
- * Please refer to
- * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
- * example.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public interface ProtocolCodecFactory
-{
- /**
- * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
- * encodes message objects into binary or protocol-specific data.
- */
- ProtocolEncoder getEncoder();
-
- /**
- * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
- * decodes binary or protocol-specific data into message objects.
- */
- ProtocolDecoder getDecoder();
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+/**
+ * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
+ * binary or protocol specific data into message object and vice versa.
+ * <p>
+ * Please refer to
+ * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
+ * example.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ProtocolCodecFactory
+{
+ /**
+ * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
+ * encodes message objects into binary or protocol-specific data.
+ */
+ ProtocolEncoder getEncoder();
+
+ /**
+ * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
+ * decodes binary or protocol-specific data into message objects.
+ */
+ ProtocolDecoder getDecoder();
}
Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Wed Nov 30 21:17:41 2005
@@ -1,289 +1,298 @@
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ByteBufferProxy;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
-import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.SessionLog;
-
-public class ProtocolCodecFilter extends IoFilterAdapter
-{
- public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
- public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
- public static final String ENCODER_OUT = ProtocolCodecFilter.class.getName() + ".encoderOutput";
- public static final String DECODER_OUT = ProtocolCodecFilter.class.getName() + ".decoderOutput";
-
- private final ProtocolCodecFactory factory;
-
- public ProtocolCodecFilter( ProtocolCodecFactory factory )
- {
- if( factory == null )
- {
- throw new NullPointerException( "factory" );
- }
- this.factory = factory;
- }
-
- public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
- {
- if( !( message instanceof ByteBuffer ) )
- {
- nextFilter.messageReceived( session, message );
- return;
- }
-
- ByteBuffer in = ( ByteBuffer ) message;
- ProtocolDecoder decoder = getDecoder( session );
- SimpleProtocolDecoderOutput decoderOut = getDecoderOut( session );
-
- try
- {
- synchronized( decoder )
- {
- decoder.decode( session, in, decoderOut );
- }
- }
- catch( Throwable t )
- {
- ProtocolDecoderException pde;
- if( t instanceof ProtocolDecoderException )
- {
- pde = ( ProtocolDecoderException ) t;
- }
- else
- {
- pde = new ProtocolDecoderException( t );
- }
- pde.setHexdump( in.getHexDump() );
- throw pde;
- }
- finally
- {
- // Dispose the decoder if this session is connectionless.
- if( session.getTransportType().isConnectionless() )
- {
- disposeDecoder( session );
- }
-
- // Release the read buffer.
- in.release();
-
- Queue queue = decoderOut.getMessageQueue();
- synchronized( queue )
- {
- if( !queue.isEmpty() )
- {
- do
- {
- nextFilter.messageReceived( session, queue.pop() );
- }
- while( !queue.isEmpty() );
- }
- }
- }
- }
-
- public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
- {
- if( ! ( message instanceof MessageByteBuffer ) )
- {
- nextFilter.messageSent( session, message );
- return;
- }
-
- MessageByteBuffer buf = ( MessageByteBuffer ) message;
- try
- {
- buf.release();
- }
- finally
- {
- nextFilter.messageSent( session, buf.message );
- }
- }
-
- public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
- {
- Object message = writeRequest.getMessage();
- if( message instanceof ByteBuffer )
- {
- nextFilter.filterWrite( session, writeRequest );
- return;
- }
-
- ProtocolEncoder encoder = getEncoder( session );
- ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session );
- encoderOut.nextFilter = nextFilter;
-
- try
- {
- encoder.encode( session, message, encoderOut );
- }
- catch( Throwable t )
- {
- ProtocolEncoderException pee;
- if( t instanceof ProtocolEncoderException )
- {
- pee = ( ProtocolEncoderException ) t;
- }
- else
- {
- pee = new ProtocolEncoderException( t );
- }
- throw pee;
- }
- finally
- {
- // Dispose the encoder if this session is connectionless.
- if( session.getTransportType().isConnectionless() )
- {
- disposeEncoder( session );
- }
- }
-
- encoderOut.writeRequest = writeRequest;
- encoderOut.flush();
- }
-
- public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
- {
- disposeEncoder( session );
- disposeDecoder( session );
- nextFilter.sessionClosed( session );
- }
-
- private ProtocolEncoder getEncoder( IoSession session )
- {
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
- if( encoder == null )
- {
- encoder = factory.getEncoder();
- session.setAttribute( ENCODER, encoder );
- }
- return encoder;
- }
-
- private ProtocolEncoderOutputImpl getEncoderOut( IoSession session )
- {
- ProtocolEncoderOutputImpl out = ( ProtocolEncoderOutputImpl ) session.getAttribute( ENCODER_OUT );
- if( out == null )
- {
- out = new ProtocolEncoderOutputImpl( session );
- session.setAttribute( ENCODER_OUT, out );
- }
- return out;
- }
-
- private ProtocolDecoder getDecoder( IoSession session )
- {
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
- if( decoder == null )
- {
- decoder = factory.getDecoder();
- session.setAttribute( DECODER, decoder );
- }
- return decoder;
- }
-
- private SimpleProtocolDecoderOutput getDecoderOut( IoSession session )
- {
- SimpleProtocolDecoderOutput out = ( SimpleProtocolDecoderOutput ) session.getAttribute( DECODER_OUT );
- if( out == null )
- {
- out = new SimpleProtocolDecoderOutput();
- session.setAttribute( DECODER_OUT, out );
- }
- return out;
- }
-
- private void disposeEncoder( IoSession session )
- {
- session.removeAttribute( ENCODER_OUT );
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
- if( encoder == null )
- {
- return;
- }
-
- try
- {
- encoder.dispose( session );
- }
- catch( Throwable t )
- {
- SessionLog.warn(
- session,
- "Failed to dispose: " + encoder.getClass().getName() +
- " (" + encoder + ')' );
- }
- }
-
- private void disposeDecoder( IoSession session )
- {
- session.removeAttribute( DECODER_OUT );
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
- if( decoder == null )
- {
- return;
- }
-
- try
- {
- decoder.dispose( session );
- }
- catch( Throwable t )
- {
- SessionLog.warn(
- session,
- "Falied to dispose: " + decoder.getClass().getName() +
- " (" + decoder + ')' );
- }
- }
-
- private static class MessageByteBuffer extends ByteBufferProxy
- {
- private final Object message;
-
- private MessageByteBuffer( ByteBuffer buf, Object message )
- {
- super( buf );
- this.message = message;
- }
- }
-
- private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
- {
- private final IoSession session;
- private NextFilter nextFilter;
- private WriteRequest writeRequest;
-
- public ProtocolEncoderOutputImpl( IoSession session )
- {
- this.session = session;
- }
-
- protected WriteFuture doFlush( ByteBuffer buf )
- {
- WriteFuture future;
- if( writeRequest != null )
- {
- future = writeRequest.getFuture();
- nextFilter.filterWrite(
- session,
- new WriteRequest(
- new MessageByteBuffer(
- buf, writeRequest.getMessage() ), future ) );
- }
- else
- {
- future = new WriteFuture();
- nextFilter.filterWrite( session, new WriteRequest( buf, future ) );
- }
- return future;
- }
- }
-}
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ByteBufferProxy;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.SessionLog;
+
+public class ProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
+ public static final String ENCODER_OUT = ProtocolCodecFilter.class.getName() + ".encoderOutput";
+ public static final String DECODER_OUT = ProtocolCodecFilter.class.getName() + ".decoderOutput";
+
+ private final ProtocolCodecFactory factory;
+
+ public ProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public void onPostAdd( IoFilterChain parent, String name, NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one ProtocolCodecFilter." );
+ }
+ }
+
+ public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( !( message instanceof ByteBuffer ) )
+ {
+ nextFilter.messageReceived( session, message );
+ return;
+ }
+
+ ByteBuffer in = ( ByteBuffer ) message;
+ ProtocolDecoder decoder = getDecoder( session );
+ SimpleProtocolDecoderOutput decoderOut = getDecoderOut( session );
+
+ try
+ {
+ synchronized( decoder )
+ {
+ decoder.decode( session, in, decoderOut );
+ }
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.release();
+
+ Queue queue = decoderOut.getMessageQueue();
+ synchronized( queue )
+ {
+ if( !queue.isEmpty() )
+ {
+ do
+ {
+ nextFilter.messageReceived( session, queue.pop() );
+ }
+ while( !queue.isEmpty() );
+ }
+ }
+ }
+ }
+
+ public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( ! ( message instanceof MessageByteBuffer ) )
+ {
+ nextFilter.messageSent( session, message );
+ return;
+ }
+
+ MessageByteBuffer buf = ( MessageByteBuffer ) message;
+ try
+ {
+ buf.release();
+ }
+ finally
+ {
+ nextFilter.messageSent( session, buf.message );
+ }
+ }
+
+ public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
+ {
+ Object message = writeRequest.getMessage();
+ if( message instanceof ByteBuffer )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ return;
+ }
+
+ ProtocolEncoder encoder = getEncoder( session );
+ ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session );
+ encoderOut.nextFilter = nextFilter;
+
+ try
+ {
+ encoder.encode( session, message, encoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolEncoderException pee;
+ if( t instanceof ProtocolEncoderException )
+ {
+ pee = ( ProtocolEncoderException ) t;
+ }
+ else
+ {
+ pee = new ProtocolEncoderException( t );
+ }
+ throw pee;
+ }
+ finally
+ {
+ // Dispose the encoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeEncoder( session );
+ }
+ }
+
+ encoderOut.writeRequest = writeRequest;
+ encoderOut.flush();
+ }
+
+ public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ disposeEncoder( session );
+ disposeDecoder( session );
+ nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session )
+ {
+ ProtocolEncoderOutputImpl out = ( ProtocolEncoderOutputImpl ) session.getAttribute( ENCODER_OUT );
+ if( out == null )
+ {
+ out = new ProtocolEncoderOutputImpl( session );
+ session.setAttribute( ENCODER_OUT, out );
+ }
+ return out;
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private SimpleProtocolDecoderOutput getDecoderOut( IoSession session )
+ {
+ SimpleProtocolDecoderOutput out = ( SimpleProtocolDecoderOutput ) session.getAttribute( DECODER_OUT );
+ if( out == null )
+ {
+ out = new SimpleProtocolDecoderOutput();
+ session.setAttribute( DECODER_OUT, out );
+ }
+ return out;
+ }
+
+ private void disposeEncoder( IoSession session )
+ {
+ session.removeAttribute( ENCODER_OUT );
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ session.removeAttribute( DECODER_OUT );
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+ private final Object message;
+
+ private MessageByteBuffer( ByteBuffer buf, Object message )
+ {
+ super( buf );
+ this.message = message;
+ }
+ }
+
+ private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
+ {
+ private final IoSession session;
+ private NextFilter nextFilter;
+ private WriteRequest writeRequest;
+
+ public ProtocolEncoderOutputImpl( IoSession session )
+ {
+ this.session = session;
+ }
+
+ protected WriteFuture doFlush( ByteBuffer buf )
+ {
+ WriteFuture future;
+ if( writeRequest != null )
+ {
+ future = writeRequest.getFuture();
+ nextFilter.filterWrite(
+ session,
+ new WriteRequest(
+ new MessageByteBuffer(
+ buf, writeRequest.getMessage() ), future ) );
+ }
+ else
+ {
+ future = new WriteFuture();
+ nextFilter.filterWrite( session, new WriteRequest( buf, future ) );
+ }
+ return future;
+ }
+ }
+}
Modified: directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java?rev=350169&r1=350168&r2=350169&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/codec/ProtocolDecoder.java Wed Nov 30 21:17:41 2005
@@ -1,57 +1,57 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-
-/**
- * Decodes binary or protocol-specific data into higher-level message objects.
- * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
- * method with read data, and then the decoder implementation puts decoded
- * messages into {@link ProtocolDecoderOutput} by calling
- * {@link ProtocolDecoderOutput#write(Object)}.
- * <p>
- * Please refer to
- * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineDecoder.html"><code>TextLineDecoder</code></a>
- * example.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public interface ProtocolDecoder
-{
- /**
- * Decodes binary or protocol-specific content into higher-level message objects.
- * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
- * method with read data, and then the decoder implementation puts decoded
- * messages into {@link ProtocolDecoderOutput}.
- *
- * @throws Exception if the read data violated protocol specification
- */
- void decode( IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out ) throws Exception;
-
- /**
- * Releases all resources related with this decoder.
- *
- * @throws Exception if failed to dispose all resources
- */
- void dispose( IoSession session ) throws Exception;
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Decodes binary or protocol-specific data into higher-level message objects.
+ * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
+ * method with read data, and then the decoder implementation puts decoded
+ * messages into {@link ProtocolDecoderOutput} by calling
+ * {@link ProtocolDecoderOutput#write(Object)}.
+ * <p>
+ * Please refer to
+ * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineDecoder.html"><code>TextLineDecoder</code></a>
+ * example.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ProtocolDecoder
+{
+ /**
+ * Decodes binary or protocol-specific content into higher-level message objects.
+ * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)}
+ * method with read data, and then the decoder implementation puts decoded
+ * messages into {@link ProtocolDecoderOutput}.
+ *
+ * @param session the session this call is made for (presumably the one the
+ * data was read from -- confirm against the caller)
+ * @param in the read data to decode
+ * @param out the output that receives the decoded messages
+ * @throws Exception if the read data violated protocol specification
+ */
+ void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception;
+
+ /**
+ * Releases all resources related to this decoder.
+ *
+ * @param session the session this decoder is being disposed for
+ * @throws Exception if disposing the resources failed
+ */
+ void dispose( IoSession session ) throws Exception;
}