You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/08 13:21:19 UTC
svn commit: r494042 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/transport/
common/src/main/java/org/apache/qpid/pool/
common/src/test/java/org/apache/qpid/pool/
Author: rgreig
Date: Mon Jan 8 04:21:18 2007
New Revision: 494042
URL: http://svn.apache.org/viewvc?view=rev&rev=494042
Log:
QPID-252 : Reduce unnecessary object creation
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Mon Jan 8 04:21:18 2007
@@ -49,10 +49,10 @@
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
"AsynchronousReadFilter");
cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
- PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
"AsynchronousWriteFilter");
cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Mon Jan 8 04:21:18 2007
@@ -25,90 +25,66 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IdleStatus;
-/**
- * Represents an operation on IoFilter.
- */
-enum EventType
-{
- OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
-}
-class Event
+abstract public class Event
{
- private static final Logger _log = Logger.getLogger(Event.class);
-
- private final EventType type;
- private final IoFilter.NextFilter nextFilter;
- private final Object data;
- public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
+ public Event()
{
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- if (type == EventType.EXCEPTION)
- {
- _log.error("Exception event constructed: " + data, (Throwable) data);
- }
}
- public Object getData()
- {
- return data;
- }
+ abstract public void process(IoSession session);
- public IoFilter.NextFilter getNextFilter()
- {
- return nextFilter;
- }
-
- public EventType getType()
+ public static final class ReceivedEvent extends Event
{
- return type;
- }
+ private final Object _data;
- void process(IoSession session)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Processing " + this);
- }
- if (type == EventType.RECEIVED)
- {
- nextFilter.messageReceived(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
- }
- else if (type == EventType.SENT)
+ private final IoFilter.NextFilter _nextFilter;
+
+ public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
{
- nextFilter.messageSent(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.EXCEPTION)
+
+ public void process(IoSession session)
{
- nextFilter.exceptionCaught(session, (Throwable) data);
+ _nextFilter.messageReceived(session, _data);
}
- else if (type == EventType.IDLE)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionIdle(session, (IdleStatus) data);
+ return _nextFilter;
}
- else if (type == EventType.OPENED)
+ }
+
+
+ public static final class WriteEvent extends Event
+ {
+ private final IoFilter.WriteRequest _data;
+ private final IoFilter.NextFilter _nextFilter;
+
+ public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
{
- nextFilter.sessionOpened(session);
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.WRITE)
+
+
+ public void process(IoSession session)
{
- nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+ _nextFilter.filterWrite(session, _data);
}
- else if (type == EventType.CLOSED)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionClosed(session);
+ return _nextFilter;
}
}
- public String toString()
- {
- return "Event: type " + type + ", data: " + data;
- }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Mon Jan 8 04:21:18 2007
@@ -25,51 +25,39 @@
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
{
private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
- public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
- public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
private final ReferenceCountingExecutorService _poolReference;
- private final Set<EventType> _asyncTypes;
private final String _name;
private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
_poolReference = refCountingPool;
- _asyncTypes = asyncTypes;
_name = name;
}
- private void fireEvent(IoSession session, Event event)
+ void fireAsynchEvent(IoSession session, Event event)
{
- if (_asyncTypes.contains(event.getType()))
+ Job job = getJobForSession(session);
+ job.acquire(); //prevents this job being removed from _jobs
+ job.add(event);
+
+ //Additional checks on pool to check that it hasn't shutdown.
+ // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
{
- Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
- job.add(event);
-
- //Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
- {
- _poolReference.getPool().execute(job);
- }
- }
- else
- {
- event.process(session);
+ _poolReference.getPool().execute(job);
}
+
}
private Job getJobForSession(IoSession session)
@@ -114,45 +102,44 @@
//IoFilter methods that are processed by threads on the pool
- public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+ nextFilter.sessionOpened(session);
}
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+ nextFilter.sessionClosed(session);
}
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception
+ public void sessionIdle(final NextFilter nextFilter, final IoSession session,
+ final IdleStatus status) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+ nextFilter.sessionIdle(session, status);
}
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
- Throwable cause) throws Exception
+ public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
+ final Throwable cause) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+ nextFilter.exceptionCaught(session,cause);
}
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+ nextFilter.messageReceived(session,message);
}
- public void messageSent(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageSent(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+ nextFilter.messageSent(session, message);
}
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+ nextFilter.filterWrite(session, writeRequest);
}
//IoFilter methods that are processed on current thread (NOT on pooled thread)
@@ -188,5 +175,52 @@
// when the reference count gets to zero we release the executor service
_poolReference.releaseExecutorService();
}
+
+ public static class AsynchReadPoolingFilter extends PoolingFilter
+ {
+
+ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
+ {
+
+ fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ }
+
+
+ }
+
+ public static class AsynchWritePoolingFilter extends PoolingFilter
+ {
+
+ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
+ {
+ fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ }
+
+ }
+
+ public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchReadPoolingFilter(refCountingPool,name);
+ }
+
+
+ public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchWritePoolingFilter(refCountingPool,name);
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Mon Jan 8 04:21:18 2007
@@ -29,11 +29,8 @@
public void buildFilterChain(IoFilterChain chain) throws Exception
{
ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
- "AsynchronousReadFilter");
- PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
- "AsynchronousWriteFilter");
-
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
}
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Mon Jan 8 04:21:18 2007
@@ -39,7 +39,7 @@
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}