You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/08 13:21:19 UTC

svn commit: r494042 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/transport/ common/src/main/java/org/apache/qpid/pool/ common/src/test/java/org/apache/qpid/pool/

Author: rgreig
Date: Mon Jan  8 04:21:18 2007
New Revision: 494042

URL: http://svn.apache.org/viewvc?view=rev&rev=494042
Log:
QPID-252 : Reduce unnecessary object creation

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
    incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Mon Jan  8 04:21:18 2007
@@ -49,10 +49,10 @@
         final VmPipeConnector ioConnector = new VmPipeConnector();
         final IoServiceConfig cfg = ioConnector.getDefaultConfig();
         ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
-        PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+        PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
                                                     "AsynchronousReadFilter");
         cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
-        PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+        PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, 
                                                      "AsynchronousWriteFilter");
         cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
         

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Mon Jan  8 04:21:18 2007
@@ -25,90 +25,66 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IdleStatus;
 
-/**
- * Represents an operation on IoFilter.
- */
-enum EventType
-{
-    OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
-}
 
-class Event
+abstract public class Event
 {
-    private static final Logger _log = Logger.getLogger(Event.class);
-
-    private final EventType type;
-    private final IoFilter.NextFilter nextFilter;
-    private final Object data;
 
-    public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
+    public Event()
     {
-        this.type = type;
-        this.nextFilter = nextFilter;
-        this.data = data;
-        if (type == EventType.EXCEPTION)
-        {
-            _log.error("Exception event constructed: " + data, (Throwable) data);
-        }
     }
 
-    public Object getData()
-    {
-        return data;
-    }
 
+    abstract public void process(IoSession session);
 
-    public IoFilter.NextFilter getNextFilter()
-    {
-        return nextFilter;
-    }
 
-
-    public EventType getType()
+    public static final class ReceivedEvent extends Event
     {
-        return type;
-    }
+        private final Object _data;
 
-    void process(IoSession session)
-    {
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Processing " + this);
-        }
-        if (type == EventType.RECEIVED)
-        {
-            nextFilter.messageReceived(session, data);
-            //ByteBufferUtil.releaseIfPossible( data );
-        }
-        else if (type == EventType.SENT)
+        private final IoFilter.NextFilter _nextFilter;
+
+        public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
         {
-            nextFilter.messageSent(session, data);
-            //ByteBufferUtil.releaseIfPossible( data );
+            super();
+            _nextFilter = nextFilter;
+            _data = data;
         }
-        else if (type == EventType.EXCEPTION)
+
+        public void process(IoSession session)
         {
-            nextFilter.exceptionCaught(session, (Throwable) data);
+            _nextFilter.messageReceived(session, _data);
         }
-        else if (type == EventType.IDLE)
+
+        public IoFilter.NextFilter getNextFilter()
         {
-            nextFilter.sessionIdle(session, (IdleStatus) data);
+            return _nextFilter;
         }
-        else if (type == EventType.OPENED)
+    }
+
+
+    public static final class WriteEvent extends Event
+    {
+        private final IoFilter.WriteRequest _data;
+        private final IoFilter.NextFilter _nextFilter;
+
+        public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
         {
-            nextFilter.sessionOpened(session);
+            super();
+            _nextFilter = nextFilter;
+            _data = data;
         }
-        else if (type == EventType.WRITE)
+
+
+        public void process(IoSession session)
         {
-            nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+            _nextFilter.filterWrite(session, _data);
         }
-        else if (type == EventType.CLOSED)
+
+        public IoFilter.NextFilter getNextFilter()
         {
-            nextFilter.sessionClosed(session);
+            return _nextFilter;
         }
     }
 
-    public String toString()
-    {
-        return "Event: type " + type + ", data: " + data;
-    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Mon Jan  8 04:21:18 2007
@@ -25,51 +25,39 @@
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.EnumSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
 {
     private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
-    public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
-    public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
 
     private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
     private final ReferenceCountingExecutorService _poolReference;
-    private final Set<EventType> _asyncTypes;
 
     private final String _name;
     private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
 
-    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
     {
         _poolReference = refCountingPool;
-        _asyncTypes = asyncTypes;
         _name = name;
     }
 
-    private void fireEvent(IoSession session, Event event)
+    void fireAsynchEvent(IoSession session, Event event)
     {
-        if (_asyncTypes.contains(event.getType()))
+        Job job = getJobForSession(session);
+        job.acquire(); //prevents this job being removed from _jobs
+        job.add(event);
+
+        //Additional checks on the pool to confirm that it hasn't been shut down.
+        // The alternative is to catch the RejectedExecutionException that would result from executing on a shut-down pool.
+        if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
         {
-            Job job = getJobForSession(session);
-            job.acquire(); //prevents this job being removed from _jobs
-            job.add(event);
-
-            //Additional checks on pool to check that it hasn't shutdown.
-            // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
-            if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
-            {
-                _poolReference.getPool().execute(job);
-            }
-        }
-        else
-        {
-            event.process(session);
+            _poolReference.getPool().execute(job);
         }
+
     }
 
     private Job getJobForSession(IoSession session)
@@ -114,45 +102,44 @@
 
     //IoFilter methods that are processed by threads on the pool
 
-    public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+    public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
     {
-        fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+        nextFilter.sessionOpened(session);
     }
 
-    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+    public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
     {
-        fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+        nextFilter.sessionClosed(session);
     }
 
-    public void sessionIdle(NextFilter nextFilter, IoSession session,
-                            IdleStatus status) throws Exception
+    public void sessionIdle(final NextFilter nextFilter, final IoSession session,
+                            final IdleStatus status) throws Exception
     {
-        fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+        nextFilter.sessionIdle(session, status);
     }
 
-    public void exceptionCaught(NextFilter nextFilter, IoSession session,
-                                Throwable cause) throws Exception
+    public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
+                                final Throwable cause) throws Exception
     {
-        fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+            nextFilter.exceptionCaught(session,cause);
     }
 
-    public void messageReceived(NextFilter nextFilter, IoSession session,
-                                Object message) throws Exception
+    public void messageReceived(final NextFilter nextFilter, final IoSession session,
+                                final Object message) throws Exception
     {
-        //ByteBufferUtil.acquireIfPossible( message );
-        fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+        nextFilter.messageReceived(session,message);
     }
 
-    public void messageSent(NextFilter nextFilter, IoSession session,
-                            Object message) throws Exception
+    public void messageSent(final NextFilter nextFilter, final IoSession session,
+                            final Object message) throws Exception
     {
-        //ByteBufferUtil.acquireIfPossible( message );
-        fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+        nextFilter.messageSent(session, message);
     }
 
-    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+    public void filterWrite(final NextFilter nextFilter, final IoSession session,
+                            final WriteRequest writeRequest) throws Exception
     {
-        fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+        nextFilter.filterWrite(session, writeRequest);
     }
 
     //IoFilter methods that are processed on current thread (NOT on pooled thread)
@@ -188,5 +175,52 @@
         // when the reference count gets to zero we release the executor service
         _poolReference.releaseExecutorService();
     }
+
+    public static class AsynchReadPoolingFilter extends PoolingFilter
+    {
+
+        public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+        {
+            super(refCountingPool, name);
+        }
+
+        public void messageReceived(final NextFilter nextFilter, final IoSession session,
+                                final Object message) throws Exception
+        {
+
+            fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+        }
+
+
+    }
+
+    public static class AsynchWritePoolingFilter extends PoolingFilter
+    {
+
+        public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+        {
+            super(refCountingPool, name);
+        }
+
+
+        public void filterWrite(final NextFilter nextFilter, final IoSession session,
+                                final WriteRequest writeRequest) throws Exception
+        {
+            fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+        }
+
+    }
+
+    public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+    {
+        return new AsynchReadPoolingFilter(refCountingPool,name);
+    }
+
+
+    public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+    {
+        return new AsynchWritePoolingFilter(refCountingPool,name);
+    }
+
 }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Mon Jan  8 04:21:18 2007
@@ -29,11 +29,8 @@
     public void buildFilterChain(IoFilterChain chain) throws Exception
     {
         ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
-        PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
-                                                    "AsynchronousReadFilter");
-        PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
-                                                     "AsynchronousWriteFilter");
-
+        PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+        PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
         chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
         chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java?view=diff&rev=494042&r1=494041&r2=494042
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Mon Jan  8 04:21:18 2007
@@ -39,7 +39,7 @@
         //Create Pool
         _executorService = ReferenceCountingExecutorService.getInstance();
         _executorService.acquireExecutorService();
-        _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS,
+        _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, 
                                   "AsynchronousWriteFilter");
 
     }