You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/07/11 08:49:08 UTC
svn commit: r555186 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor:
AbstractExecutorFilter.java ExecutorFilter.java UnorderedExecutorFilter.java
Author: trustin
Date: Tue Jul 10 23:49:07 2007
New Revision: 555186
URL: http://svn.apache.org/viewvc?view=rev&rev=555186
Log:
Extracted common parts from ExecutorFilter and UnorderedExecutorFilter into AbstractExecutorFilter
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedExecutorFilter.java
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java?view=auto&rev=555186
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java Tue Jul 10 23:49:07 2007
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+
+/**
+ * A base abstract class for generic filters that forward I/O events to an
+ * {@link Executor} to enforce a certain thread model. Subclasses implement
+ * <tt>fireEvent</tt> to define how each event is dispatched.
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractExecutorFilter extends IoFilterAdapter
+{
+ private final Executor executor;
+
+ /**
+ * Creates a new instance with the default thread pool implementation
+ * (<tt>new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() )</tt>).
+ */
+ protected AbstractExecutorFilter()
+ {
+ this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ) );
+ }
+
+ /**
+ * Creates a new instance with the specified <tt>executor</tt>; throws {@link NullPointerException} if it is <tt>null</tt>.
+ */
+ protected AbstractExecutorFilter( Executor executor )
+ {
+ if( executor == null )
+ {
+ throw new NullPointerException( "executor" );
+ }
+
+ this.executor = executor;
+ }
+
+ /**
+ * Returns the underlying {@link Executor} instance this filter uses (never <tt>null</tt>).
+ */
+ public final Executor getExecutor()
+ {
+ return executor;
+ }
+ /** Dispatches one event for <tt>session</tt>; called by sessionOpened, sessionClosed, sessionIdle, exceptionCaught, messageReceived and messageSent, with <tt>data</tt> being the event payload or <tt>null</tt>. */
+ protected abstract void fireEvent(
+ NextFilter nextFilter, IoSession session, EventType type, Object data );
+ /** Identifies the kind of an event; the constructor is private, so only the constants declared here exist and <tt>processEvent</tt> compares them by identity. */
+ protected static class EventType
+ {
+ public static final EventType OPENED = new EventType( "OPENED" );
+
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+
+ public static final EventType READ = new EventType( "READ" );
+
+ public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+ public static final EventType SENT = new EventType( "SENT" );
+
+ public static final EventType IDLE = new EventType( "IDLE" );
+
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+ /** Name of this constant, returned by {@link #toString()}. */
+ private final String value;
+
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ @Override
+ public String toString()
+ {
+ return value;
+ }
+ }
+ /** Holder of three final references: the event's type, the {@link NextFilter} it should be forwarded to, and its payload. */
+ protected static class Event
+ {
+ private final EventType type;
+ private final NextFilter nextFilter;
+ private final Object data;
+
+ protected Event( EventType type, NextFilter nextFilter, Object data )
+ {
+ this.type = type;
+ this.nextFilter = nextFilter;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+ public NextFilter getNextFilter()
+ {
+ return nextFilter;
+ }
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
+ /** Forwards <tt>sessionCreated</tt> to the next filter by a direct call; this event does not go through <tt>fireEvent</tt>. */
+ public final void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ nextFilter.sessionCreated( session );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#OPENED} with a <tt>null</tt> payload. */
+ public final void sessionOpened( NextFilter nextFilter,
+ IoSession session )
+ {
+ fireEvent( nextFilter, session, EventType.OPENED, null );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#CLOSED} with a <tt>null</tt> payload. */
+ public final void sessionClosed( NextFilter nextFilter,
+ IoSession session )
+ {
+ fireEvent( nextFilter, session, EventType.CLOSED, null );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#IDLE} with <tt>status</tt> as the payload. */
+ public final void sessionIdle( NextFilter nextFilter,
+ IoSession session, IdleStatus status )
+ {
+ fireEvent( nextFilter, session, EventType.IDLE, status );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#EXCEPTION} with <tt>cause</tt> as the payload. */
+ public final void exceptionCaught( NextFilter nextFilter,
+ IoSession session, Throwable cause )
+ {
+ fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#RECEIVED} with <tt>message</tt> as the payload. */
+ public final void messageReceived( NextFilter nextFilter,
+ IoSession session, Object message )
+ {
+ fireEvent( nextFilter, session, EventType.RECEIVED, message );
+ }
+ /** Passes the event to <tt>fireEvent</tt> as {@link EventType#SENT} with <tt>writeRequest</tt> as the payload. */
+ public final void messageSent( NextFilter nextFilter,
+ IoSession session, WriteRequest writeRequest )
+ {
+ fireEvent( nextFilter, session, EventType.SENT, writeRequest );
+ }
+ /** Invokes the <tt>nextFilter</tt> callback matching <tt>type</tt>, casting <tt>data</tt> where the callback needs it; READ, WRITTEN and any other type match no branch and are silently ignored. */
+ protected final void processEvent( NextFilter nextFilter, IoSession session, EventType type, Object data )
+ {
+ if( type == EventType.RECEIVED )
+ {
+ nextFilter.messageReceived( session, data );
+ }
+ else if( type == EventType.SENT )
+ {
+ nextFilter.messageSent( session, (WriteRequest) data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ nextFilter.exceptionCaught( session, (Throwable) data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ nextFilter.sessionIdle( session, (IdleStatus) data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ nextFilter.sessionOpened( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ nextFilter.sessionClosed( session );
+ }
+ }
+ /** Forwards the write request to the next filter by a direct call, without going through <tt>fireEvent</tt>. */
+ public final void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ }
+ /** Forwards the close request to the next filter by a direct call, without going through <tt>fireEvent</tt>. */
+ public final void filterClose( NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ nextFilter.filterClose( session );
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/AbstractExecutorFilter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=555186&r1=555185&r2=555186
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java Tue Jul 10 23:49:07 2007
@@ -22,20 +22,15 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A filter that forward events to {@link Executor}.
+ * A filter that forwards I/O events to an {@link Executor} to enforce a certain
+ * thread model while maintaining the event order.
* You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
* <p>
* Please note that this filter doesn't manage the life cycle of the underlying
@@ -56,10 +51,9 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
*/
-public class ExecutorFilter extends IoFilterAdapter
+public class ExecutorFilter extends AbstractExecutorFilter
{
private final Logger logger = LoggerFactory.getLogger(getClass());
- private final Executor executor;
/**
* Creates a new instance with the default thread pool implementation
@@ -67,7 +61,7 @@
*/
public ExecutorFilter()
{
- this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ) );
+ super();
}
/**
@@ -75,23 +69,10 @@
*/
public ExecutorFilter( Executor executor )
{
- if( executor == null )
- {
- throw new NullPointerException( "executor" );
- }
-
- this.executor = executor;
+ super( executor );
}
- /**
- * Returns the underlying {@link Executor} instance this filter uses.
- */
- public Executor getExecutor()
- {
- return executor;
- }
-
- private void fireEvent( NextFilter nextFilter, IoSession session,
+ protected void fireEvent( NextFilter nextFilter, IoSession session,
EventType type, Object data )
{
Event event = new Event( type, nextFilter, data );
@@ -119,7 +100,7 @@
logger.debug( "Launching thread for " + session.getRemoteAddress() );
}
- executor.execute( new ProcessEventsRunnable( buf ) );
+ getExecutor().execute( new ProcessEventsRunnable( buf ) );
}
}
@@ -149,155 +130,6 @@
{
this.session = session;
}
- }
-
- protected static class EventType
- {
- public static final EventType OPENED = new EventType( "OPENED" );
-
- public static final EventType CLOSED = new EventType( "CLOSED" );
-
- public static final EventType READ = new EventType( "READ" );
-
- public static final EventType WRITTEN = new EventType( "WRITTEN" );
-
- public static final EventType RECEIVED = new EventType( "RECEIVED" );
-
- public static final EventType SENT = new EventType( "SENT" );
-
- public static final EventType IDLE = new EventType( "IDLE" );
-
- public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
-
- private final String value;
-
- private EventType( String value )
- {
- this.value = value;
- }
-
- @Override
- public String toString()
- {
- return value;
- }
- }
-
- protected static class Event
- {
- private final EventType type;
- private final NextFilter nextFilter;
- private final Object data;
-
- Event( EventType type, NextFilter nextFilter, Object data )
- {
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- }
-
- public Object getData()
- {
- return data;
- }
-
- public NextFilter getNextFilter()
- {
- return nextFilter;
- }
-
- public EventType getType()
- {
- return type;
- }
- }
-
- @Override
- public void sessionCreated( NextFilter nextFilter, IoSession session )
- {
- nextFilter.sessionCreated( session );
- }
-
- @Override
- public void sessionOpened( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.OPENED, null );
- }
-
- @Override
- public void sessionClosed( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.CLOSED, null );
- }
-
- @Override
- public void sessionIdle( NextFilter nextFilter,
- IoSession session, IdleStatus status )
- {
- fireEvent( nextFilter, session, EventType.IDLE, status );
- }
-
- @Override
- public void exceptionCaught( NextFilter nextFilter,
- IoSession session, Throwable cause )
- {
- fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
- }
-
- @Override
- public void messageReceived( NextFilter nextFilter,
- IoSession session, Object message )
- {
- fireEvent( nextFilter, session, EventType.RECEIVED, message );
- }
-
- @Override
- public void messageSent( NextFilter nextFilter,
- IoSession session, WriteRequest writeRequest )
- {
- fireEvent( nextFilter, session, EventType.SENT, writeRequest );
- }
-
- protected void processEvent( NextFilter nextFilter, IoSession session, EventType type, Object data )
- {
- if( type == EventType.RECEIVED )
- {
- nextFilter.messageReceived( session, data );
- }
- else if( type == EventType.SENT )
- {
- nextFilter.messageSent( session, (WriteRequest) data );
- }
- else if( type == EventType.EXCEPTION )
- {
- nextFilter.exceptionCaught( session, (Throwable) data );
- }
- else if( type == EventType.IDLE )
- {
- nextFilter.sessionIdle( session, (IdleStatus) data );
- }
- else if( type == EventType.OPENED )
- {
- nextFilter.sessionOpened( session );
- }
- else if( type == EventType.CLOSED )
- {
- nextFilter.sessionClosed( session );
- }
- }
-
- @Override
- public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
- {
- nextFilter.filterWrite( session, writeRequest );
- }
-
- @Override
- public void filterClose( NextFilter nextFilter, IoSession session ) throws Exception
- {
- nextFilter.filterClose( session );
}
private class ProcessEventsRunnable implements Runnable
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedExecutorFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedExecutorFilter.java?view=diff&rev=555186&r1=555185&r2=555186
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedExecutorFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedExecutorFilter.java Tue Jul 10 23:49:07 2007
@@ -20,18 +20,14 @@
package org.apache.mina.filter.executor;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteRequest;
/**
- * A filter that forward events to {@link Executor}.
+ * A filter that forwards I/O events to an {@link Executor} to enforce a certain
+ * thread model while allowing the events per session to be processed
+ * simultaneously.
* You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
* <p>
* Please note that this filter doesn't manage the life cycle of the underlying
@@ -54,17 +50,15 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-public class UnorderedExecutorFilter extends IoFilterAdapter
+public class UnorderedExecutorFilter extends AbstractExecutorFilter
{
- private final Executor executor;
-
/**
* Creates a new instance with the default thread pool implementation
* (<tt>new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() )</tt>).
*/
public UnorderedExecutorFilter()
{
- this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() ) );
+ super();
}
/**
@@ -72,176 +66,14 @@
*/
public UnorderedExecutorFilter( Executor executor )
{
- if( executor == null )
- {
- throw new NullPointerException( "executor" );
- }
-
- this.executor = executor;
- }
-
- /**
- * Returns the underlying {@link Executor} instance this filter uses.
- */
- public Executor getExecutor()
- {
- return executor;
+ super(executor);
}
- private void fireEvent( NextFilter nextFilter, IoSession session,
+ protected void fireEvent( NextFilter nextFilter, IoSession session,
EventType type, Object data )
{
Event event = new Event( type, nextFilter, data );
- executor.execute(new ProcessEventRunnable(session, event));
- }
-
- protected static class EventType
- {
- public static final EventType OPENED = new EventType( "OPENED" );
-
- public static final EventType CLOSED = new EventType( "CLOSED" );
-
- public static final EventType READ = new EventType( "READ" );
-
- public static final EventType WRITTEN = new EventType( "WRITTEN" );
-
- public static final EventType RECEIVED = new EventType( "RECEIVED" );
-
- public static final EventType SENT = new EventType( "SENT" );
-
- public static final EventType IDLE = new EventType( "IDLE" );
-
- public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
-
- private final String value;
-
- private EventType( String value )
- {
- this.value = value;
- }
-
- @Override
- public String toString()
- {
- return value;
- }
- }
-
- protected static class Event
- {
- private final EventType type;
- private final NextFilter nextFilter;
- private final Object data;
-
- Event( EventType type, NextFilter nextFilter, Object data )
- {
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- }
-
- public Object getData()
- {
- return data;
- }
-
- public NextFilter getNextFilter()
- {
- return nextFilter;
- }
-
- public EventType getType()
- {
- return type;
- }
- }
-
- @Override
- public void sessionCreated( NextFilter nextFilter, IoSession session )
- {
- nextFilter.sessionCreated( session );
- }
-
- @Override
- public void sessionOpened( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.OPENED, null );
- }
-
- @Override
- public void sessionClosed( NextFilter nextFilter,
- IoSession session )
- {
- fireEvent( nextFilter, session, EventType.CLOSED, null );
- }
-
- @Override
- public void sessionIdle( NextFilter nextFilter,
- IoSession session, IdleStatus status )
- {
- fireEvent( nextFilter, session, EventType.IDLE, status );
- }
-
- @Override
- public void exceptionCaught( NextFilter nextFilter,
- IoSession session, Throwable cause )
- {
- fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
- }
-
- @Override
- public void messageReceived( NextFilter nextFilter,
- IoSession session, Object message )
- {
- fireEvent( nextFilter, session, EventType.RECEIVED, message );
- }
-
- @Override
- public void messageSent( NextFilter nextFilter,
- IoSession session, WriteRequest writeRequest )
- {
- fireEvent( nextFilter, session, EventType.SENT, writeRequest );
- }
-
- protected void processEvent( NextFilter nextFilter, IoSession session, EventType type, Object data )
- {
- if( type == EventType.RECEIVED )
- {
- nextFilter.messageReceived( session, data );
- }
- else if( type == EventType.SENT )
- {
- nextFilter.messageSent( session, (WriteRequest) data );
- }
- else if( type == EventType.EXCEPTION )
- {
- nextFilter.exceptionCaught( session, (Throwable) data );
- }
- else if( type == EventType.IDLE )
- {
- nextFilter.sessionIdle( session, (IdleStatus) data );
- }
- else if( type == EventType.OPENED )
- {
- nextFilter.sessionOpened( session );
- }
- else if( type == EventType.CLOSED )
- {
- nextFilter.sessionClosed( session );
- }
- }
-
- @Override
- public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest )
- {
- nextFilter.filterWrite( session, writeRequest );
- }
-
- @Override
- public void filterClose( NextFilter nextFilter, IoSession session ) throws Exception
- {
- nextFilter.filterClose( session );
+ getExecutor().execute(new ProcessEventRunnable(session, event));
}
private class ProcessEventRunnable implements Runnable