You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 17:37:16 UTC
svn commit: r501096 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/clien...
Author: rgreig
Date: Mon Jan 29 08:37:13 2007
New Revision: 501096
URL: http://svn.apache.org/viewvc?view=rev&rev=501096
Log:
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Jan 29 08:37:13 2007
@@ -324,7 +324,7 @@
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
if (connectorConfig.enableNonSSL)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 08:37:13 2007
@@ -23,10 +23,13 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -119,6 +122,23 @@
_codecFactory = codecFactory;
+
+
+ try
+ {
+ IoServiceConfig config = session.getServiceConfig();
+ ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ // throw e;
+
+ }
+
+
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Jan 29 08:37:13 2007
@@ -127,10 +127,17 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
- clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
- clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
- clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ try
+ {
+ clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
+ clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
+ clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
+ clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -141,6 +148,7 @@
new AMQShortString(selectedLocale), // locale
new AMQShortString(mechanism), // mechanism
saslResponse)); // response
+
}
catch (UnsupportedEncodingException e)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 29 08:37:13 2007
@@ -24,12 +24,14 @@
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -131,6 +133,19 @@
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
+
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Mon Jan 29 08:37:13 2007
@@ -71,7 +71,7 @@
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- cfg.setThreadModel(new ReadWriteThreadModel());
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
}
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Mon Jan 29 08:37:13 2007
@@ -70,7 +70,7 @@
IoServiceConfig config = _acceptor.getDefaultConfig();
- config.setThreadModel(new ReadWriteThreadModel());
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java Mon Jan 29 08:37:13 2007
@@ -82,7 +82,7 @@
sc.setSendBufferSize(32768);
sc.setReceiveBufferSize(32768);
- config.setThreadModel(new ReadWriteThreadModel());
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
acceptor.bind(new InetSocketAddress(PORT),
new TestHandler());
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java Mon Jan 29 08:37:13 2007
@@ -83,7 +83,7 @@
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
String host = InetAddress.getLocalHost().getHostName();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Mon Jan 29 08:37:13 2007
@@ -51,4 +51,9 @@
AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Mon Jan 29 08:37:13 2007
@@ -164,4 +164,14 @@
protocolMajor + "." + protocolMinor + " not found in protocol version list.");
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(protocolClass));
+ buffer.append(Integer.toHexString(protocolInstance));
+ buffer.append(Integer.toHexString(protocolMajor));
+ buffer.append(Integer.toHexString(protocolMinor));
+ return buffer.toString();
+ }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Mon Jan 29 08:37:13 2007
@@ -30,13 +30,13 @@
* Holds events for a session that will be processed asynchronously by
* the thread pool in PoolingFilter.
*/
-class Job implements Runnable
+public class Job implements Runnable
{
private final int _maxEvents;
private final IoSession _session;
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
private final AtomicBoolean _active = new AtomicBoolean();
- private final AtomicInteger _refCount = new AtomicInteger();
+ //private final AtomicInteger _refCount = new AtomicInteger();
private final JobCompletionHandler _completionHandler;
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-
- void acquire()
- {
- _refCount.incrementAndGet();
- }
-
- void release()
- {
- _refCount.decrementAndGet();
- }
-
- boolean isReferenced()
- {
- return _refCount.get() > 0;
- }
+//
+// void acquire()
+// {
+// _refCount.incrementAndGet();
+// }
+//
+// void release()
+// {
+// _refCount.decrementAndGet();
+// }
+//
+// boolean isReferenced()
+// {
+// return _refCount.get() > 0;
+// }
void add(Event evt)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Mon Jan 29 08:37:13 2007
@@ -48,7 +48,7 @@
void fireAsynchEvent(IoSession session, Event event)
{
Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
+ // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
//Additional checks on pool to check that it hasn't shutdown.
@@ -60,10 +60,25 @@
}
+ public void createNewJobForSession(IoSession session)
+ {
+ Job job = new Job(session, this, _maxEvents);
+ session.setAttribute(_name, job);
+ }
+
private Job getJobForSession(IoSession session)
{
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
+ return (Job) session.getAttribute(_name);
+
+/* if(job == null)
+ {
+ System.err.println("Error in " + _name);
+ Thread.dumpStack();
+ }
+
+
+ job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -81,15 +96,16 @@
//Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
+// if (job.isComplete())
+// {
+// job.release();
+// if (!job.isReferenced())
+// {
+// _jobs.remove(session);
+// }
+// }
+// else
+ if(!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Mon Jan 29 08:37:13 2007
@@ -26,12 +26,38 @@
public class ReadWriteThreadModel implements ThreadModel
{
+
+ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+ private final PoolingFilter _asynchronousReadFilter;
+ private final PoolingFilter _asynchronousWriteFilter;
+
+ private ReadWriteThreadModel()
+ {
+ final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+ }
+
+ public PoolingFilter getAsynchronousReadFilter()
+ {
+ return _asynchronousReadFilter;
+ }
+
+ public PoolingFilter getAsynchronousWriteFilter()
+ {
+ return _asynchronousWriteFilter;
+ }
+
public void buildFilterChain(IoFilterChain chain) throws Exception
{
- ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
- PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+ }
+
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Mon Jan 29 08:37:13 2007
@@ -36,25 +36,32 @@
public void setUp()
{
+
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}
public void testRejectedExecution() throws Exception
{
- _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+ TestSession testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
+ _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
//Shutdown the pool
_executorService.getPool().shutdownNow();
try
{
+
+ testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
//prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, new TestSession(), null);
+ _pool.filterWrite(null, testSession, null);
}
catch (RejectedExecutionException rje)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java Mon Jan 29 08:37:13 2007
@@ -24,9 +24,13 @@
import java.net.SocketAddress;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
public class TestSession implements IoSession
{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
public TestSession()
{
}
@@ -68,42 +72,42 @@
public Object getAttachment()
{
- return null; //TODO
+ return getAttribute("");
}
public Object setAttachment(Object attachment)
{
- return null; //TODO
+ return setAttribute("",attachment);
}
public Object getAttribute(String key)
{
- return null; //TODO
+ return attributes.get(key);
}
public Object setAttribute(String key, Object value)
{
- return null; //TODO
+ return attributes.put(key,value);
}
public Object setAttribute(String key)
{
- return null; //TODO
+ return attributes.put(key, Boolean.TRUE);
}
public Object removeAttribute(String key)
{
- return null; //TODO
+ return attributes.remove(key);
}
public boolean containsAttribute(String key)
{
- return false; //TODO
+ return attributes.containsKey(key);
}
public Set getAttributeKeys()
{
- return null; //TODO
+ return attributes.keySet();
}
public TransportType getTransportType()
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Mon Jan 29 08:37:13 2007
@@ -106,6 +106,8 @@
// TODO: fix hardcoded protocol version data
TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Jan 29 08:37:13 2007
@@ -153,7 +153,10 @@
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0);
+ BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
+ null,false,false,new AMQShortString(id),0);
return request;
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Mon Jan 29 08:37:13 2007
@@ -169,6 +169,8 @@
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publish = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
immediate,
false,
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Mon Jan 29 08:37:13 2007
@@ -99,6 +99,8 @@
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publishBody = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
new AMQShortString("someExchange"),
false,
false,
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Mon Jan 29 08:37:13 2007
@@ -63,6 +63,8 @@
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publish = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
immediate,
false,
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Mon Jan 29 08:37:13 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -167,5 +168,10 @@
public byte getProtocolMinorVersion()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
}
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Mon Jan 29 08:37:13 2007
@@ -52,7 +52,7 @@
{
}
- public void removeMessage(StoreContext s, long messageId)
+ public void removeMessage(StoreContext s, Long messageId)
{
}
@@ -82,27 +82,27 @@
return null;
}
- public long getNewMessageId()
+ public Long getNewMessageId()
{
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
{
}
- public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
- public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return null;
}
- public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
{
return null;
}
@@ -112,12 +112,12 @@
}
- public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
}
- public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Mon Jan 29 08:37:13 2007
@@ -53,6 +53,8 @@
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,
@@ -82,6 +84,8 @@
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,