You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 17:37:16 UTC

svn commit: r501096 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/clien...

Author: rgreig
Date: Mon Jan 29 08:37:13 2007
New Revision: 501096

URL: http://svn.apache.org/viewvc?view=rev&rev=501096
Log:
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job


Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
    incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
    incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Jan 29 08:37:13 2007
@@ -324,7 +324,7 @@
             // implementation provided by MINA
             if (connectorConfig.enableExecutorPool)
             {
-                sconfig.setThreadModel(new ReadWriteThreadModel());
+                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
             }
 
             if (connectorConfig.enableNonSSL)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 08:37:13 2007
@@ -23,10 +23,13 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -119,6 +122,23 @@
 
 
         _codecFactory = codecFactory;
+
+
+        try
+        {
+            IoServiceConfig config = session.getServiceConfig();
+            ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
+            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+        }
+        catch (RuntimeException e)
+        {
+            e.printStackTrace();
+        //    throw e;
+
+        }
+
+
 
 
 //        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Jan 29 08:37:13 2007
@@ -127,10 +127,17 @@
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
 
-                clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
-                clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
-                clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
-                clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+                try
+                {
+                    clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
+                    clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
+                    clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
+                    clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+                }
+                catch (Exception e)
+                {
+                    e.printStackTrace();
+                }
                 // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                 // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                 // Be aware of possible changes to parameter order as versions change.
@@ -141,6 +148,7 @@
                     new AMQShortString(selectedLocale),	// locale
                     new AMQShortString(mechanism),	// mechanism
                     saslResponse));	// response
+                
             }
             catch (UnsupportedEncodingException e)
             {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 29 08:37:13 2007
@@ -24,12 +24,14 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -131,6 +133,19 @@
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
         }
 
+
+        try
+        {
+
+            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+        }
+        catch (RuntimeException e)
+        {
+            e.printStackTrace();
+        }
+  
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Mon Jan 29 08:37:13 2007
@@ -71,7 +71,7 @@
         boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
         if (readWriteThreading)
         {
-            cfg.setThreadModel(new ReadWriteThreadModel());
+            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
         }
 
         SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Mon Jan 29 08:37:13 2007
@@ -70,7 +70,7 @@
 
         IoServiceConfig config = _acceptor.getDefaultConfig();
 
-        config.setThreadModel(new ReadWriteThreadModel());
+        config.setThreadModel(ReadWriteThreadModel.getInstance());
     }
 
     public static ITransportConnection getInstance() throws AMQTransportConnectionException

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java Mon Jan 29 08:37:13 2007
@@ -82,7 +82,7 @@
         sc.setSendBufferSize(32768);
         sc.setReceiveBufferSize(32768);
 
-        config.setThreadModel(new ReadWriteThreadModel());
+        config.setThreadModel(ReadWriteThreadModel.getInstance());
 
         acceptor.bind(new InetSocketAddress(PORT),
                       new TestHandler());

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java Mon Jan 29 08:37:13 2007
@@ -83,7 +83,7 @@
             // implementation provided by MINA
             if (connectorConfig.enableExecutorPool)
             {
-                sconfig.setThreadModel(new ReadWriteThreadModel());
+                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
             }
 
             String host = InetAddress.getLocalHost().getHostName();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Mon Jan 29 08:37:13 2007
@@ -51,4 +51,9 @@
         AMQType type = AMQTypeMap.getType(buffer.get());
         return new AMQTypedValue(type, buffer);
     }
+
+    public String toString()
+    {
+        return "["+getType()+": "+getValue()+"]";
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Mon Jan 29 08:37:13 2007
@@ -164,4 +164,14 @@
                 protocolMajor + "." +  protocolMinor + " not found in protocol version list.");
         }
     }
