You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/05/14 19:46:57 UTC

svn commit: r406384 - in /directory/trunks/mina: ./ core/src/main/java/org/apache/mina/filter/ java5/ java5/src/ java5/src/main/ java5/src/main/java/ java5/src/main/java/org/ java5/src/main/java/org/apache/ java5/src/main/java/org/apache/mina/ java5/sr...

Author: proyal
Date: Sun May 14 10:46:56 2006
New Revision: 406384

URL: http://svn.apache.org/viewcvs?rev=406384&view=rev
Log:
DIRMINA-184 - Alternate ThreadPoolFilter that uses a pluggable thead pool. A Leader-Follower implementation, as well a java5 based pool that uses an ExecutorService

Added:
    directory/trunks/mina/core/src/main/java/org/apache/mina/filter/LeaderFollowerThreadPool.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPool.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPoolThreadPoolFilter.java
    directory/trunks/mina/java5/
    directory/trunks/mina/java5/LICENSE.txt
    directory/trunks/mina/java5/NOTICE.txt
    directory/trunks/mina/java5/pom.xml
    directory/trunks/mina/java5/src/
    directory/trunks/mina/java5/src/main/
    directory/trunks/mina/java5/src/main/java/
    directory/trunks/mina/java5/src/main/java/org/
    directory/trunks/mina/java5/src/main/java/org/apache/
    directory/trunks/mina/java5/src/main/java/org/apache/mina/
    directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/
    directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/ExecutorThreadPool.java
