You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/17 17:20:09 UTC
svn commit: r816232 - in /qpid/branches/java-network-refactor/qpid/java:
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/main/java/org/apache/qpid/client/transport/ common/src/main/...
Author: aidan
Date: Thu Sep 17 15:19:54 2009
New Revision: 816232
URL: http://svn.apache.org/viewvc?rev=816232&view=rev
Log:
QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter,
ReadWriteThreadModel. Move the couple of necessary methods to Job.
Fix imports.
Removed:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
Modified:
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Sep 17 15:19:54 2009
@@ -62,10 +62,7 @@
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Event;
import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -172,14 +169,13 @@
_networkDriver = driver;
_codecFactory = new AMQCodecFactory(true, this);
-
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
- _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
- _poolReference.acquireExecutorService();
+
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +208,7 @@
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
@Override
public void run()
@@ -232,7 +228,7 @@
}
}
}
- }));
+ });
}
catch (Exception e)
{
@@ -459,14 +455,14 @@
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
{
@Override
public void run()
{
_networkDriver.send(buf);
}
- }));
+ });
}
public AMQShortString getContextKey()
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Sep 17 15:19:54 2009
@@ -20,20 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -42,32 +44,29 @@
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Event;
import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -107,9 +106,6 @@
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
@@ -191,9 +187,8 @@
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
- _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
@@ -436,7 +431,7 @@
_readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
@Override
public void run()
@@ -495,7 +490,7 @@
}
}
}
- }));
+ });
}
catch (Exception e)
{
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,37 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.net.InetAddressCachePolicy;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.GeneralSecurityException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,20 +36,12 @@
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.thread.QpidThreadExecutor;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Thu Sep 17 15:19:54 2009
@@ -20,20 +20,18 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class VmPipeTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Sep 17 15:19:54 2009
@@ -25,7 +25,6 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,40 +55,28 @@
*/
public class Job implements ReadWriteRunnable
{
+
+ /** Defines the maximum number of events that will be batched into a single job. */
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
/** Holds the queue of events that make up the job. */
- private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
/** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- /** Holds the completion continuation, called upon completion of a run of the job. */
- private final JobCompletionHandler _completionHandler;
-
private final boolean _readJob;
+ private ReferenceCountingExecutorService _poolReference;
+
private final static Logger _logger = LoggerFactory.getLogger(Job.class);
- /**
- * Creates a new job that aggregates many continuations together.
- *
- * @param session The Mina session.
- * @param completionHandler The per job run, terminal continuation.
- * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
- * @param readJob
- */
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
- {
- _completionHandler = completionHandler;
- _maxEvents = maxEvents;
- _readJob = readJob;
- }
-
- public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+ public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
{
- _completionHandler = completionHandler;
+ _poolReference = poolReference;
_maxEvents = maxEvents;
_readJob = readJob;
}
@@ -99,7 +86,7 @@
*
* @param evt The continuation to enqueue.
*/
- public void add(Event evt)
+ public void add(Runnable evt)
{
_eventQueue.add(evt);
}
@@ -113,14 +100,14 @@
int i = _maxEvents;
while( --i != 0 )
{
- Event e = _eventQueue.poll();
+ Runnable e = _eventQueue.poll();
if (e == null)
{
return true;
}
else
{
- e.process();
+ e.run();
}
}
return false;
@@ -162,11 +149,11 @@
if(processAll())
{
deactivate();
- _completionHandler.completed(this);
+ completed();
}
else
{
- _completionHandler.notCompleted(this);
+ notCompleted();
}
}
@@ -174,19 +161,6 @@
{
return _readJob;
}
-
- /**
- * Another interface for a continuation.
- *
- * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
- * Runnable or a custom Continuation interface.
- */
- static interface JobCompletionHandler
- {
- public void completed(Job job);
-
- public void notCompleted(final Job job);
- }
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
@@ -194,7 +168,7 @@
* @param job The job.
* @param event The event to hand off asynchronously.
*/
- public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
{
job.add(event);
@@ -221,4 +195,59 @@
}
+ /**
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+ *
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
+ */
+ public void completed()
+ {
+ if (!isComplete())
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
+ // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+ // Can the pool be shutdown at this point?
+ if (activate())
+ {
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+ }
+ }
+
+ public void notCompleted()
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
}
Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=816232&r1=816231&r2=816232&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Thu Sep 17 15:19:54 2009
@@ -28,8 +28,6 @@
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLEngine;
-
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
@@ -50,7 +48,6 @@
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -58,7 +55,6 @@
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.apache.qpid.transport.OpenException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,14 +144,6 @@
sc.setTcpNoDelay(config.getTcpNoDelay());
}
- // if we do not use the executor pool threading model we get the default
- // leader follower
- // implementation provided by MINA
- if (_executorPool)
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
if (sslFactory != null)
{
_sslFactory = sslFactory;
@@ -227,14 +215,6 @@
}
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
@@ -258,8 +238,6 @@
throw new OpenException("Could not open connection", _lastException);
}
_ioSession = future.getSession();
- ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
- ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
_ioSession.setAttachment(engine);
engine.setNetworkDriver(this);
_protocolEngine = engine;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org