You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/10/17 16:55:03 UTC
svn commit: r325903 - in /directory/network:
branches/0.7/src/java/org/apache/mina/util/
trunk/src/java/org/apache/mina/filter/ trunk/src/java/org/apache/mina/util/
Author: trustin
Date: Mon Oct 17 07:54:54 2005
New Revision: 325903
URL: http://svn.apache.org/viewcvs?rev=325903&view=rev
Log:
Resolved issue: DIRMINA-100 - Prioritized SessionBuffer fetching in ThreadPoolFilter
* Replaced BlockingSet with BlockingQueue because unfetchedSessionBuffers works more efficiently with Queue
* Removed unused classes
* Added ThreadPoolFilter.fetchSessionBuffer() to allow users to prioritize buffer fetching
Added:
directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java (with props)
directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java (with props)
Removed:
directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
directory/network/trunk/src/java/org/apache/mina/util/BlockingSet.java
directory/network/trunk/src/java/org/apache/mina/util/Event.java
directory/network/trunk/src/java/org/apache/mina/util/EventType.java
Modified:
directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
Modified: directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java?rev=325903&r1=325902&r2=325903&view=diff
==============================================================================
--- directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java (original)
+++ directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java Mon Oct 17 07:54:54 2005
@@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.IdentityHashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -82,7 +81,7 @@
private final String threadNamePrefix;
private final Map buffers = new IdentityHashMap();
private final Stack followers = new Stack();
- private final BlockingSet unfetchedSessionBuffers = new BlockingSet();
+ private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
private final Set allSessionBuffers = new HashSet();
private Worker leader;
@@ -215,7 +214,7 @@
protected void fireEvent( Object nextFilter, Session session,
EventType type, Object data )
{
- final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+ final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
final Set allSessionBuffers = this.allSessionBuffers;
final Event event = new Event( type, nextFilter, data );
@@ -232,7 +231,7 @@
if( !allSessionBuffers.contains( buf ) )
{
allSessionBuffers.add( buf );
- unfetchedSessionBuffers.add( buf );
+ unfetchedSessionBuffers.push( buf );
}
}
}
@@ -243,6 +242,19 @@
protected abstract void processEvent( Object nextFilter, Session session,
EventType type, Object data );
+
+ /**
+ * Override this method to choose which {@link SessionBuffer} is fetched
+ * (or popped) from the given <tt>unfetchedSessionBuffers</tt>. The default
+ * implementation simply pops a buffer from it; a subclass may prioritize the fetch order.
+ *
+ * @return A non-null {@link SessionBuffer}
+ */
+ protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+ {
+ return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+ }
+
private SessionBuffer getSessionBuffer( Session session )
{
final Map buffers = this.buffers;
@@ -272,7 +284,7 @@
}
}
- private static class SessionBuffer
+ protected static class SessionBuffer
{
private final Session session;
@@ -341,8 +353,7 @@
private SessionBuffer fetchBuffer()
{
- SessionBuffer buf;
- BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
+ BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
synchronized( unfetchedSessionBuffers )
{
for( ;; )
@@ -363,13 +374,7 @@
}
}
- Iterator it = unfetchedSessionBuffers.iterator();
- while( it.hasNext() )
- {
- buf = ( SessionBuffer ) it.next();
- it.remove();
- return buf;
- }
+ return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
}
}
}
@@ -410,7 +415,7 @@
private void releaseBuffer( SessionBuffer buf )
{
- final BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
+ final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
final Queue eventQueue = buf.eventQueue;
@@ -423,7 +428,7 @@
}
else
{
- unfetchedSessionBuffers.add( buf );
+ unfetchedSessionBuffers.push( buf );
}
}
}
Added: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto
==============================================================================
--- directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java (added)
+++ directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005
@@ -0,0 +1,109 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A synchronized version of {@link Queue} that also lets a caller block until an element is available (see {@link #waitForNewItem()}).
+ *
+ * @author Trustin Lee
+ * @version $Rev$, $Date$
+ */
+public class BlockingQueue extends Queue
+{
+ private static final long serialVersionUID = 5516588196355725567L;
+
+ private int waiters = 0;
+
+ public BlockingQueue()
+ {
+ }
+
+ public synchronized int capacity()
+ {
+ return super.capacity();
+ }
+
+ public synchronized void clear()
+ {
+ super.clear();
+ }
+
+ public synchronized Object first()
+ {
+ return super.first();
+ }
+
+ public synchronized Object get( int idx )
+ {
+ return super.get( idx );
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return super.isEmpty();
+ }
+
+ public synchronized Object last()
+ {
+ return super.last();
+ }
+
+ public synchronized Object pop()
+ {
+ return super.pop();
+ }
+
+ public synchronized void push( Object obj )
+ {
+ super.push( obj );
+ if( waiters > 0 )
+ notify();
+ }
+
+ public synchronized int size()
+ {
+ return super.size();
+ }
+
+ public synchronized String toString()
+ {
+ return super.toString();
+ }
+
+ /**
+ * Waits until this queue contains at least one element; returns immediately if it is already non-empty.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public synchronized void waitForNewItem() throws InterruptedException
+ {
+ waiters++;
+ try
+ {
+ while( super.isEmpty() )
+ {
+ wait();
+ }
+ }
+ finally
+ {
+ waiters--;
+ }
+ }
+}
Propchange: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=325903&r1=325902&r2=325903&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Mon Oct 17 07:54:54 2005
@@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.IdentityHashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -28,8 +27,7 @@
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
-import org.apache.mina.util.BaseThreadPool;
-import org.apache.mina.util.BlockingSet;
+import org.apache.mina.util.BlockingQueue;
import org.apache.mina.util.ByteBufferUtil;
import org.apache.mina.util.Queue;
import org.apache.mina.util.Stack;
@@ -43,7 +41,6 @@
* @version $Rev$, $Date$
*
* @see ThreadPool
- * @see BaseThreadPool
*/
public class ThreadPoolFilter implements IoFilter
{
@@ -93,7 +90,7 @@
private final Map parents = new IdentityHashMap();
private final Map buffers = new IdentityHashMap();
private final Stack followers = new Stack();
- private final BlockingSet unfetchedSessionBuffers = new BlockingSet();
+ private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
private final Set allSessionBuffers = new HashSet();
private Worker leader;
@@ -184,10 +181,10 @@
}
}
- protected void fireEvent( NextFilter nextFilter, IoSession session,
+ private void fireEvent( NextFilter nextFilter, IoSession session,
EventType type, Object data )
{
- final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers;
+ final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
final Set allSessionBuffers = this.allSessionBuffers;
final Event event = new Event( type, nextFilter, data );
@@ -204,10 +201,22 @@
if( !allSessionBuffers.contains( buf ) )
{
allSessionBuffers.add( buf );
- unfetchedSessionBuffers.add( buf );
+ unfetchedSessionBuffers.push( buf );
}
}
}
+
+ /**
+ * Override this method to choose which {@link SessionBuffer} is fetched
+ * (or popped) from the given <tt>unfetchedSessionBuffers</tt>. The default
+ * implementation simply pops a buffer from it; a subclass may prioritize the fetch order.
+ *
+ * @return A non-null {@link SessionBuffer}
+ */
+ protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
+ {
+ return ( SessionBuffer ) unfetchedSessionBuffers.pop();
+ }
private SessionBuffer getSessionBuffer( IoSession session )
{
@@ -238,7 +247,7 @@
}
}
- private static class SessionBuffer
+ protected static class SessionBuffer
{
private final IoSession session;
@@ -306,8 +315,7 @@
private SessionBuffer fetchBuffer()
{
- SessionBuffer buf;
- BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+ BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
synchronized( unfetchedSessionBuffers )
{
for( ;; )
@@ -328,13 +336,7 @@
}
}
- Iterator it = unfetchedSessionBuffers.iterator();
- while( it.hasNext() )
- {
- buf = ( SessionBuffer ) it.next();
- it.remove();
- return buf;
- }
+ return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers );
}
}
}
@@ -375,7 +377,7 @@
private void releaseBuffer( SessionBuffer buf )
{
- final BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
+ final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
final Queue eventQueue = buf.eventQueue;
@@ -388,7 +390,7 @@
}
else
{
- unfetchedSessionBuffers.add( buf );
+ unfetchedSessionBuffers.push( buf );
}
}
}
Added: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005
@@ -0,0 +1,109 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A synchronized version of {@link Queue} that also lets a caller block until an element is available (see {@link #waitForNewItem()}).
+ *
+ * @author Trustin Lee
+ * @version $Rev$, $Date$
+ */
+public class BlockingQueue extends Queue
+{
+ private static final long serialVersionUID = 5516588196355725567L;
+
+ private int waiters = 0;
+
+ public BlockingQueue()
+ {
+ }
+
+ public synchronized int capacity()
+ {
+ return super.capacity();
+ }
+
+ public synchronized void clear()
+ {
+ super.clear();
+ }
+
+ public synchronized Object first()
+ {
+ return super.first();
+ }
+
+ public synchronized Object get( int idx )
+ {
+ return super.get( idx );
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return super.isEmpty();
+ }
+
+ public synchronized Object last()
+ {
+ return super.last();
+ }
+
+ public synchronized Object pop()
+ {
+ return super.pop();
+ }
+
+ public synchronized void push( Object obj )
+ {
+ super.push( obj );
+ if( waiters > 0 )
+ notify();
+ }
+
+ public synchronized int size()
+ {
+ return super.size();
+ }
+
+ public synchronized String toString()
+ {
+ return super.toString();
+ }
+
+ /**
+ * Waits until this queue contains at least one element; returns immediately if it is already non-empty.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public synchronized void waitForNewItem() throws InterruptedException
+ {
+ waiters++;
+ try
+ {
+ while( super.isEmpty() )
+ {
+ wait();
+ }
+ }
+ finally
+ {
+ waiters--;
+ }
+ }
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision