You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/11/05 04:38:40 UTC
svn commit: r330965 - in /directory/network:
branches/0.8/src/java/org/apache/mina/util/
trunk/src/java/org/apache/mina/filter/ trunk/src/java/org/apache/mina/util/
trunk/src/test/org/apache/mina/filter/
Author: trustin
Date: Fri Nov 4 19:38:32 2005
New Revision: 330965
URL: http://svn.apache.org/viewcvs?rev=330965&view=rev
Log:
Resolved issue: DIRMINA-117 ThreadPoolFilters doesn't stop all worker threads.
* Now thread pool filter manages the list of all workers so that all threads shut down properly.
* Remove ThreadPool interface from trunk
* Added a link to Leader/Followers pattern paper in ThreadPoolFilter
Added:
directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java (with props)
Removed:
directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
Modified:
directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java
directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
Modified: directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java (original)
+++ directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java Fri Nov 4 19:38:32 2005
@@ -18,8 +18,10 @@
*/
package org.apache.mina.util;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,11 +82,12 @@
private final String threadNamePrefix;
private final Map buffers = new IdentityHashMap();
- private final Stack followers = new Stack();
private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
- private final Set allSessionBuffers = new HashSet();
+ private final Set allSessionBuffers = new IdentityHashSet();
private Worker leader;
+ private final Stack followers = new Stack();
+ private final Set allWorkers = new IdentityHashSet();
private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
@@ -170,47 +173,49 @@
return;
shuttingDown = true;
- Worker lastLeader = null;
- for( ;; )
+ while( getPoolSize() != 0 )
{
- Worker leader = this.leader;
- if( lastLeader == leader )
- break;
-
- while( leader.isAlive() )
+ List allWorkers;
+ synchronized( poolSizeLock )
{
- leader.interrupt();
- try
- {
- // This timeout (100) will help us from
- // infinite lock-up and interrupt workers again.
- // (Or we could acquire a monitor for unfetchedSessionBuffers.)
- leader.join( 100 );
- }
- catch( InterruptedException e )
+ allWorkers = new ArrayList( this.allWorkers );
+ }
+
+ for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+ {
+ Worker worker = ( Worker ) i.next();
+ while( worker.isAlive() )
{
+ worker.interrupt();
+ try
+ {
+ worker.join();
+ }
+ catch( InterruptedException e )
+ {
+ }
}
}
-
- lastLeader = leader;
}
started = false;
}
- private void increasePoolSize()
+ private void increasePoolSize( Worker worker )
{
synchronized( poolSizeLock )
{
poolSize++;
+ allWorkers.add( worker );
}
}
- private void decreasePoolSize()
+ private void decreasePoolSize( Worker worker )
{
synchronized( poolSizeLock )
{
poolSize--;
+ allWorkers.remove( worker );
}
}
@@ -320,7 +325,7 @@
int id = acquireThreadId();
this.id = id;
this.setName( threadNamePrefix + '-' + id );
- increasePoolSize();
+ increasePoolSize( this );
}
public boolean lead()
@@ -360,7 +365,7 @@
releaseBuffer( buf );
}
- decreasePoolSize();
+ decreasePoolSize( this );
releaseThreadId( id );
}
Added: directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java?rev=330965&view=auto
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java (added)
+++ directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java Fri Nov 4 19:38:32 2005
@@ -0,0 +1,70 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.AbstractSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An {@link IdentityHashMap}-backed {@link Set}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class IdentityHashSet extends AbstractSet
+{
+ private final Map delegate = new IdentityHashMap();
+
+ public IdentityHashSet()
+ {
+ }
+
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ public boolean contains( Object o )
+ {
+ return delegate.containsKey( o );
+ }
+
+ public Iterator iterator()
+ {
+ return delegate.keySet().iterator();
+ }
+
+ public boolean add( Object arg0 )
+ {
+ return delegate.put( arg0, Boolean.TRUE ) == null;
+ }
+
+ public boolean remove( Object o )
+ {
+ return delegate.remove( o ) != null;
+ }
+
+ public void clear()
+ {
+ delegate.clear();
+ }
+}
Propchange: directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Fri Nov 4 19:38:32 2005
@@ -18,7 +18,10 @@
*/
package org.apache.mina.filter;
+import java.util.ArrayList;
import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,16 +36,17 @@
import org.apache.mina.util.IdentityHashSet;
import org.apache.mina.util.Queue;
import org.apache.mina.util.Stack;
-import org.apache.mina.util.ThreadPool;
/**
* A Thread-pooling filter. This filter forwards {@link IoHandler} events
* to its thread pool.
+ * <p>
+ * This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
*
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
- *
- * @see ThreadPool
*/
public class ThreadPoolFilter implements IoFilter
{
@@ -91,11 +95,12 @@
private final String threadNamePrefix;
private final Map parents = new IdentityHashMap();
private final Map buffers = new IdentityHashMap();
- private final Stack followers = new Stack();
private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
private final Set allSessionBuffers = new IdentityHashSet();
private Worker leader;
+ private final Stack followers = new Stack();
+ private final Set allWorkers = new IdentityHashSet();
private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
@@ -167,19 +172,21 @@
this.keepAliveTime = keepAliveTime;
}
- private void increasePoolSize()
+ private void increasePoolSize( Worker worker )
{
synchronized( poolSizeLock )
{
poolSize++;
+ allWorkers.add( worker );
}
}
- private void decreasePoolSize()
+ private void decreasePoolSize( Worker worker )
{
synchronized( poolSizeLock )
{
poolSize--;
+ allWorkers.remove( worker );
}
}
@@ -282,7 +289,7 @@
int id = acquireThreadId();
this.id = id;
this.setName( threadNamePrefix + '-' + id );
- increasePoolSize();
+ increasePoolSize( this );
}
public boolean lead()
@@ -321,7 +328,7 @@
releaseBuffer( buf );
}
- decreasePoolSize();
+ decreasePoolSize( this );
releaseThreadId( id );
}
@@ -674,29 +681,32 @@
}
shuttingDown = true;
- Worker lastLeader = null;
- for( ;; )
+ while( getPoolSize() != 0 )
{
- Worker leader = this.leader;
- if( lastLeader == leader )
- break;
-
- while( leader.isAlive() )
+ List allWorkers;
+ synchronized( poolSizeLock )
{
- leader.interrupt();
- try
- {
- // This timeout (100) will help us from
- // infinite lock-up and interrupt workers again.
- // (Or we could acquire a monitor for unfetchedSessionBuffers.)
- leader.join( 100 );
- }
- catch( InterruptedException e )
+ allWorkers = new ArrayList( this.allWorkers );
+ }
+
+ for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+ {
+ Worker worker = ( Worker ) i.next();
+ while( worker.isAlive() )
{
+ worker.interrupt();
+ try
+ {
+ // This timeout (100) will help us from
+ // infinite lock-up and interrupt workers again.
+ // (Or we could acquire a monitor for unfetchedSessionBuffers.)
+ worker.join();
+ }
+ catch( InterruptedException e )
+ {
+ }
}
}
-
- lastLeader = leader;
}
}
}
Modified: directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java Fri Nov 4 19:38:32 2005
@@ -88,6 +88,45 @@
Assert.assertEquals( 1, filter.getPoolSize() );
}
+ public void testShutdown() throws Exception
+ {
+ final IoSession[] sessions = new IoSession[]
+ {
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ new DummySession(),
+ };
+ final int end = sessions.length - 1;
+ final NextFilter nextFilter = new DummyNextFilter();
+
+ for( int i = 0; i < 10000; i ++ )
+ {
+ if( i % 100 == 0 )
+ {
+ System.out.println( "Shutdown: " + i );
+ }
+
+ WriteFuture future = null;
+ for( int j = end; j >= 0; j-- )
+ {
+ future = new WriteFuture();
+ filter.messageReceived( nextFilter, sessions[ j ], future );
+ }
+
+ future.join();
+
+ filter.destroy( FILTER_PARENT, null );
+ filter.init( FILTER_PARENT, null );
+ }
+ }
+
private static class EventOrderCounter extends BaseIoSession
{
private Integer lastCount = null;
@@ -200,6 +239,98 @@
}
public void sessionCreated( IoSession session )
+ {
+ }
+ }
+
+ private static class DummySession extends BaseIoSession
+ {
+ protected void updateTrafficMask()
+ {
+ }
+
+ public IoHandler getHandler()
+ {
+ return null;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null;
+ }
+
+ public WriteFuture write( Object message )
+ {
+ return null;
+ }
+
+ public CloseFuture close()
+ {
+ return null;
+ }
+
+ public TransportType getTransportType()
+ {
+ return null;
+ }
+
+ public boolean isConnected()
+ {
+ return false;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null;
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0;
+ }
+ }
+
+ private static class DummyNextFilter implements NextFilter
+ {
+ public void sessionCreated( IoSession session )
+ {
+ }
+
+ public void sessionOpened( IoSession session )
+ {
+ }
+
+ public void sessionClosed( IoSession session )
+ {
+ }
+
+ public void sessionIdle( IoSession session, IdleStatus status )
+ {
+ }
+
+ public void exceptionCaught( IoSession session, Throwable cause )
+ {
+ }
+
+ public void messageReceived( IoSession session, Object message )
+ {
+ ( ( WriteFuture ) message ).setWritten( true );
+ }
+
+ public void messageSent( IoSession session, Object message )
+ {
+ }
+
+ public void filterWrite( IoSession session, WriteRequest writeRequest )
+ {
+ }
+
+ public void filterClose( IoSession session, CloseFuture closeFuture )
{
}
}