You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/06/12 15:11:30 UTC
svn commit: r190270 - in /directory/network/trunk/src:
java/org/apache/mina/common/ java/org/apache/mina/filter/
java/org/apache/mina/registry/ test/org/apache/mina/common/
Author: trustin
Date: Sun Jun 12 06:11:30 2005
New Revision: 190270
URL: http://svn.apache.org/viewcvs?rev=190270&view=rev
Log:
Resolved: DIRMINA-40 Filter API needs callback for enabled notification
Modified:
directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java
directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java
directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
Modified: directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/AbstractIoFilterChain.java Sun Jun 12 06:11:30 2005
@@ -51,6 +51,40 @@
*/
public abstract class AbstractIoFilterChain implements IoFilterChain
{
+ private static final Map filterRefCntMap = new IdentityHashMap();
+
+ private static synchronized void increaseRefCnt( IoFilter filter )
+ {
+ Integer refCnt = ( Integer ) filterRefCntMap.get( filter );
+ if( refCnt == null ) // first registration
+ {
+ filter.init();
+ refCnt = new Integer( 1 );
+ filterRefCntMap.put( filter, refCnt );
+ }
+ else
+ {
+ refCnt = new Integer( refCnt.intValue() + 1 );
+ filterRefCntMap.put( filter, refCnt );
+ }
+
+ }
+
+ private static synchronized void decreaseRefCnt( IoFilter filter )
+ {
+ Integer refCnt = ( Integer ) filterRefCntMap.get( filter );
+ if( refCnt.intValue() == 1 ) // last remove
+ {
+ filter.destroy();
+ filterRefCntMap.remove( filter );
+ }
+ else
+ {
+ refCnt = new Integer( refCnt.intValue() - 1 );
+ filterRefCntMap.put( filter, refCnt );
+ }
+ }
+
private final Map name2entry = new HashMap();
private final Map filter2entry = new IdentityHashMap();
@@ -73,6 +107,14 @@
{
return new IoFilter()
{
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
nextFilter.sessionOpened( session );
@@ -132,6 +174,14 @@
{
return new IoFilter()
{
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
{
session.getHandler().sessionOpened( session );
@@ -254,6 +304,8 @@
public synchronized void remove( String name )
{
Entry entry = checkOldName( name );
+ decreaseRefCnt( entry.filter );
+
Entry prevEntry = entry.prevEntry;
Entry nextEntry = entry.nextEntry;
prevEntry.nextEntry = nextEntry;
@@ -279,6 +331,8 @@
private void register( Entry prevEntry, String name, IoFilter filter )
{
+ increaseRefCnt( filter );
+
Entry newEntry = new Entry( prevEntry, prevEntry.nextEntry, name, filter );
prevEntry.nextEntry.prevEntry = newEntry;
prevEntry.nextEntry = newEntry;
@@ -478,6 +532,12 @@
return list;
}
+ protected void finalize() throws Throwable
+ {
+ this.clear();
+ super.finalize();
+ }
+
protected abstract void doWrite( IoSession session, WriteRequest writeRequest );
private class Entry
Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java Sun Jun 12 06:11:30 2005
@@ -44,6 +44,22 @@
public interface IoFilter
{
/**
+ * Invoked when this filter is first added to an {@link IoFilterChain}.
+ * This method is invoked only once even if you add a single filter to
+ * multiple filter chains, because MINA maintains its reference count
+ * internally.
+ */
+ void init();
+
+ /**
+ * Invoked when this filter is removed from the last {@link IoFilterChain}
+ * that still contains it. This method is invoked only once even if you
+ * remove a single filter from multiple filter chains, because MINA
+ * maintains its reference count internally.
+ */
+ void destroy();
+
+ /**
* Filters {@link IoHandler#sessionOpened(IoSession)} event.
*/
void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception;
Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java Sun Jun 12 06:11:30 2005
@@ -29,6 +29,14 @@
*/
public class IoFilterAdapter implements IoFilter
{
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
{
nextFilter.sessionOpened( session );
Modified: directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoSessionManagerFilterChain.java Sun Jun 12 06:11:30 2005
@@ -48,6 +48,14 @@
{
return new IoFilter()
{
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
( ( IoSessionFilterChain ) session.getFilterChain() ).sessionOpened( session );
Modified: directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java Sun Jun 12 06:11:30 2005
@@ -75,6 +75,14 @@
this.defaultLevel = defaultLevel;
}
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
SessionLog.log( defaultLevel, session, "OPENED" );
Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Sun Jun 12 06:11:30 2005
@@ -18,13 +18,21 @@
*/
package org.apache.mina.filter;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.BlockingSet;
import org.apache.mina.util.ByteBufferUtil;
-import org.apache.mina.util.EventType;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
import org.apache.mina.util.ThreadPool;
/**
@@ -37,17 +45,468 @@
* @see ThreadPool
* @see BaseThreadPool
*/
-public class ThreadPoolFilter extends BaseThreadPool implements ThreadPool, IoFilter
+public class ThreadPoolFilter implements IoFilter
{
+ /**
+ * Default maximum size of thread pool (2G).
+ */
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+ /**
+ * Default keep-alive time of thread pool (1 min).
+ */
+ public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+ private static volatile int threadId = 0;
+
+ private final Map buffers = new IdentityHashMap();
+
+ private final Stack followers = new Stack();
+
+ private final BlockingSet readySessionBuffers = new BlockingSet();
+
+ private final Set busySessionBuffers = new HashSet();
+
+ private Worker leader;
+
+ private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+
+ private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+ private boolean started;
+
+ private boolean shuttingDown;
+
+ private int poolSize;
+
+ private final Object poolSizeLock = new Object();
/**
* Creates a new instance of this filter with default thread pool settings.
- * You'll have to invoke {@link #start()} method to start threads actually.
*/
public ThreadPoolFilter()
{
}
+ public int getPoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ return poolSize;
+ }
+ }
+
+ public int getMaximumPoolSize()
+ {
+ return maximumPoolSize;
+ }
+
+ public int getKeepAliveTime()
+ {
+ return keepAliveTime;
+ }
+
+ public void setMaximumPoolSize( int maximumPoolSize )
+ {
+ if( maximumPoolSize <= 0 )
+ throw new IllegalArgumentException();
+ this.maximumPoolSize = maximumPoolSize;
+ }
+
+ public void setKeepAliveTime( int keepAliveTime )
+ {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public synchronized void init()
+ {
+ if( started )
+ return;
+
+ shuttingDown = false;
+
+ leader = new Worker();
+ leader.start();
+ leader.lead();
+
+ started = true;
+ }
+
+ public synchronized void destroy()
+ {
+ if( !started )
+ return;
+
+ shuttingDown = true;
+ Worker lastLeader = null;
+ for( ;; )
+ {
+ Worker leader = this.leader;
+ if( lastLeader == leader )
+ break;
+
+ while( leader.isAlive() )
+ {
+ leader.interrupt();
+ try
+ {
+ leader.join();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+
+ lastLeader = leader;
+ }
+
+ started = false;
+ }
+
+ private void increasePoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize++;
+ }
+ }
+
+ private void decreasePoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize--;
+ }
+ }
+
+ protected void fireEvent( NextFilter nextFilter, IoSession session,
+ EventType type, Object data )
+ {
+ final BlockingSet readySessionBuffers = this.readySessionBuffers;
+ final Set busySessionBuffers = this.busySessionBuffers;
+ final SessionBuffer buf = getSessionBuffer( session );
+ final Queue eventQueue = buf.eventQueue;
+ final Event event = new Event( type, nextFilter, data );
+
+ synchronized( buf )
+ {
+ eventQueue.push( event );
+ }
+
+ synchronized( readySessionBuffers )
+ {
+ if( !busySessionBuffers.contains( buf ) )
+ {
+ busySessionBuffers.add( buf );
+ readySessionBuffers.add( buf );
+ }
+ }
+ }
+
+ private SessionBuffer getSessionBuffer( IoSession session )
+ {
+ final Map buffers = this.buffers;
+ SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ synchronized( buffers )
+ {
+ buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ buf = new SessionBuffer( session );
+ buffers.put( session, buf );
+ }
+ }
+ }
+ return buf;
+ }
+
+ private void removeSessionBuffer( SessionBuffer buf )
+ {
+ final Map buffers = this.buffers;
+ final IoSession session = buf.session;
+ synchronized( buffers )
+ {
+ buffers.remove( session );
+ }
+ }
+
+ private static class SessionBuffer
+ {
+ private final IoSession session;
+
+ private final Queue eventQueue = new Queue();
+
+ private SessionBuffer( IoSession session )
+ {
+ this.session = session;
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ private final Object promotionLock = new Object();
+
+ private Worker()
+ {
+ super( "IoThreadPool-" + ( threadId++ ) );
+ increasePoolSize();
+ }
+
+ public void lead()
+ {
+ final Object promotionLock = this.promotionLock;
+ synchronized( promotionLock )
+ {
+ leader = this;
+ promotionLock.notify();
+ }
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ if( !waitForPromotion() )
+ break;
+
+ SessionBuffer buf = fetchBuffer();
+ giveUpLead();
+
+ if( buf == null )
+ {
+ break;
+ }
+
+ processEvents( buf );
+ follow();
+ releaseBuffer( buf );
+ }
+
+ decreasePoolSize();
+ }
+
+ private SessionBuffer fetchBuffer()
+ {
+ SessionBuffer buf = null;
+ BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
+ synchronized( readySessionBuffers )
+ {
+ do
+ {
+ buf = null;
+ try
+ {
+ readySessionBuffers.waitForNewItem();
+ }
+ catch( InterruptedException e )
+ {
+ break;
+ }
+
+ Iterator it = readySessionBuffers.iterator();
+ if( !it.hasNext() )
+ {
+ // exceeded keepAliveTime
+ break;
+ }
+
+ do
+ {
+ buf = null;
+ buf = ( SessionBuffer ) it.next();
+ it.remove();
+ }
+ while( buf != null && buf.eventQueue.isEmpty()
+ && it.hasNext() );
+ }
+ while( buf != null && buf.eventQueue.isEmpty() );
+ }
+
+ return buf;
+ }
+
+ private void processEvents( SessionBuffer buf )
+ {
+ final IoSession session = buf.session;
+ final Queue eventQueue = buf.eventQueue;
+ for( ;; )
+ {
+ Event event;
+ synchronized( buf )
+ {
+ event = ( Event ) eventQueue.pop();
+ if( event == null )
+ break;
+ }
+ processEvent( event.getNextFilter(), session,
+ event.getType(), event.getData() );
+ }
+ }
+
+ private void follow()
+ {
+ final Object promotionLock = this.promotionLock;
+ final Stack followers = ThreadPoolFilter.this.followers;
+ synchronized( promotionLock )
+ {
+ if( this != leader )
+ {
+ synchronized( followers )
+ {
+ followers.push( this );
+ }
+ }
+ }
+ }
+
+ private void releaseBuffer( SessionBuffer buf )
+ {
+ final BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
+ final Set busySessionBuffers = ThreadPoolFilter.this.busySessionBuffers;
+ final Queue eventQueue = buf.eventQueue;
+
+ synchronized( readySessionBuffers )
+ {
+ busySessionBuffers.remove( buf );
+ if( eventQueue.isEmpty() )
+ {
+ removeSessionBuffer( buf );
+ }
+ else
+ {
+ readySessionBuffers.add( buf );
+ }
+ }
+ }
+
+ private boolean waitForPromotion()
+ {
+ final Object promotionLock = this.promotionLock;
+
+ synchronized( promotionLock )
+ {
+ if( this != leader )
+ {
+ try
+ {
+ int keepAliveTime = getKeepAliveTime();
+ if( keepAliveTime > 0 )
+ {
+ promotionLock.wait( keepAliveTime );
+ }
+ else
+ {
+ promotionLock.wait();
+ }
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+
+ boolean timeToLead = this == leader;
+
+ if( !timeToLead )
+ {
+ // time to die
+ synchronized( followers )
+ {
+ followers.remove( this );
+ }
+ }
+
+ return timeToLead;
+ }
+ }
+
+ private void giveUpLead()
+ {
+ final Stack followers = ThreadPoolFilter.this.followers;
+ Worker worker;
+ synchronized( followers )
+ {
+ worker = ( Worker ) followers.pop();
+ }
+
+ if( worker != null )
+ {
+ worker.lead();
+ }
+ else
+ {
+ if( !shuttingDown )
+ {
+ synchronized( ThreadPoolFilter.this )
+ {
+ if( !shuttingDown
+ && getPoolSize() < getMaximumPoolSize() )
+ {
+ worker = new Worker();
+ worker.start();
+ worker.lead();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static class EventType
+ {
+ public static final EventType OPENED = new EventType();
+
+ public static final EventType CLOSED = new EventType();
+
+ public static final EventType READ = new EventType();
+
+ public static final EventType WRITTEN = new EventType();
+
+ public static final EventType RECEIVED = new EventType();
+
+ public static final EventType SENT = new EventType();
+
+ public static final EventType IDLE = new EventType();
+
+ public static final EventType EXCEPTION = new EventType();
+
+ private EventType()
+ {
+ }
+ }
+
+ private static class Event
+ {
+ private final EventType type;
+ private final NextFilter nextFilter;
+ private final Object data;
+
+ public Event( EventType type, NextFilter nextFilter, Object data )
+ {
+ this.type = type;
+ this.nextFilter = nextFilter;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+
+ public NextFilter getNextFilter()
+ {
+ return nextFilter;
+ }
+
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
+
public void sessionOpened( NextFilter nextFilter,
IoSession session )
{
Modified: directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/registry/SimpleServiceRegistry.java Sun Jun 12 06:11:30 2005
@@ -42,9 +42,6 @@
public SimpleServiceRegistry()
{
- socketIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
- datagramIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
- vmPipeAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
}
public void bind( Service service, IoHandler ioHandler ) throws IOException
@@ -68,15 +65,6 @@
// ignore
}
- try
- {
- acceptor.unbind( service.getAddress() );
- }
- catch( Exception e )
- {
- // ignore
- }
-
services.remove( service );
stopThreadPools();
}
@@ -178,7 +166,9 @@
if( !services.isEmpty() )
return;
- threadPoolFilter.start();
+ socketIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
+ datagramIoAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
+ vmPipeAcceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
}
private void stopThreadPools()
@@ -186,6 +176,8 @@
if( !services.isEmpty() )
return;
- threadPoolFilter.stop();
+ socketIoAcceptor.getFilterChain().remove( "threadPool" );
+ datagramIoAcceptor.getFilterChain().remove( "threadPool" );
+ vmPipeAcceptor.getFilterChain().remove( "threadPool" );
}
}
Modified: directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java?rev=190270&r1=190269&r2=190270&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java Sun Jun 12 06:11:30 2005
@@ -55,8 +55,8 @@
public void testChained()
{
- chain.addLast( "A", new TestFilter( 'A' ) );
- chain.addLast( "B", new TestFilter( 'B' ) );
+ chain.addLast( "A", new EventOrderTestFilter( 'A' ) );
+ chain.addLast( "B", new EventOrderTestFilter( 'B' ) );
run( "ASO BSO HSO" +
"AMR BMR HMR" +
"BFW AFW AMS BMS HMS" +
@@ -65,6 +65,30 @@
"ASC BSC HSC" );
}
+ public void testInitDestroy()
+ {
+ IoFilterChainImpl chain2 = new IoFilterChainImpl();
+ IoFilter filter = new InitDestroyTestFilter();
+
+ chain.addFirst( "A", filter );
+ assertEquals( "INIT", result );
+
+ chain2.addFirst( "B", filter );
+ assertEquals( "INIT", result );
+
+ chain2.addFirst( "C", filter );
+ assertEquals( "INIT", result );
+
+ chain.remove( "A" );
+ assertEquals( "INIT", result );
+
+ chain2.remove( "B" );
+ assertEquals( "INIT", result );
+
+ chain2.remove( "C" );
+ assertEquals( formatResult( "INIT DESTROY" ), formatResult( result ) );
+ }
+
private void run( String expectedResult )
{
chain.sessionOpened( session );
@@ -169,15 +193,23 @@
}
}
- private class TestFilter implements IoFilter
+ private class EventOrderTestFilter implements IoFilter
{
private final char id;
- private TestFilter( char id )
+ private EventOrderTestFilter( char id )
{
this.id = id;
}
+ public void init()
+ {
+ }
+
+ public void destroy()
+ {
+ }
+
public void sessionOpened(NextFilter nextFilter, IoSession session) {
result += id + "SO";
nextFilter.sessionOpened( session );
@@ -211,6 +243,19 @@
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
result += id + "MS";
nextFilter.messageSent( session, message );
+ }
+ }
+
+ private class InitDestroyTestFilter extends IoFilterAdapter
+ {
+ public void init()
+ {
+ result += "INIT";
+ }
+
+ public void destroy()
+ {
+ result += "DESTROY";
}
}