You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/17 17:20:09 UTC

svn commit: r816232 - in /qpid/branches/java-network-refactor/qpid/java: broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qpid/client/transport/ common/src/main/...

Author: aidan
Date: Thu Sep 17 15:19:54 2009
New Revision: 816232

URL: http://svn.apache.org/viewvc?rev=816232&view=rev
Log:
QPID-2024 QPID-2105: Remove the now-unnecessary pool classes Event, PoolingFilter
and ReadWriteThreadModel. Move the few methods that are still needed into Job.

Clean up imports in the affected files.

Removed:
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
Modified:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Sep 17 15:19:54 2009
@@ -62,10 +62,7 @@
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Event;
 import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -172,14 +169,13 @@
         _networkDriver = driver;
         
         _codecFactory = new AMQCodecFactory(true, this);
-
-        ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
-        _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+        _poolReference.acquireExecutorService();
+        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
         _actor.message(ConnectionMessages.CON_1001(null, null, false, false));
-        _poolReference.acquireExecutorService();
+
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +208,7 @@
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
             {
                 @Override
                 public void run()
@@ -232,7 +228,7 @@
                         }
                     }
                 }
-            }));
+            });
         }
         catch (Exception e)
         {
@@ -459,14 +455,14 @@
         final ByteBuffer buf = frame.toNioByteBuffer();
         _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
+        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
         {
             @Override
             public void run()
             {
                 _networkDriver.send(buf);
             }
-        }));
+        });
     }
 
     public AMQShortString getContextKey()

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Sep 17 15:19:54 2009
@@ -20,20 +20,22 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverHandler;
 import org.apache.qpid.client.failover.FailoverState;
@@ -42,32 +44,29 @@
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Event;
 import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
  * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -107,9 +106,6 @@
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
@@ -191,9 +187,8 @@
         _protocolSession = new AMQProtocolSession(this, _connection);
         _stateManager = new AMQStateManager(_protocolSession);
         _codecFactory = new AMQCodecFactory(false, _protocolSession);
-        ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
-        _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
         _poolReference.acquireExecutorService();
         _failoverHandler = new FailoverHandler(this);
     }
@@ -436,7 +431,7 @@
             _readBytes += msg.remaining();
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
             {
                 @Override
                 public void run()
@@ -495,7 +490,7 @@
                         }
                     }
                 }
-            }));
+            });
         }
         catch (Exception e)
         {

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,37 +20,20 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
 import org.apache.qpid.client.SSLConfiguration;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import sun.net.InetAddressCachePolicy;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.GeneralSecurityException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-
 public class SocketTransportConnection implements ITransportConnection
 {
     private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,6 +20,12 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
@@ -30,20 +36,12 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.thread.QpidThreadExecutor;
 import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
 /**
  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
  * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,20 +20,18 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+
 import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class VmPipeTransportConnection implements ITransportConnection
 {
     private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Sep 17 15:19:54 2009
@@ -25,7 +25,6 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.mina.common.IoSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,40 +55,28 @@
  */
 public class Job implements ReadWriteRunnable
 {
+    
+    /** Defines the maximum number of events that will be batched into a single job. */
+    public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
 
     /** Holds the queue of events that make up the job. */
-    private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+    private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
 
     /** Holds a status flag, that indicates when the job is actively running. */
     private final AtomicBoolean _active = new AtomicBoolean();
 
-    /** Holds the completion continuation, called upon completion of a run of the job. */
-    private final JobCompletionHandler _completionHandler;
-
     private final boolean _readJob;
 
+    private ReferenceCountingExecutorService _poolReference;
+
     private final static Logger _logger = LoggerFactory.getLogger(Job.class);
     
-    /**
-     * Creates a new job that aggregates many continuations together.
-     *
-     * @param session           The Mina session.
-     * @param completionHandler The per job run, terminal continuation.
-     * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
-     * @param readJob
-     */
-    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
-    {
-        _completionHandler = completionHandler;
-        _maxEvents = maxEvents;
-        _readJob = readJob;
-    }
-
-    public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+    public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
     {
-        _completionHandler = completionHandler;
+        _poolReference = poolReference;
         _maxEvents = maxEvents;
         _readJob = readJob;
     }
@@ -99,7 +86,7 @@
      *
      * @param evt The continuation to enqueue.
      */
-    public void add(Event evt)
+    public void add(Runnable evt)
     {
         _eventQueue.add(evt);
     }
@@ -113,14 +100,14 @@
         int i = _maxEvents;
         while( --i != 0 )
         {
-            Event e = _eventQueue.poll();
+            Runnable e = _eventQueue.poll();
             if (e == null)
             {
                 return true;
             }
             else
             {
-                e.process();
+                e.run();
             }
         }
         return false;
@@ -162,11 +149,11 @@
         if(processAll())
         {
             deactivate();
-            _completionHandler.completed(this);
+            completed();
         }
         else
         {
-            _completionHandler.notCompleted(this);
+            notCompleted();
         }
     }
 
@@ -174,19 +161,6 @@
     {
         return _readJob;
     }
-
-    /**
-     * Another interface for a continuation.
-     *
-     * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
-     *       Runnable or a custom Continuation interface.
-     */
-    static interface JobCompletionHandler
-    {
-        public void completed(Job job);
-
-        public void notCompleted(final Job job);
-    }
     
     /**
      * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
@@ -194,7 +168,7 @@
      * @param job The job.
      * @param event   The event to hand off asynchronously.
      */
-    public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+    public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
     {
 
         job.add(event);
@@ -221,4 +195,59 @@
 
     }
     
+    /**
+     * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+     * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+     *
+     * @param session The Mina session to work in.
+     * @param job     The job that completed.
+     */
+    public void completed()
+    {
+        if (!isComplete())
+        {
+            final ExecutorService pool = _poolReference.getPool();
+
+            if(pool == null)
+            {
+                return;
+            }
+
+
+            // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+            // Can the pool be shutdown at this point?
+            if (activate())
+            {
+                try
+                {
+                    pool.execute(this);
+                }
+                catch(RejectedExecutionException e)
+                {
+                    _logger.warn("Thread pool shutdown while tasks still outstanding");
+                }
+
+            }
+        }
+    }
+
+    public void notCompleted()
+    {
+        final ExecutorService pool = _poolReference.getPool();
+
+        if(pool == null)
+        {
+            return;
+        }
+
+        try
+        {
+            pool.execute(this);
+        }
+        catch(RejectedExecutionException e)
+        {
+            _logger.warn("Thread pool shutdown while tasks still outstanding");
+        }
+    }
+    
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Thu Sep 17 15:19:54 2009
@@ -28,8 +28,6 @@
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLEngine;
-
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
@@ -50,7 +48,6 @@
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.NewThreadExecutor;
 import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.ssl.SSLContextFactory;
@@ -58,7 +55,6 @@
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.NetworkDriverConfiguration;
 import org.apache.qpid.transport.OpenException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,14 +144,6 @@
             sc.setTcpNoDelay(config.getTcpNoDelay());
         }
 
-        // if we do not use the executor pool threading model we get the default
-        // leader follower
-        // implementation provided by MINA
-        if (_executorPool)
-        {
-            sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
-        }
-
         if (sslFactory != null)
         {
             _sslFactory = sslFactory;
@@ -227,14 +215,6 @@
         }
 
         SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
-
-        // if we do not use our own thread model we get the MINA default which is to use
-        // its own leader-follower model
-        boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
-        if (readWriteThreading)
-        {
-            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
-        }
         
         SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
         scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() :  true);
@@ -258,8 +238,6 @@
             throw new OpenException("Could not open connection", _lastException);
         }
         _ioSession = future.getSession();
-        ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
-        ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
         _ioSession.setAttachment(engine);
         engine.setNetworkDriver(this);
         _protocolEngine = engine;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org