You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/11/05 04:38:40 UTC

svn commit: r330965 - in /directory/network: branches/0.8/src/java/org/apache/mina/util/ trunk/src/java/org/apache/mina/filter/ trunk/src/java/org/apache/mina/util/ trunk/src/test/org/apache/mina/filter/

Author: trustin
Date: Fri Nov  4 19:38:32 2005
New Revision: 330965

URL: http://svn.apache.org/viewcvs?rev=330965&view=rev
Log:
Resolved issue: DIRMINA-117 ThreadPoolFilters doesn't stop all worker threads.
* The thread pool filter now keeps a list of all its workers so that every thread shuts down properly.
* Removed the ThreadPool interface from trunk.
* Added a link to the Leader/Followers pattern paper in ThreadPoolFilter.


Added:
    directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java   (with props)
Removed:
    directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
Modified:
    directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java
    directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
    directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java

Modified: directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java (original)
+++ directory/network/branches/0.8/src/java/org/apache/mina/util/BaseThreadPool.java Fri Nov  4 19:38:32 2005
@@ -18,8 +18,10 @@
  */
 package org.apache.mina.util;
 
-import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -80,11 +82,12 @@
 
     private final String threadNamePrefix;
     private final Map buffers = new IdentityHashMap();
-    private final Stack followers = new Stack();
     private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
-    private final Set allSessionBuffers = new HashSet();
+    private final Set allSessionBuffers = new IdentityHashSet();
 
     private Worker leader;
+    private final Stack followers = new Stack();
+    private final Set allWorkers = new IdentityHashSet();
 
     private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
     private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
@@ -170,47 +173,49 @@
             return;
 
         shuttingDown = true;
-        Worker lastLeader = null;
-        for( ;; )
+        while( getPoolSize() != 0 )
         {
-            Worker leader = this.leader;
-            if( lastLeader == leader )
-                break;
-
-            while( leader.isAlive() )
+            List allWorkers;
+            synchronized( poolSizeLock )
             {
-                leader.interrupt();
-                try
-                {
-                    // This timeout (100) will help us from 
-                    // infinite lock-up and interrupt workers again.
-                    // (Or we could acquire a monitor for unfetchedSessionBuffers.)
-                    leader.join( 100 );
-                }
-                catch( InterruptedException e )
+                allWorkers = new ArrayList( this.allWorkers );
+            }
+            
+            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+            {
+                Worker worker = ( Worker ) i.next();
+                while( worker.isAlive() )
                 {
+                    worker.interrupt();
+                    try
+                    {
+                        worker.join(); 
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
                 }
             }
-
-            lastLeader = leader;
         }
 
         started = false;
     }
 
-    private void increasePoolSize()
+    private void increasePoolSize( Worker worker )
     {
         synchronized( poolSizeLock )
         {
             poolSize++;
+            allWorkers.add( worker );
         }
     }
 
-    private void decreasePoolSize()
+    private void decreasePoolSize( Worker worker )
     {
         synchronized( poolSizeLock )
         {
             poolSize--;
+            allWorkers.remove( worker );
         }
     }
 
@@ -320,7 +325,7 @@
             int id = acquireThreadId();
             this.id = id;
             this.setName( threadNamePrefix + '-' + id );
-            increasePoolSize();
+            increasePoolSize( this );
         }
 
         public boolean lead()
@@ -360,7 +365,7 @@
                 releaseBuffer( buf );
             }
 
-            decreasePoolSize();
+            decreasePoolSize( this );
             releaseThreadId( id );
         }
 

Added: directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java?rev=330965&view=auto
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java (added)
+++ directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java Fri Nov  4 19:38:32 2005
@@ -0,0 +1,70 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.AbstractSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An {@link IdentityHashMap}-backed {@link Set}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class IdentityHashSet extends AbstractSet
+{
+    private final Map delegate = new IdentityHashMap();
+
+    public IdentityHashSet()
+    {
+    }
+
+    public int size()
+    {
+        return delegate.size();
+    }
+
+    public boolean contains( Object o )
+    {
+        return delegate.containsKey( o );
+    }
+
+    public Iterator iterator()
+    {
+        return delegate.keySet().iterator();
+    }
+
+    public boolean add( Object arg0 )
+    {
+        return delegate.put( arg0, Boolean.TRUE ) == null;
+    }
+
+    public boolean remove( Object o )
+    {
+        return delegate.remove( o ) != null;
+    }
+
+    public void clear()
+    {
+        delegate.clear();
+    }
+}

Propchange: directory/network/branches/0.8/src/java/org/apache/mina/util/IdentityHashSet.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Fri Nov  4 19:38:32 2005
@@ -18,7 +18,10 @@
  */
 package org.apache.mina.filter;
 
+import java.util.ArrayList;
 import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,16 +36,17 @@
 import org.apache.mina.util.IdentityHashSet;
 import org.apache.mina.util.Queue;
 import org.apache.mina.util.Stack;
-import org.apache.mina.util.ThreadPool;
 
 /**
  * A Thread-pooling filter.  This filter forwards {@link IoHandler} events
  * to its thread pool.
+ * <p>
+ * This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
  * 
  * @author The Apache Directory Project (dev@directory.apache.org)
  * @version $Rev$, $Date$
- * 
- * @see ThreadPool
  */
 public class ThreadPoolFilter implements IoFilter
 {
@@ -91,11 +95,12 @@
     private final String threadNamePrefix;
     private final Map parents = new IdentityHashMap(); 
     private final Map buffers = new IdentityHashMap();
-    private final Stack followers = new Stack();
     private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
     private final Set allSessionBuffers = new IdentityHashSet();
 
     private Worker leader;
+    private final Stack followers = new Stack();
+    private final Set allWorkers = new IdentityHashSet();
 
     private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
     private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
@@ -167,19 +172,21 @@
         this.keepAliveTime = keepAliveTime;
     }
 