+
+    public String toString()
+    {
+        StringBuffer buffer = new StringBuffer(new String(header));
+        buffer.append(Integer.toHexString(protocolClass));
+        buffer.append(Integer.toHexString(protocolInstance));
+        buffer.append(Integer.toHexString(protocolMajor));
+        buffer.append(Integer.toHexString(protocolMinor));
+        return buffer.toString();
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Mon Jan 29 08:37:13 2007
@@ -30,13 +30,13 @@
  * Holds events for a session that will be processed asynchronously by
  * the thread pool in PoolingFilter.
  */
-class Job implements Runnable
+public class Job implements Runnable
 {
     private final int _maxEvents;
     private final IoSession _session;
     private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
     private final AtomicBoolean _active = new AtomicBoolean();
-    private final AtomicInteger _refCount = new AtomicInteger();
+    //private final AtomicInteger _refCount = new AtomicInteger();
     private final JobCompletionHandler _completionHandler;
 
     Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@
         _completionHandler = completionHandler;
         _maxEvents = maxEvents;
     }
-
-    void acquire()
-    {
-        _refCount.incrementAndGet();
-    }
-
-    void release()
-    {
-        _refCount.decrementAndGet();
-    }
-
-    boolean isReferenced()
-    {
-        return _refCount.get() > 0;
-    }
+//
+//    void acquire()
+//    {
+//        _refCount.incrementAndGet();
+//    }
+//
+//    void release()
+//    {
+//        _refCount.decrementAndGet();
+//    }
+//
+//    boolean isReferenced()
+//    {
+//        return _refCount.get() > 0;
+//    }
 
     void add(Event evt)
     {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Mon Jan 29 08:37:13 2007
@@ -48,7 +48,7 @@
     void fireAsynchEvent(IoSession session, Event event)
     {
         Job job = getJobForSession(session);
-        job.acquire(); //prevents this job being removed from _jobs
+ //       job.acquire(); //prevents this job being removed from _jobs
         job.add(event);
 
         //Additional checks on pool to check that it hasn't shutdown.
@@ -60,10 +60,25 @@
 
     }
 
+    public void createNewJobForSession(IoSession session)
+    {
+        Job job = new Job(session, this, _maxEvents);
+        session.setAttribute(_name, job);
+    }
+
     private Job getJobForSession(IoSession session)
     {
-        Job job = _jobs.get(session);
-        return job == null ? createJobForSession(session) : job;
+        return (Job) session.getAttribute(_name);
+
+/*        if(job == null)
+        {
+            System.err.println("Error in " + _name);
+            Thread.dumpStack();
+        }
+
+
+        job = _jobs.get(session);
+        return job == null ? createJobForSession(session) : job;*/
     }
 
     private Job createJobForSession(IoSession session)
@@ -81,15 +96,16 @@
     //Job.JobCompletionHandler
     public void completed(IoSession session, Job job)
     {
-        if (job.isComplete())
-        {
-            job.release();
-            if (!job.isReferenced())
-            {
-                _jobs.remove(session);
-            }
-        }
-        else
+//        if (job.isComplete())
+//        {
+//            job.release();
+//            if (!job.isReferenced())
+//            {
+//                _jobs.remove(session);
+//            }
+//        }
+//        else
+        if(!job.isComplete())
         {
             // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
             //                       Can the pool be shutdown at this point?

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Mon Jan 29 08:37:13 2007
@@ -26,12 +26,38 @@
 
 public class ReadWriteThreadModel implements ThreadModel
 {
+
+    private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+    private final PoolingFilter _asynchronousReadFilter;
+    private final PoolingFilter _asynchronousWriteFilter;
+
+    private ReadWriteThreadModel()
+    {
+        final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+        _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+        _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+    }
+
+    public PoolingFilter getAsynchronousReadFilter()
+    {
+        return _asynchronousReadFilter;
+    }
+
+    public PoolingFilter getAsynchronousWriteFilter()
+    {
+        return _asynchronousWriteFilter;
+    }
+
     public void buildFilterChain(IoFilterChain chain) throws Exception
     {
-        ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
-        PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
-        PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
-        chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
-        chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+
+        chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+        chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+    }
+
+    public static ReadWriteThreadModel getInstance()
+    {
+        return _instance;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Mon Jan 29 08:37:13 2007
@@ -36,25 +36,32 @@
 
     public void setUp()
     {
+
         //Create Pool
         _executorService = ReferenceCountingExecutorService.getInstance();
         _executorService.acquireExecutorService();
-        _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, 
+        _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
                                   "AsynchronousWriteFilter");
 
     }
 
     public void testRejectedExecution() throws Exception
     {
-        _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+        TestSession testSession = new TestSession();
+        _pool.createNewJobForSession(testSession);
+        _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
 
         //Shutdown the pool
         _executorService.getPool().shutdownNow();
 
         try
         {
+
+            testSession = new TestSession();
+            _pool.createNewJobForSession(testSession);
             //prior to fix for QPID-172 this would throw RejectedExecutionException
-            _pool.filterWrite(null, new TestSession(), null);
+            _pool.filterWrite(null, testSession, null);
         }
         catch (RejectedExecutionException rje)
         {

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java Mon Jan 29 08:37:13 2007
@@ -24,9 +24,13 @@
 
 import java.net.SocketAddress;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class TestSession implements IoSession
 {
+    private final ConcurrentMap attributes = new ConcurrentHashMap();
+
     public TestSession()
     {
     }
@@ -68,42 +72,42 @@
 
     public Object getAttachment()
     {
-        return null;  //TODO
+        return getAttribute("");
     }
 
     public Object setAttachment(Object attachment)
     {
-        return null;  //TODO
+        return setAttribute("",attachment);
     }
 
     public Object getAttribute(String key)
     {
-        return null;  //TODO
+        return attributes.get(key);
     }
 
     public Object setAttribute(String key, Object value)
     {
-        return null;  //TODO
+        return attributes.put(key,value);
     }
 
     public Object setAttribute(String key)
     {
-        return null;  //TODO
+        return attributes.put(key, Boolean.TRUE);
     }
 
     public Object removeAttribute(String key)
     {
-        return null;  //TODO
+        return attributes.remove(key);
     }
 
     public boolean containsAttribute(String key)
     {
-        return false;  //TODO
+        return attributes.containsKey(key);
     }
 
     public Set getAttributeKeys()
     {
-        return null;  //TODO
+        return attributes.keySet();
     }
 
     public TransportType getTransportType()

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Mon Jan 29 08:37:13 2007
@@ -106,6 +106,8 @@
                 // TODO: fix hardcoded protocol version data
                 TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
                                                                                            (byte)0,
+                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
                                                                                            null,
                                                                                            false,
                                                                                            false,

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Jan 29 08:37:13 2007
@@ -153,7 +153,10 @@
     {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0);
+        BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
+                                                       BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                       BasicPublishBody.getMethod((byte)8,(byte)0),
+                                                       null,false,false,new AMQShortString(id),0);
         
         return request;
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Mon Jan 29 08:37:13 2007
@@ -169,6 +169,8 @@
         // TODO: Establish some way to determine the version for the test.
         BasicPublishBody publish = new BasicPublishBody((byte)8,
                                                         (byte)0,
+                                                        BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                        BasicPublishBody.getMethod((byte)8,(byte)0),
                                                         null,
                                                         immediate,
                                                         false,

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Mon Jan 29 08:37:13 2007
@@ -99,6 +99,8 @@
             // TODO: Establish some way to determine the version for the test.
             BasicPublishBody publishBody = new BasicPublishBody((byte)8,
                                                                 (byte)0,
+                                                                BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                                BasicPublishBody.getMethod((byte)8,(byte)0),
                                                                 new AMQShortString("someExchange"),
                                                                 false,
                                                                 false,

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Mon Jan 29 08:37:13 2007
@@ -63,6 +63,8 @@
         // TODO: Establish some way to determine the version for the test.
         BasicPublishBody publish = new BasicPublishBody((byte)8,
                                                         (byte)0,
+                                                        BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                        BasicPublishBody.getMethod((byte)8,(byte)0),
                                                         null,
                                                         immediate,
                                                         false,

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Mon Jan 29 08:37:13 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.VersionSpecificRegistry;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -167,5 +168,10 @@
     public byte getProtocolMinorVersion()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public VersionSpecificRegistry getRegistry()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Mon Jan 29 08:37:13 2007
@@ -52,7 +52,7 @@
     {
     }
 
-    public void removeMessage(StoreContext s, long messageId)
+    public void removeMessage(StoreContext s, Long messageId)
     {
     }
 
@@ -82,27 +82,27 @@
         return null;
     }
 
-    public long getNewMessageId()
+    public Long getNewMessageId()
     {
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+    public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
     {
 
     }
 
-    public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
+    public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
     {
 
     }
 
-    public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
     {
         return null;
     }
 
-    public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+    public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
     {
         return null;
     }
@@ -112,12 +112,12 @@
 
     }
 
-    public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
     {
 
     }
 
-    public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
     {
 
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=501096&r1=501095&r2=501096
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Mon Jan 29 08:37:13 2007
@@ -53,6 +53,8 @@
         // TODO: fix hardcoded protocol version data
         AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
                                                                                            (byte)0,
+                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
                                                                                            null,
                                                                                            false,
                                                                                            false,
@@ -82,6 +84,8 @@
         // TODO: fix hardcoded protocol version data
         AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
                                                                                            (byte)0,
+                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
+                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
                                                                                            null,
                                                                                            false,
                                                                                            false,