You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/10/17 16:55:03 UTC

svn commit: r325903 - in /directory/network: branches/0.7/src/java/org/apache/mina/util/ trunk/src/java/org/apache/mina/filter/ trunk/src/java/org/apache/mina/util/

Author: trustin
Date: Mon Oct 17 07:54:54 2005
New Revision: 325903

URL: http://svn.apache.org/viewcvs?rev=325903&view=rev
Log:
Resolved issue: DIRMINA-100 - Prioritized SessionBuffer fetching in ThreadPoolFilter
* Replaced BlockingSet with BlockingQueue because unfetchedSessionBuffers works more efficiently with Queue
* Removed unused classes
* Added ThreadPoolFilter.fetchSessionBuffer() to allow users to prioritize buffer fetching


Added:
    directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java   (with props)
    directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java   (with props)
Removed:
    directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
    directory/network/trunk/src/java/org/apache/mina/util/BlockingSet.java
    directory/network/trunk/src/java/org/apache/mina/util/Event.java
    directory/network/trunk/src/java/org/apache/mina/util/EventType.java
Modified:
    directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
    directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java

Modified: directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java?rev=325903&r1=325902&r2=325903&view=diff
==============================================================================
--- directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java (original)
+++ directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java Mon Oct 17 07:54:54 2005
@@ -20,7 +20,6 @@
 
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,7 +81,7 @@
     private final String threadNamePrefix;
     private final Map buffers = new IdentityHashMap();
     private final Stack followers = new Stack();
-    private final BlockingSet unfetchedSessionBuffers = new BlockingSet();
+    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
     private final Set allSessionBuffers = new HashSet();
 
     private Worker leader;
@@ -215,7 +214,7 @@
     protected void fireEvent( Object nextFilter, Session session,
                               EventType type, Object data )
     {
-        final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
         final Set allSessionBuffers = this.allSessionBuffers;
         final Event event = new Event( type, nextFilter, data );
 
@@ -232,7 +231,7 @@
             if( !allSessionBuffers.contains( buf ) )
             {
                 allSessionBuffers.add( buf );
-                unfetchedSessionBuffers.add( buf );
+                unfetchedSessionBuffers.push( buf );
             }
         }
     }
@@ -243,6 +242,19 @@
     protected abstract void processEvent( Object nextFilter, Session session,
                                           EventType type, Object data );
 
+    
+    /**
+     * Implement this method to fetch (or pop) a {@link SessionBuffer} from
+     * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
+     * simply pops the buffer from it.  You could prioritize the fetch order.
+     * 
+     * @return A non-null {@link SessionBuffer}
+     */
+    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+    {
+        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+    }
+
     private SessionBuffer getSessionBuffer( Session session )
     {
         final Map buffers = this.buffers;
@@ -272,7 +284,7 @@
         }
     }
 
-    private static class SessionBuffer
+    protected static class SessionBuffer
     {
         private final Session session;
 
@@ -341,8 +353,7 @@
 
         private SessionBuffer fetchBuffer()
         {
-            SessionBuffer buf;
-            BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
+            BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
             synchronized( unfetchedSessionBuffers )
             {
                 for( ;; )
@@ -363,13 +374,7 @@
                         }
                     }
 
-                    Iterator it = unfetchedSessionBuffers.iterator();
-                    while( it.hasNext() )
-                    {
-                        buf = ( SessionBuffer ) it.next();
-                        it.remove();
-                        return buf;
-                    }
+                    return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
                 }
             }
         }
@@ -410,7 +415,7 @@
 
         private void releaseBuffer( SessionBuffer buf )
         {
-            final BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
+            final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
             final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
             final Queue eventQueue = buf.eventQueue;
 
@@ -423,7 +428,7 @@
                 }
                 else
                 {
-                    unfetchedSessionBuffers.add( buf );
+                    unfetchedSessionBuffers.push( buf );
                 }
             }
         }