-    private void increasePoolSize()
+    private void increasePoolSize( Worker worker )
     {
         synchronized( poolSizeLock )
         {
             poolSize++;
+            allWorkers.add( worker );
         }
     }
 
-    private void decreasePoolSize()
+    private void decreasePoolSize( Worker worker )
     {
         synchronized( poolSizeLock )
         {
             poolSize--;
+            allWorkers.remove( worker );
         }
     }
 
@@ -282,7 +289,7 @@
             int id = acquireThreadId();
             this.id = id;
             this.setName( threadNamePrefix + '-' + id );
-            increasePoolSize();
+            increasePoolSize( this );
         }
 
         public boolean lead()
@@ -321,7 +328,7 @@
                 releaseBuffer( buf );
             }
 
-            decreasePoolSize();
+            decreasePoolSize( this );
             releaseThreadId( id );
         }
 
@@ -674,29 +681,32 @@
         }
 
         shuttingDown = true;
-        Worker lastLeader = null;
-        for( ;; )
+        while( getPoolSize() != 0 )
         {
-            Worker leader = this.leader;
-            if( lastLeader == leader )
-                break;
-
-            while( leader.isAlive() )
+            List allWorkers;
+            synchronized( poolSizeLock )
             {
-                leader.interrupt();
-                try
-                {
-                    // This timeout (100) will help us from 
-                    // infinite lock-up and interrupt workers again.
-                    // (Or we could acquire a monitor for unfetchedSessionBuffers.)
-                    leader.join( 100 ); 
-                }
-                catch( InterruptedException e )
+                allWorkers = new ArrayList( this.allWorkers );
+            }
+            
+            for( Iterator i = allWorkers.iterator(); i.hasNext(); )
+            {
+                Worker worker = ( Worker ) i.next();
+                while( worker.isAlive() )
                 {
+                    worker.interrupt();
+                    try
+                    {
+                        // Wait (without a timeout) for the interrupted
+                        // worker to terminate.  If this wait is itself
+                        // interrupted, the enclosing loop interrupts the worker again.
+                        worker.join(); 
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
                 }
             }
-
-            lastLeader = leader;
         }
     }
 }

Modified: directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java?rev=330965&r1=330964&r2=330965&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java Fri Nov  4 19:38:32 2005
@@ -88,6 +88,45 @@
         Assert.assertEquals( 1, filter.getPoolSize() );
     }
     
+    public void testShutdown() throws Exception
+    {
+        final IoSession[] sessions = new IoSession[]
+        {
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+            new DummySession(),
+        };
+        final int end = sessions.length - 1;
+        final NextFilter nextFilter = new DummyNextFilter();
+
+        for( int i = 0; i < 10000; i ++ )
+        {
+            if( i % 100 == 0 )
+            {
+                System.out.println( "Shutdown: " + i );
+            }
+            
+            WriteFuture future = null;
+            for( int j = end; j >= 0; j-- )
+            {
+                future = new WriteFuture();
+                filter.messageReceived( nextFilter, sessions[ j ], future );
+            }
+            
+            future.join();
+            
+            filter.destroy( FILTER_PARENT, null );
+            filter.init( FILTER_PARENT, null );
+        }
+    }
+    
     private static class EventOrderCounter extends BaseIoSession
     {
         private Integer lastCount = null;
@@ -200,6 +239,98 @@
         }
 
         public void sessionCreated( IoSession session )
+        {
+        }
+    }
+    
+    private static class DummySession extends BaseIoSession
+    {
+        protected void updateTrafficMask()
+        {
+        }
+
+        public IoHandler getHandler()
+        {
+            return null;
+        }
+
+        public IoFilterChain getFilterChain()
+        {
+            return null;
+        }
+
+        public WriteFuture write( Object message )
+        {
+            return null;
+        }
+
+        public CloseFuture close()
+        {
+            return null;
+        }
+
+        public TransportType getTransportType()
+        {
+            return null;
+        }
+
+        public boolean isConnected()
+        {
+            return false;
+        }
+
+        public SocketAddress getRemoteAddress()
+        {
+            return null;
+        }
+
+        public SocketAddress getLocalAddress()
+        {
+            return null;
+        }
+
+        public int getScheduledWriteRequests()
+        {
+            return 0;
+        }
+    }
+    
+    private static class DummyNextFilter implements NextFilter
+    {
+        public void sessionCreated( IoSession session )
+        {
+        }
+
+        public void sessionOpened( IoSession session )
+        {
+        }
+
+        public void sessionClosed( IoSession session )
+        {
+        }
+
+        public void sessionIdle( IoSession session, IdleStatus status )
+        {
+        }
+
+        public void exceptionCaught( IoSession session, Throwable cause )
+        {
+        }
+
+        public void messageReceived( IoSession session, Object message )
+        {
+            ( ( WriteFuture ) message ).setWritten( true );
+        }
+
+        public void messageSent( IoSession session, Object message )
+        {
+        }
+
+        public void filterWrite( IoSession session, WriteRequest writeRequest )
+        {
+        }
+
+        public void filterClose( IoSession session, CloseFuture closeFuture )
         {
         }
     }