Modified:
    directory/trunks/mina/pom.xml
    directory/trunks/mina/release.xml

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/LeaderFollowerThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/LeaderFollowerThreadPool.java?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/filter/LeaderFollowerThreadPool.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/filter/LeaderFollowerThreadPool.java Sun May 14 10:46:56 2006
@@ -0,0 +1,479 @@
+/*
+ *   @(#) $Id:  $
+ *
+ *   Copyright 2006 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.util.BlockingQueue;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoFilter;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
+ */
+public class LeaderFollowerThreadPool implements ThreadPool
+{
+    /**
+     * Default maximum size of thread pool (16).
+     */
+    public static final int DEFAULT_MAXIMUM_POOL_SIZE = 16;
+
+    /**
+     * Default keep-alive time of thread pool (1 min).
+     */
+    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+    /**
+     * A queue which contains {@link Integer}s which represents reusable
+     * thread IDs.  {@link LeaderFollowerThreadPool.Worker} first checks this queue and then
+     * uses {@link #threadId} when no reusable thread ID is available.
+     */
+    private static final Queue threadIdReuseQueue = new Queue();
+    private static int threadId = 0;
+
+    private static int acquireThreadId()
+    {
+        synchronized( threadIdReuseQueue )
+        {
+            Integer id = ( Integer ) threadIdReuseQueue.pop();
+            if( id == null )
+            {
+                return ++ threadId;
+            }
+            else
+            {
+                return id.intValue();
+            }
+        }
+    }
+
+    private static void releaseThreadId( int id )
+    {
+        synchronized( threadIdReuseQueue )
+        {
+            threadIdReuseQueue.push( new Integer( id ) );
+        }
+    }
+
+    private String threadNamePrefix;
+    private final BlockingQueue unfetchedRunnables = new BlockingQueue();
+
+    private Worker leader;
+    private final Stack followers = new Stack();
+    private final Set allWorkers = new IdentityHashSet();
+
+    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+    private boolean shuttingDown;
+
+    private int poolSize;
+    private final Object poolSizeLock = new Object();
+
+    /**
+     * Creates a new instance of this filter with default thread pool settings.
+     */
+    public LeaderFollowerThreadPool()
+    {
+        this( "LeaderFollowerThreadPool" );
+    }
+
+    /**
+     * Creates a new instance of this filter with the specified thread name prefix
+     * and other default settings.
+     * @param threadNamePrefix the prefix of the thread names this pool will create.
+     */
+    public LeaderFollowerThreadPool( String threadNamePrefix )
+    {
+        this( threadNamePrefix, DEFAULT_MAXIMUM_POOL_SIZE );
+    }
+
+    /**
+     * Creates a new instance of this filter with the specified thread name prefix
+     * and other default settings.
+     * @param threadNamePrefix the prefix of the thread names this pool will create.
+     * @param maximumPoolSize Maximum size of thread pool
+     */
+    public LeaderFollowerThreadPool( String threadNamePrefix, int maximumPoolSize )
+    {
+        setThreadNamePrefix( threadNamePrefix );
+        setMaximumPoolSize( maximumPoolSize );
+    }
+
+    public String getThreadNamePrefix()
+    {
+        return threadNamePrefix;
+    }
+
+    public void setThreadNamePrefix( String threadNamePrefix )
+    {
+        if( threadNamePrefix == null )
+        {
+            throw new NullPointerException( "threadNamePrefix" );
+        }
+        threadNamePrefix = threadNamePrefix.trim();
+        if( threadNamePrefix.length() == 0 )
+        {
+            throw new IllegalArgumentException( "threadNamePrefix is empty." );
+        }
+        this.threadNamePrefix = threadNamePrefix;
+
+        synchronized( poolSizeLock )
+        {
+            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+            {
+                ( ( Worker ) i.next() ).updateName();
+            }
+        }
+    }
+
+    public int getPoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            return poolSize;
+        }
+    }
+
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    public int getKeepAliveTime()
+    {
+        return keepAliveTime;
+    }
+
+    public void setMaximumPoolSize( int maximumPoolSize )
+    {
+        if( maximumPoolSize <= 0 )
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public void setKeepAliveTime( int keepAliveTime )
+    {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    //TODO this should be in the filter, inits on pre-add if we have not been init'ed
+    public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter )
+        throws Exception
+    {
+        if( leader == null )
+        {
+            init();
+        }
+    }
+
+    public void init()
+    {
+        shuttingDown = false;
+        leader = new Worker();
+        leader.start();
+        leader.lead();
+    }
+
+    public void destroy()
+    {
+        shuttingDown = true;
+        int expectedPoolSize = 0;
+        while( getPoolSize() != expectedPoolSize )
+        {
+            List allWorkers;
+            synchronized( poolSizeLock )
+            {
+                allWorkers = new ArrayList( this.allWorkers );
+            }
+
+            // You may not interrupt the current thread.
+            if( allWorkers.remove( Thread.currentThread() ) )
+            {
+                expectedPoolSize = 1;
+            }
+
+            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+            {
+                Worker worker = ( Worker ) i.next();
+                while( worker.isAlive() )
+                {
+                    worker.interrupt();
+                    try
+                    {
+                        // This timeout will help us from
+                        // infinite lock-up and interrupt workers again.
+                        worker.join( 100 );
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+            }
+        }
+
+        this.unfetchedRunnables.clear();
+        this.followers.clear();
+        this.leader = null;
+    }
+
+    private void increasePoolSize( Worker worker )
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+            allWorkers.add( worker );
+        }
+    }
+
+    private void decreasePoolSize( Worker worker )
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+            allWorkers.remove( worker );
+        }
+    }
+
+    /**
+     * Implement this method to fetch (or pop) a {@link Runnable} from
+     * the given <tt>unfetchedRunnables</tt>.  The default implementation
+     * simply pops the Runnable from it.  You could prioritize the fetch order.
+     *
+     * @return A non-null {@link Runnable}
+     */
+    protected Runnable fetchRunnable( Queue unfetchedSessionBuffers )
+    {
+        return ( Runnable ) unfetchedSessionBuffers.pop();
+    }
+
+    public void submit( Runnable runnable )
+    {
+        synchronized( unfetchedRunnables )
+        {
+            unfetchedRunnables.add( runnable );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        private final int id;
+        private final Object promotionLock = new Object();
+        private boolean dead;
+
+        private Worker()
+        {
+            int id = acquireThreadId();
+            this.id = id;
+            updateName();
+            increasePoolSize( this );
+            setDaemon( true );
+        }
+
+        public void updateName()
+        {
+            this.setName( threadNamePrefix + '-' + id );
+        }
+
+        public boolean lead()
+        {
+            final Object promotionLock = this.promotionLock;
+            synchronized( promotionLock )
+            {
+                if( dead )
+                {
+                    return false;
+                }
+
+                leader = this;
+                promotionLock.notify();
+            }
+
+            return true;
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                if( !waitForPromotion() )
+                    break;
+
+                Runnable runnable = fetchRunnable();
+                giveUpLead();
+                if( runnable == null )
+                {
+                    break;
+                }
+
+                runnable.run();
+                // previously, follow() occured between the two bits here, dunno how much of a
+                //difference that makes
+                follow();
+            }
+
+            decreasePoolSize( this );
+            releaseThreadId( id );
+        }
+
+        private Runnable fetchRunnable()
+        {
+            BlockingQueue unfetchedRunnables = LeaderFollowerThreadPool.this.unfetchedRunnables;
+
+            synchronized( unfetchedRunnables )
+            {
+                while( !shuttingDown )
+                {
+                    try
+                    {
+                        unfetchedRunnables.waitForNewItem();
+                    }
+                    catch( InterruptedException e )
+                    {
+                        continue;
+                    }
+
+                    return LeaderFollowerThreadPool.this.fetchRunnable( unfetchedRunnables );
+                }
+            }
+
+            return null;
+        }
+
+
+        private void follow()
+        {
+            final Object promotionLock = this.promotionLock;
+            final Stack followers = LeaderFollowerThreadPool.this.followers;
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    synchronized( followers )
+                    {
+                        followers.push( this );
+                    }
+                }
+            }
+        }
+
+
+        private boolean waitForPromotion()
+        {
+            final Object promotionLock = this.promotionLock;
+
+            long startTime = System.currentTimeMillis();
+            long currentTime = System.currentTimeMillis();
+
+            synchronized( promotionLock )
+            {
+                while( this != leader && !shuttingDown )
+                {
+                    // Calculate remaining keep-alive time
+                    int keepAliveTime = getKeepAliveTime();
+                    if( keepAliveTime > 0 )
+                    {
+                        keepAliveTime -= ( currentTime - startTime );
+                    }
+                    else
+                    {
+                        keepAliveTime = Integer.MAX_VALUE;
+                    }
+
+                    // Break the loop if there's no remaining keep-alive time.
+                    if( keepAliveTime <= 0 )
+                    {
+                        break;
+                    }
+
+                    // Wait for promotion
+                    try
+                    {
+                        promotionLock.wait( keepAliveTime );
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+
+                    // Update currentTime for the next iteration
+                    currentTime = System.currentTimeMillis();
+                }
+
+                boolean timeToLead = this == leader && !shuttingDown;
+
+                if( !timeToLead )
+                {
+                    // time to die
+                    synchronized( followers )
+                    {
+                        followers.remove( this );
+                    }
+
+                    // Mark as dead explicitly when we've got promotionLock.
+                    dead = true;
+                }
+
+                return timeToLead;
+            }
+        }
+
+        private void giveUpLead()
+        {
+            final Stack followers = LeaderFollowerThreadPool.this.followers;
+            LeaderFollowerThreadPool.Worker worker;
+            do
+            {
+                synchronized( followers )
+                {
+                    worker = ( LeaderFollowerThreadPool.Worker ) followers.pop();
+                }
+
+                if( worker == null )
+                {
+                    // Increase the number of threads if we
+                    // are not shutting down and we can increase the number.
+                    if( !shuttingDown
+                        && getPoolSize() < getMaximumPoolSize() )
+                    {
+                        worker = new LeaderFollowerThreadPool.Worker();
+                        worker.lead();
+                        worker.start();
+                    }
+
+                    // This loop should end because:
+                    // 1) lead() is called already,
+                    // 2) or it is shutting down and there's no more threads left.
+                    break;
+                }
+            }
+            while( !worker.lead() );
+        }
+    }
+}

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPool.java?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPool.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPool.java Sun May 14 10:46:56 2006
@@ -0,0 +1,45 @@
+/*
+ *   @(#) $Id:  $
+ *
+ *   Copyright 2006 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+/**
+ * An interface for a pool of threads, capable of handling units of work that are <code>Runnable</code>.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 397284 $, $Date: 2006-04-26 20:09:11Z $
+ */
+public interface ThreadPool
+{
+    /**
+     * Submit a <code>Runnable</code> to this thread pool.
+     *
+     * @param runnable <code>Runnable</code> to submit to this pool
+     */
+    void submit( Runnable runnable );
+
+    /**
+     * Initialize this pool
+     */
+    void init();
+
+    /**
+     * Destroy this pool.
+     */
+    void destroy();
+}

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPoolThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPoolThreadPoolFilter.java?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPoolThreadPoolFilter.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ThreadPoolThreadPoolFilter.java Sun May 14 10:46:56 2006
@@ -0,0 +1,266 @@
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Thread-pooling filter.  This filter forwards {@link IoHandler} events to its thread pool.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
+ */
+public class ThreadPoolThreadPoolFilter extends IoFilterAdapter
+{
+	private static final Logger logger = LoggerFactory.getLogger( ThreadPoolThreadPoolFilter.class.getName() );
+	private final ThreadPool threadPool;
+
+	public ThreadPoolThreadPoolFilter( ThreadPool threadPool )
+	{
+		this.threadPool = threadPool;
+	}
+
+	public void init() throws Exception
+	{
+		threadPool.init();
+	}
+
+	public void destroy()
+	{
+		threadPool.destroy();
+	}
+
+	private void fireEvent( NextFilter nextFilter, IoSession session,
+							EventType type, Object data )
+	{
+		Event event = new Event( type, nextFilter, data );
+		SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
+
+		synchronized( buf.eventQueue )
+		{
+			buf.eventQueue.add( event );
+			if( buf.processingCompleted )
+			{
+				buf.processingCompleted = false;
+				if ( logger.isDebugEnabled() ) {
+					logger.debug( "Launching thread for " + session.getRemoteAddress() );
+				}
+
+                threadPool.submit( new ProcessEventsRunnable( buf ) );
+			}
+		}
+	}
+
+	private static class SessionBuffer
+	{
+		private static final String KEY = SessionBuffer.class.getName() + ".KEY";
+
+		private static SessionBuffer getSessionBuffer( IoSession session )
+		{
+			synchronized( session )
+			{
+				SessionBuffer buf = (SessionBuffer)session.getAttribute( KEY );
+				if( buf == null )
+				{
+					buf = new SessionBuffer( session );
+					session.setAttribute( KEY, buf );
+				}
+				return buf;
+			}
+		}
+
+		private final IoSession session;
+		private final List eventQueue = new ArrayList();
+		private boolean processingCompleted = true;
+
+		private SessionBuffer( IoSession session )
+		{
+			this.session = session;
+		}
+	}
+
+	protected static class EventType
+	{
+		public static final EventType OPENED = new EventType( "OPENED" );
+
+		public static final EventType CLOSED = new EventType( "CLOSED" );
+
+		public static final EventType READ = new EventType( "READ" );
+
+		public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+		public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+		public static final EventType SENT = new EventType( "SENT" );
+
+		public static final EventType IDLE = new EventType( "IDLE" );
+
+		public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+		private final String value;
+
+		private EventType( String value )
+		{
+			this.value = value;
+		}
+
+		public String toString()
+		{
+			return value;
+		}
+	}
+
+	protected static class Event
+	{
+		private final EventType type;
+		private final NextFilter nextFilter;
+		private final Object data;
+
+		Event( EventType type, NextFilter nextFilter, Object data )
+		{
+			this.type = type;
+			this.nextFilter = nextFilter;
+			this.data = data;
+		}
+
+		public Object getData()
+		{
+			return data;
+		}
+
+		public NextFilter getNextFilter()
+		{
+			return nextFilter;
+		}
+
+		public EventType getType()
+		{
+			return type;
+		}
+	}
+
+	public void sessionCreated( NextFilter nextFilter, IoSession session )
+	{
+		nextFilter.sessionCreated( session );
+	}
+
+	public void sessionOpened( NextFilter nextFilter,
+							   IoSession session )
+	{
+		fireEvent( nextFilter, session, EventType.OPENED, null );
+	}
+
+	public void sessionClosed( NextFilter nextFilter,
+							   IoSession session )
+	{
+		fireEvent( nextFilter, session, EventType.CLOSED, null );
+	}
+
+	public void sessionIdle( NextFilter nextFilter,
+							 IoSession session, IdleStatus status )
+	{
+		fireEvent( nextFilter, session, EventType.IDLE, status );
+	}
+
+	public void exceptionCaught( NextFilter nextFilter,
+								 IoSession session, Throwable cause )
+	{
+		fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
+	}
+
+	public void messageReceived( NextFilter nextFilter,
+								 IoSession session, Object message )
+	{
+		ByteBufferUtil.acquireIfPossible( message );
+		fireEvent( nextFilter, session, EventType.RECEIVED, message );
+	}
+
+	public void messageSent( NextFilter nextFilter,
+							 IoSession session, Object message )
+	{
+		ByteBufferUtil.acquireIfPossible( message );
+		fireEvent( nextFilter, session, EventType.SENT, message );
+	}
+
+	protected void processEvent( NextFilter nextFilter, IoSession session, EventType type, Object data )
+	{
+		if( type == EventType.RECEIVED )
+		{
+			nextFilter.messageReceived( session, data );
+			ByteBufferUtil.releaseIfPossible( data );
+		}
+		else if( type == EventType.SENT )
+		{
+			nextFilter.messageSent( session, data );
+			ByteBufferUtil.releaseIfPossible( data );
+		}
+		else if( type == EventType.EXCEPTION )
+		{
+			nextFilter.exceptionCaught( session, (Throwable)data );
+		}
+		else if( type == EventType.IDLE )
+		{
+			nextFilter.sessionIdle( session, (IdleStatus)data );
+		}
+		else if( type == EventType.OPENED )
+		{
+			nextFilter.sessionOpened( session );
+		}
+		else if( type == EventType.CLOSED )
+		{
+			nextFilter.sessionClosed( session );
+		}
+	}
+
+	public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
+	{
+		nextFilter.filterWrite( session, writeRequest );
+	}
+
+	public void filterClose( NextFilter nextFilter, IoSession session ) throws Exception
+	{
+		nextFilter.filterClose( session );
+	}
+
+	private class ProcessEventsRunnable implements Runnable
+	{
+		private final SessionBuffer buffer;
+
+		ProcessEventsRunnable( SessionBuffer buffer )
+		{
+			this.buffer = buffer;
+		}
+
+		public void run()
+		{
+			while( true )
+			{
+				Event event;
+
+				synchronized( buffer.eventQueue )
+				{
+					if( buffer.eventQueue.isEmpty() )
+					{
+						buffer.processingCompleted = true;
+						break;
+					}
+
+					event = ( Event ) buffer.eventQueue.remove( 0 );
+				}
+
+				processEvent( event.getNextFilter(), buffer.session, event.getType(), event.getData() );
+			}
+
+			if ( logger.isDebugEnabled() ) {
+				logger.debug( "Exiting since queue is empty for " + buffer.session.getRemoteAddress() );
+			}
+		}
+	}
+}

Added: directory/trunks/mina/java5/LICENSE.txt
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/java5/LICENSE.txt?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/java5/LICENSE.txt (added)
+++ directory/trunks/mina/java5/LICENSE.txt Sun May 14 10:46:56 2006
@@ -0,0 +1,20 @@
+/*
+ *                           Apache License
+ *                      Version 2.0, January 2004
+ *                   http://www.apache.org/licenses/
+ *
+ *   Copyright ${pom.inceptionYear} Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */

Added: directory/trunks/mina/java5/NOTICE.txt
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/java5/NOTICE.txt?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/java5/NOTICE.txt (added)
+++ directory/trunks/mina/java5/NOTICE.txt Sun May 14 10:46:56 2006
@@ -0,0 +1,2 @@
+This product includes software developed by
+The Apache Software Foundation (http://www.apache.org/).

Added: directory/trunks/mina/java5/pom.xml
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/java5/pom.xml?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/java5/pom.xml (added)
+++ directory/trunks/mina/java5/pom.xml Sun May 14 10:46:56 2006
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.directory.mina</groupId>
+    <artifactId>build</artifactId>
+    <version>0.9.4-SNAPSHOT</version>
+  </parent>
+  <artifactId>mina-java5</artifactId>
+  <name>Apache MINA Java5 Extensions</name>
+  <packaging>jar</packaging>  
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.directory.mina</groupId>
+      <artifactId>mina-core</artifactId>
+      <version>${pom.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.0</version>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.4</source>
+          <target>1.4</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
+

Added: directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/ExecutorThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/ExecutorThreadPool.java?rev=406384&view=auto
==============================================================================
--- directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/ExecutorThreadPool.java (added)
+++ directory/trunks/mina/java5/src/main/java/org/apache/mina/filter/ExecutorThreadPool.java Sun May 14 10:46:56 2006
@@ -0,0 +1,50 @@
+/*
+ *   @(#) $Id:  $
+ *
+ *   Copyright 2006 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import java.util.concurrent.Executor;
+
+/**
+ * ThreadPool implementation that hands excecution off to a Executor
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 400068 $, $Date: 2006-05-05 12:56:58Z $
+ */
+public class ExecutorThreadPool implements ThreadPool
+{
+    private final Executor executor;
+
+    public ExecutorThreadPool( Executor executor )
+    {
+        this.executor = executor;
+    }
+
+    public void init()
+    {
+    }
+
+    public void destroy()
+    {
+    }
+
+    public void submit( Runnable runnable )
+    {
+        executor.execute( runnable );
+    }
+}

Modified: directory/trunks/mina/pom.xml
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/pom.xml?rev=406384&r1=406383&r2=406384&view=diff
==============================================================================
--- directory/trunks/mina/pom.xml (original)
+++ directory/trunks/mina/pom.xml Sun May 14 10:46:56 2006
@@ -162,6 +162,7 @@
       </activation>
       <modules>
         <module>core</module>
+        <module>java5</module>
         <module>filter-ssl</module>
         <module>filter-codec-asn1</module>
         <module>filter-codec-netty</module>
@@ -198,6 +199,7 @@
 
 Build with 1.4 jvm will not include modules:
  
+ o mina-java5
  o mina-filter-ssl
  o mina-integration-spring
  o mina-examples

Modified: directory/trunks/mina/release.xml
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/release.xml?rev=406384&r1=406383&r2=406384&view=diff
==============================================================================
--- directory/trunks/mina/release.xml (original)
+++ directory/trunks/mina/release.xml Sun May 14 10:46:56 2006
@@ -59,7 +59,14 @@
       <includes>
         <include>*.jar</include>
       </includes>
-    </fileSet>
+    </fileSet>     
+	<fileSet>
+      <directory>java5/target</directory>
+      <outputDirectory></outputDirectory>
+      <includes>
+        <include>*.jar</include>
+      </includes>
+    </fileSet> 
     <fileSet>
       <directory>integration-spring/target</directory>
       <outputDirectory></outputDirectory>