Added: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto
==============================================================================
--- directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java (added)
+++ directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005
@@ -0,0 +1,109 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A synchronized version of {@link Queue}.
+ *
+ * @author Trustin Lee
+ * @version $Rev$, $Date$
+ */
+public class BlockingQueue extends Queue
+{
+    private static final long serialVersionUID = 5516588196355725567L;
+
+    private int waiters = 0;
+
+    public BlockingQueue()
+    {
+    }
+
+    public synchronized int capacity()
+    {
+        return super.capacity();
+    }
+
+    public synchronized void clear()
+    {
+        super.clear();
+    }
+
+    public synchronized Object first()
+    {
+        return super.first();
+    }
+
+    public synchronized Object get( int idx )
+    {
+        return super.get( idx );
+    }
+
+    public synchronized boolean isEmpty()
+    {
+        return super.isEmpty();
+    }
+
+    public synchronized Object last()
+    {
+        return super.last();
+    }
+
+    public synchronized Object pop()
+    {
+        return super.pop();
+    }
+
+    public synchronized void push( Object obj )
+    {
+        super.push( obj );
+        if( waiters > 0 )
+            notify();
+    }
+
+    public synchronized int size()
+    {
+        return super.size();
+    }
+
+    public synchronized String toString()
+    {
+        return super.toString();
+    }
+
+    /**
+     * Waits until any elements are in this queue.
+     * 
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public synchronized void waitForNewItem() throws InterruptedException
+    {
+        waiters++;
+        try
+        {
+            while( super.isEmpty() )
+            {
+                wait();
+            }
+        }
+        finally
+        {
+            waiters--;
+        }
+    }
+}

Propchange: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=325903&r1=325902&r2=325903&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Mon Oct 17 07:54:54 2005
@@ -20,7 +20,6 @@
 
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,8 +27,7 @@
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.util.BaseThreadPool;
-import org.apache.mina.util.BlockingSet;
+import org.apache.mina.util.BlockingQueue;
 import org.apache.mina.util.ByteBufferUtil;
 import org.apache.mina.util.Queue;
 import org.apache.mina.util.Stack;
@@ -43,7 +41,6 @@
  * @version $Rev$, $Date$
  * 
  * @see ThreadPool
- * @see BaseThreadPool
  */
 public class ThreadPoolFilter implements IoFilter
 {
@@ -93,7 +90,7 @@
     private final Map parents = new IdentityHashMap(); 
     private final Map buffers = new IdentityHashMap();
     private final Stack followers = new Stack();
-    private final BlockingSet unfetchedSessionBuffers = new BlockingSet();
+    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
     private final Set allSessionBuffers = new HashSet();
 
     private Worker leader;
@@ -184,10 +181,10 @@
         }
     }
 
-    protected void fireEvent( NextFilter nextFilter, IoSession session,
+    private void fireEvent( NextFilter nextFilter, IoSession session,
                               EventType type, Object data )
     {
-        final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
         final Set allSessionBuffers = this.allSessionBuffers;
         final Event event = new Event( type, nextFilter, data );
 
@@ -204,10 +201,22 @@
             if( !allSessionBuffers.contains( buf ) )
             {
                 allSessionBuffers.add( buf );
-                unfetchedSessionBuffers.add( buf );
+                unfetchedSessionBuffers.push( buf );
             }
         }
     }
+    
+    /**
+     * Implement this method to fetch (or pop) a {@link SessionBuffer} from
+     * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
+     * simply pops the buffer from it.  You could prioritize the fetch order.
+     * 
+     * @return A non-null {@link SessionBuffer}
+     */
+    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+    {
+        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+    }
 
     private SessionBuffer getSessionBuffer( IoSession session )
     {
@@ -238,7 +247,7 @@
         }
     }
 
-    private static class SessionBuffer
+    protected static class SessionBuffer
     {
         private final IoSession session;
 
@@ -306,8 +315,7 @@
 
         private SessionBuffer fetchBuffer()
         {
-            SessionBuffer buf;
-            BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+            BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
             synchronized( unfetchedSessionBuffers )
             {
                 for( ;; )
@@ -328,13 +336,7 @@
                         }
                     }
 
-                    Iterator it = unfetchedSessionBuffers.iterator();
-                    while( it.hasNext() )
-                    {
-                        buf = ( SessionBuffer ) it.next();
-                        it.remove();
-                        return buf;
-                    }
+                    return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers );
                 }
             }
         }
@@ -375,7 +377,7 @@
 
         private void releaseBuffer( SessionBuffer buf )
         {
-            final BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+            final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
             final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
             final Queue eventQueue = buf.eventQueue;
 
@@ -388,7 +390,7 @@
                 }
                 else
                 {
-                    unfetchedSessionBuffers.add( buf );
+                    unfetchedSessionBuffers.push( buf );
                 }
             }
         }

Added: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005
@@ -0,0 +1,109 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A synchronized version of {@link Queue}.
+ *
+ * @author Trustin Lee
+ * @version $Rev$, $Date$
+ */
+public class BlockingQueue extends Queue
+{
+    private static final long serialVersionUID = 5516588196355725567L;
+
+    private int waiters = 0;
+
+    public BlockingQueue()
+    {
+    }
+
+    public synchronized int capacity()
+    {
+        return super.capacity();
+    }
+
+    public synchronized void clear()
+    {
+        super.clear();
+    }
+
+    public synchronized Object first()
+    {
+        return super.first();
+    }
+
+    public synchronized Object get( int idx )
+    {
+        return super.get( idx );
+    }
+
+    public synchronized boolean isEmpty()
+    {
+        return super.isEmpty();
+    }
+
+    public synchronized Object last()
+    {
+        return super.last();
+    }
+
+    public synchronized Object pop()
+    {
+        return super.pop();
+    }
+
+    public synchronized void push( Object obj )
+    {
+        super.push( obj );
+        if( waiters > 0 )
+            notify();
+    }
+
+    public synchronized int size()
+    {
+        return super.size();
+    }
+
+    public synchronized String toString()
+    {
+        return super.toString();
+    }
+
+    /**
+     * Waits until any elements are in this queue.
+     * 
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public synchronized void waitForNewItem() throws InterruptedException
+    {
+        waiters++;
+        try
+        {
+            while( super.isEmpty() )
+            {
+                wait();
+            }
+        }
+        finally
+        {
+            waiters--;
+        }
+    }
+}

Propchange: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision