You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/06/19 11:02:01 UTC

svn commit: r669431 [1/2] - in /incubator/qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/ broker/src/main/grammar/ bro...

Author: rgodfrey
Date: Thu Jun 19 02:01:59 2008
New Revision: 669431

URL: http://svn.apache.org/viewvc?rev=669431&view=rev
Log:
QPID-950 : Broker refactoring, copied / merged from branch

Added:
    incubator/qpid/trunk/qpid/java/broker/src/
      - copied from r662382, incubator/qpid/branches/broker-queue-refactor/java/broker/src/
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Removed:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
Modified:
    incubator/qpid/trunk/qpid/java/010ExcludeList
    incubator/qpid/trunk/qpid/java/010ExcludeList-store
    incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
    incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
    incubator/qpid/trunk/qpid/java/systests/pom.xml
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Thu Jun 19 02:01:59 2008
@@ -46,4 +46,8 @@
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
 // the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList-store
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList-store?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList-store (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList-store Thu Jun 19 02:01:59 2008
@@ -41,4 +41,9 @@
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
 // the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+

Modified: incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Thu Jun 19 02:01:59 2008
@@ -22,6 +22,8 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.management.JMException;
 import javax.management.openmbean.OpenDataException;
@@ -34,7 +36,7 @@
 import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -191,7 +193,7 @@
         return false;
     }
 
-    public void route(AMQMessage payload) throws AMQException
+    public void route(IncomingMessage payload) throws AMQException
     {
         
         Long value = new Long(SizeOf.getUsedMemory());
@@ -201,17 +203,14 @@
         headers.put(key, value);
         ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-        
-        payload.enqueue(q);
+
+        Collection<AMQQueue> queues =  new ArrayList<AMQQueue>();
+        queues.add(q);
+        payload.enqueue(queues);
         
     }
 
-	@Override
-	public Map<AMQShortString, List<AMQQueue>> getBindings() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
+	
 	public boolean isBound(AMQShortString routingKey, FieldTable arguments,
 			AMQQueue queue) {
 		// TODO Auto-generated method stub

Modified: incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Thu Jun 19 02:01:59 2008
@@ -1,24 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
 package org.apache.qpid.extras.exchanges.example;
 
 import java.util.List;
@@ -28,7 +7,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -102,7 +81,7 @@
     {
     }
 
-    public void route(AMQMessage message) throws AMQException
+    public void route(IncomingMessage message) throws AMQException
     {
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj Thu Jun 19 02:01:59 2008
@@ -94,7 +94,7 @@
             return this.JmsSelector();
         }
         catch (Throwable e) {
-	        throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+	        throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql,e);
         }
 
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Jun 19 02:01:59 2008
@@ -67,7 +67,7 @@
         if (exchType == null)
         {
 
-            throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
+            throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
         }
         Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete);
         return e;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Thu Jun 19 02:01:59 2008
@@ -101,7 +101,7 @@
             else if (!exchange.getType().equals(body.getType()))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
             }
 
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java Thu Jun 19 02:01:59 2008
@@ -1,37 +1,37 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
-
-public enum Permission
-{
-    CONSUME,
-    PUBLISH,
-    CREATE,
-    ACCESS,
-    BIND,
-    UNBIND,
-    DELETE,
-    PURGE
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public enum Permission
+{
+    CONSUME,
+    PUBLISH,
+    CREATE,
+    ACCESS,
+    BIND,
+    UNBIND,
+    DELETE,
+    PURGE
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Jun 19 02:01:59 2008
@@ -1014,6 +1014,7 @@
         }
     }
 
+
     /**
      * Declares the named queue.
      *
@@ -1031,18 +1032,40 @@
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
                             final boolean exclusive) throws AMQException
     {
+        createQueue(name, autoDelete, durable, exclusive, null);
+    }
+
+
+    /**
+     * Declares the named queue.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param name       The name of the queue to declare.
+     * @param autoDelete
+     * @param durable    Flag to indicate that the queue is durable.
+     * @param exclusive  Flag to indicate that the queue is exclusive to this client.
+     * @param arguments  Arguments used to set special properties of the queue
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     *
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+                            final boolean exclusive, final Map<String, Object> arguments) throws AMQException
+    {
         new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendCreateQueue(name, autoDelete, durable, exclusive);
+                sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
                 return null;
             }
         }, _connection).execute();
     }
 
     public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
-            final boolean exclusive)throws AMQException, FailoverException;
+            final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException;
     /**
      * Creates a QueueReceiver
      *

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Jun 19 02:01:59 2008
@@ -243,13 +243,14 @@
      * @param durable    If set when creating a new queue,
      *                   the queue will be marked as durable.
      * @param exclusive  Exclusive queues can only be used from one connection at a time.
+     * @param arguments  Exclusive queues can only be used from one connection at a time.
      * @throws AMQException
      * @throws FailoverException
      */
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
-                                final boolean exclusive) throws AMQException, FailoverException
+                                final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
     {
-        getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
+        getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
                                       autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
                                       exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
         // We need to sync so that we get notify of an error.
@@ -594,7 +595,7 @@
         try
         {
             // this is done so that we can produce to a temporary queue beofre we create a consumer
-            sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+            sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null);
             sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
             result.setQueueName(result.getRoutingKey());
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Jun 19 02:01:59 2008
@@ -40,6 +40,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public final class AMQSession_0_8 extends AMQSession
 {
 
@@ -125,10 +127,19 @@
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
     }
 
-    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
             FailoverException
     {
-        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+        FieldTable table = null;
+        if(arguments != null && !arguments.isEmpty())
+        {
+            table = new FieldTable();
+            for(Map.Entry<String, Object> entry : arguments.entrySet())
+            {
+                table.setObject(entry.getKey(), entry.getValue());
+            }
+        }
+        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
         AMQFrame queueDeclare = body.generateFrame(_channelId);
         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
@@ -412,6 +423,28 @@
         return subscriber;
     }
 
+
+
+
+    public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+    {
+        new FailoverRetrySupport<Object, AMQException>(
+                new FailoverProtectedOperation<Object, AMQException>()
+                {
+                    public Object execute() throws AMQException, FailoverException
+                    {
+
+                        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+                        // todo send low water mark when protocol allows.
+                        // todo Be aware of possible changes to parameter order as versions change.
+                        getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+                  
+                        return null;
+                    }
+                 }, _connection).execute();
+    }
+
     class QueueDeclareOkHandler extends SpecificMethodFrameListener
     {
 

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Thu Jun 19 02:01:59 2008
@@ -206,16 +206,27 @@
 
             MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
 
-            for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+            int msg;
+            for (msg = 0; msg < (MSG_COUNT / 2); msg++)
             {
 
-                assertTrue(_consumer1.receive(3000) != null);
+
+                final Message message = _consumer1.receive(1000);
+                if(message == null)
+                {
+                    break;
+                }
+
             }
 
-            for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+            _consumer1.close();
+            _clientSession1.close();
+
+            for (; msg < MSG_COUNT ; msg++)
             {
-                assertTrue(consumer2.receive(3000) != null);
+                assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null);
             }
+
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java Thu Jun 19 02:01:59 2008
@@ -62,7 +62,6 @@
         private static final class FixedSizeByteBuffer extends ByteBuffer
         {
             private java.nio.ByteBuffer buf;
-            private int refCount = 1;
             private int mark = -1;
 
 
@@ -70,36 +69,14 @@
             {
                 this.buf = buf;
                 buf.order( ByteOrder.BIG_ENDIAN );
-                refCount = 1;
             }
 
             public synchronized void acquire()
             {
-                if( refCount <= 0 )
-                {
-                    throw new IllegalStateException( "Already released buffer." );
-                }
-
-                refCount ++;
             }
 
             public void release()
             {
-                synchronized( this )
-                {
-                    if( refCount <= 0 )
-                    {
-                        refCount = 0;
-                        throw new IllegalStateException(
-                                "Already released buffer.  You released the buffer too many times." );
-                    }
-
-                    refCount --;
-                    if( refCount > 0)
-                    {
-                        return;
-                    }
-                }
             }
 
             public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@
                 {
                     if( newCapacity > capacity() )
                     {
-                        // Allocate a new buffer and transfer all settings to it.
-                        int pos = position();
-                        int limit = limit();
-                        ByteOrder bo = order();
-
-                        capacity0( newCapacity );
-                        buf.limit( limit );
-                        if( mark >= 0 )
-                        {
-                            buf.position( mark );
-                            buf.mark();
-                        }
-                        buf.position( pos );
-                        buf.order( bo );
+                        throw new IllegalArgumentException();
                     }
 
                     return this;
                 }
 
-            protected void capacity0( int requestedCapacity )
-            {
-                int newCapacity = MINIMUM_CAPACITY;
-                while( newCapacity < requestedCapacity )
-                {
-                    newCapacity <<= 1;
-                }
-
-                java.nio.ByteBuffer oldBuf = this.buf;
-                java.nio.ByteBuffer newBuf;
-                if( isDirect() )
-                {
-                    newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
-                }
-                else
-                {
-                    newBuf = java.nio.ByteBuffer.allocate( newCapacity );
-                }
-
-                newBuf.clear();
-                oldBuf.clear();
-                newBuf.put( oldBuf );
-                this.buf = newBuf;
-            }
-
 
 
             public boolean isAutoExpand()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java Thu Jun 19 02:01:59 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.common;
 
+import org.apache.qpid.framing.AMQShortString;
+
 /**
  * Specifies the available client property types that different clients can use to identify themselves with.
  *
@@ -30,8 +32,21 @@
  */
 public enum ClientProperties
 {
-    instance,
-    product,
-    version,
-    platform
+    instance("instance"),
+    product("product"),
+    version("version"),
+    platform("platform");
+
+    private final AMQShortString _amqShortString;
+
+    private ClientProperties(String name)
+    {
+        _amqShortString = new AMQShortString(name);
+    }
+
+
+    public AMQShortString toAMQShortString()
+    {
+        return _amqShortString;
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Jun 19 02:01:59 2008
@@ -224,7 +224,6 @@
         }
     }
 
-
     /**
      * Get the length of the short string
      * @return length of the underlying byte array
@@ -464,13 +463,49 @@
             return false;
         }
 
-        if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+        final int hashCode = _hashCode;
+
+        final int otherHashCode = otherString._hashCode;
+
+        if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+        {
+            return false;
+        }
+
+        final int length = _length;
+
+        if(length != otherString._length)
         {
             return false;
         }
 
-        return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
-                || Arrays.equals(getBytes(),otherString.getBytes());
+
+        final byte[] data = _data;
+
+        final byte[] otherData = otherString._data;
+
+        final int offset = _offset;
+
+        final int otherOffset = otherString._offset;
+
+        if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+        {
+            return Arrays.equals(data, otherData);
+        }
+        else
+        {
+            int thisIdx = offset;
+            int otherIdx = otherOffset;
+            for(int i = length;  i-- != 0; )
+            {
+                if(!(data[thisIdx++] == otherData[otherIdx++]))
+                {
+                    return false;
+                }
+            }
+        }
+
+        return true;
 
     }
 
@@ -718,4 +753,17 @@
         return false;  //To change body of created methods use File | Settings | File Templates.
     }
 
+
+    public static void main(String args[])
+    {
+        AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+        AMQShortString s2 = s.substring(2, 7);
+
+        AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+        while(t.hasMoreTokens())
+        {
+            System.err.println(t.nextToken());
+        }
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Thu Jun 19 02:01:59 2008
@@ -93,4 +93,24 @@
     {
         return "[" + getType() + ": " + getValue() + "]";
     }
+
+
+    public boolean equals(Object o)
+    {
+        if(o instanceof AMQTypedValue)
+        {
+            AMQTypedValue other = (AMQTypedValue) o;
+            return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    public int hashCode()
+    {
+        return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Thu Jun 19 02:01:59 2008
@@ -74,7 +74,7 @@
         buffer.skip((int) length);
     }
 
-    private AMQTypedValue getProperty(AMQShortString string)
+    public AMQTypedValue getProperty(AMQShortString string)
     {
         checkPropertyName(string);
 
@@ -891,6 +891,20 @@
         return keys;
     }
 
+    public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+    {
+        if(_encodedForm != null)
+        {
+            return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+        }
+        else
+        {
+            initMapIfNecessary();
+            return _properties.entrySet().iterator();
+        }
+    }
+
+
     public Object get(AMQShortString key)
     {
 
@@ -1050,6 +1064,95 @@
         }
     }
 
+    private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+    {
+        private final AMQTypedValue _value;
+        private final AMQShortString _key;
+
+        public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+        {
+            _key = key;
+            _value = value;
+        }
+
+        public AMQShortString getKey()
+        {
+            return _key;
+        }
+
+        public AMQTypedValue getValue()
+        {
+            return _value;
+        }
+
+        public AMQTypedValue setValue(final AMQTypedValue value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean equals(Object o)
+        {
+            if(o instanceof FieldTableEntry)
+            {
+                FieldTableEntry other = (FieldTableEntry) o;
+                return (_key == null ? other._key == null : _key.equals(other._key))
+                       && (_value == null ? other._value == null : _value.equals(other._value));
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public int hashCode()
+        {
+            return (getKey()==null   ? 0 : getKey().hashCode())
+                   ^ (getValue()==null ? 0 : getValue().hashCode());
+        }
+
+    }
+
+
+    private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+    {
+
+        private final ByteBuffer _buffer;
+        private int _expectedRemaining;
+
+        public FieldTableIterator(ByteBuffer buffer, int length)
+        {
+            _buffer = buffer;
+            _expectedRemaining = buffer.remaining() - length;
+        }
+
+        public boolean hasNext()
+        {
+            return (_buffer.remaining() > _expectedRemaining);
+        }
+
+        public Map.Entry<AMQShortString, AMQTypedValue> next()
+        {
+            if(hasNext())
+            {
+                final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+                AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+                return new FieldTableEntry(key, value);
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+
+
+
     public int hashCode()
     {
         initMapIfNecessary();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Jun 19 02:01:59 2008
@@ -50,7 +50,7 @@
  *
  * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
  */
-public class Job implements Runnable
+public class Job implements ReadWriteRunnable
 {
     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
@@ -67,18 +67,22 @@
     /** Holds the completion continuation, called upon completion of a run of the job. */
     private final JobCompletionHandler _completionHandler;
 
+    private final boolean _readJob;
+
     /**
      * Creates a new job that aggregates many continuations together.
      *
      * @param session           The Mina session.
      * @param completionHandler The per job run, terminal continuation.
      * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
+     * @param readJob
      */
-    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
     {
         _session = session;
         _completionHandler = completionHandler;
         _maxEvents = maxEvents;
+        _readJob = readJob;
     }
 
     /**
@@ -157,6 +161,22 @@
         }
     }
 
+    public boolean isReadJob()
+    {
+        return _readJob;
+    }
+
+    public boolean isRead()
+    {
+        return _readJob;
+    }
+
+    public boolean isWrite()
+    {
+        return !_readJob;
+    }
+
+
     /**
      * Another interface for a continuation.
      *

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Jun 19 02:01:59 2008
@@ -60,24 +60,6 @@
  *     <td> {@link Job}, {@link Job.JobCompletionHandler}
  * </table>
  *
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- *       The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- *       pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- *       so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- *       Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- *       stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- *       it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- *       and trips off another batch of 10 until they are all done. Why not just have a straight forward
- *       consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- *       in them, just have one queue of events and worker threads taking the next event. There will be coordination
- *       between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- *       amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- *       pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- *       better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- *       be substituted in.
- *
  * @todo The static helper methods are pointless. Could just call new.
  */
 public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -96,17 +78,20 @@
 
     private final int _maxEvents;
 
+    private final boolean _readFilter;
+
     /**
      * Creates a named pooling filter, on the specified shared thread pool.
      *
      * @param refCountingPool The thread pool reference.
      * @param name            The identifying name of the filter type.
      */
-    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
     {
         _poolReference = refCountingPool;
         _name = name;
         _maxEvents = maxEvents;
+        _readFilter = readFilter;
     }
 
     /**
@@ -167,7 +152,6 @@
     void fireAsynchEvent(Job job, Event event)
     {
 
-        // job.acquire(); //prevents this job being removed from _jobs
         job.add(event);
 
         final ExecutorService pool = _poolReference.getPool();
@@ -201,7 +185,7 @@
      */
     public void createNewJobForSession(IoSession session)
     {
-        Job job = new Job(session, this, MAX_JOB_EVENTS);
+        Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
         session.setAttribute(_name, job);
     }
 
@@ -433,7 +417,7 @@
          */
         public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
         }
 
         /**
@@ -476,7 +460,7 @@
          */
         public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
         }
 
         /**

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=669431&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Thu Jun 19 02:01:59 2008
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+
+    private final AtomicInteger _count = new AtomicInteger(0);
+
+    private final ReentrantLock _takeLock = new ReentrantLock();
+
+    private final Condition _notEmpty = _takeLock.newCondition();
+
+    private final ReentrantLock _putLock = new ReentrantLock();
+
+    private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+    private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+
+    private class ReadWriteJobIterator implements Iterator<Runnable>
+    {
+
+        private boolean _onReads;
+        private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
+
+        public boolean hasNext()
+        {
+            if(!_iter.hasNext())
+            {
+                if(_onReads)
+                {
+                    _iter = _readJobQueue.iterator();
+                    _onReads = true;
+                    return _iter.hasNext();
+                }
+                else
+                {
+                    return false;
+                }
+            }
+            else
+            {
+                return true;
+            }
+        }
+
+        public Runnable next()
+        {
+            if(_iter.hasNext())
+            {
+                return _iter.next();
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public void remove()
+        {
+            _takeLock.lock();
+            try
+            {
+                _iter.remove();
+                _count.decrementAndGet();
+            }
+            finally
+            {
+                _takeLock.unlock();
+            }
+        }
+    }
+
+    public Iterator<Runnable> iterator()
+    {
+        return new ReadWriteJobIterator();
+    }
+
+    public int size()
+    {
+        return _count.get();
+    }
+
+    public boolean offer(final Runnable runnable)
+    {
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+        final ReentrantLock putLock = _putLock;
+        putLock.lock();
+        try
+        {
+            if(job.isRead())
+            {
+                _readJobQueue.offer(job);
+            }
+            else
+            {
+                _writeJobQueue.offer(job);
+            }
+            if(_count.getAndIncrement() == 0)
+            {
+                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+            return true;
+        }
+        finally
+        {
+            putLock.unlock();
+        }
+    }
+
+    public void put(final Runnable runnable) throws InterruptedException
+    {
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+        final ReentrantLock putLock = _putLock;
+        putLock.lock();
+
+        try
+        {
+            if(job.isRead())
+            {
+                _readJobQueue.offer(job);
+            }
+            else
+            {
+                _writeJobQueue.offer(job);
+            }
+            if(_count.getAndIncrement() == 0)
+            {
+                                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+
+        }
+        finally
+        {
+            putLock.unlock();
+        }
+    }
+
+
+
+    public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+    {
+        final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+        final ReentrantLock putLock = _putLock;
+        putLock.lock();
+
+        try
+        {
+            if(job.isRead())
+            {
+                _readJobQueue.offer(job);
+            }
+            else
+            {
+                _writeJobQueue.offer(job);
+            }
+            if(_count.getAndIncrement() == 0)
+            {
+                _takeLock.lock();
+                try
+                {
+                    _notEmpty.signal();
+                }
+                finally
+                {
+                    _takeLock.unlock();
+                }
+            }
+
+            return true;
+        }
+        finally
+        {
+            putLock.unlock();
+        }
+
+    }
+
+    public Runnable take() throws InterruptedException
+    {
+        final ReentrantLock takeLock = _takeLock;
+        takeLock.lockInterruptibly();
+        try
+        {
+            try
+            {
+                while (_count.get() == 0)
+                {
+                    _notEmpty.await();
+                }
+            }
+            catch (InterruptedException ie)
+            {
+                _notEmpty.signal();
+                throw ie;
+            }
+
+            ReadWriteRunnable job = _writeJobQueue.poll();
+            if(job == null)
+            {
+                job = _readJobQueue.poll();
+            }
+            int c = _count.getAndDecrement();
+            if (c > 1)
+            {
+                _notEmpty.signal();
+            }
+            return job;
+        }
+        finally
+        {
+            takeLock.unlock();
+        }
+
+
+    }
+
+    public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+    {
+        final ReentrantLock takeLock = _takeLock;
+        final AtomicInteger count = _count;
+        long nanos = unit.toNanos(timeout);
+        takeLock.lockInterruptibly();
+        ReadWriteRunnable job = null;
+        try
+        {
+
+            for (;;)
+            {
+                if (count.get() > 0)
+                {
+                    job = _writeJobQueue.poll();
+                    if(job == null)
+                    {
+                        job = _readJobQueue.poll();
+                    }
+                    int c = count.getAndDecrement();
+                    if (c > 1)
+                    {
+                        _notEmpty.signal();
+                    }
+                    break;
+                }
+                if (nanos <= 0)
+                {
+                    return null;
+                }
+                try
+                {
+                    nanos = _notEmpty.awaitNanos(nanos);
+                }
+                catch (InterruptedException ie)
+                {
+                    _notEmpty.signal();
+                    throw ie;
+                }
+            }
+        }
+        finally
+        {
+            takeLock.unlock();
+        }
+
+        return job;
+    }
+
+    public int remainingCapacity()
+    {
+        return Integer.MAX_VALUE;
+    }
+
+    public int drainTo(final Collection<? super Runnable> c)
+    {
+        int total = 0;
+
+        _putLock.lock();
+        _takeLock.lock();
+        try
+        {
+            ReadWriteRunnable job;
+            while((job = _writeJobQueue.peek())!= null)
+            {
+                c.add(job);
+                _writeJobQueue.poll();
+                _count.decrementAndGet();
+                total++;
+            }
+
+            while((job = _readJobQueue.peek())!= null)
+            {
+                c.add(job);
+                _readJobQueue.poll();
+                _count.decrementAndGet();
+                total++;
+            }
+
+        }
+        finally
+        {
+            _takeLock.unlock();
+            _putLock.unlock();
+        }
+        return total;
+    }
+
+    public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+    {
+        int total = 0;
+
+        _putLock.lock();
+        _takeLock.lock();
+        try
+        {
+            ReadWriteRunnable job;
+            while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
+            {
+                c.add(job);
+                _writeJobQueue.poll();
+                _count.decrementAndGet();
+                total++;
+            }
+
+            while(total<=maxElements && (job = _readJobQueue.peek())!= null)
+            {
+                c.add(job);
+                _readJobQueue.poll();
+                _count.decrementAndGet();
+                total++;
+            }
+
+        }
+        finally
+        {
+            _takeLock.unlock();
+            _putLock.unlock();
+        }
+        return total;
+
+    }
+
+    public Runnable poll()
+    {
+        final ReentrantLock takeLock = _takeLock;
+        takeLock.lock();
+        try
+        {
+            if(_count.get() > 0)
+            {
+                ReadWriteRunnable job = _writeJobQueue.poll();
+                if(job == null)
+                {
+                    job = _readJobQueue.poll();
+                }
+                _count.decrementAndGet();
+                return job;
+            }
+            else
+            {
+                return null;
+            }
+        }
+        finally
+        {
+            takeLock.unlock();
+        }
+
+    }
+
+    public Runnable peek()
+    {
+        final ReentrantLock takeLock = _takeLock;
+        takeLock.lock();
+        try
+        {
+            ReadWriteRunnable job = _writeJobQueue.peek();
+            if(job == null)
+            {
+                job = _readJobQueue.peek();
+            }
+            return job;
+        }
+        finally
+        {
+            takeLock.unlock();
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=669431&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Thu Jun 19 02:01:59 2008
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+    boolean isRead();
+    boolean isWrite();
+}

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Thu Jun 19 02:01:59 2008
@@ -22,6 +22,9 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -84,6 +87,8 @@
     /** Holds the number of executor threads to create. */
     private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
 
+    private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
     /**
      * Retrieves the singleton instance of this reference counter.
      *
@@ -105,15 +110,28 @@
      *
      * @return An executor service.
      */
-    ExecutorService acquireExecutorService()
+    public ExecutorService acquireExecutorService()
     {
         synchronized (_lock)
         {
             if (_refCount++ == 0)
             {
-                _pool = Executors.newFixedThreadPool(_poolSize);
+//                _pool = Executors.newFixedThreadPool(_poolSize);
+
+                // Use a job queue that biases to writes
+                if(_useBiasedPool)
+                {
+                    _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
+                                          0L, TimeUnit.MILLISECONDS,
+                                          new ReadWriteJobQueue());
+                }
+                else
+                {
+                    _pool = Executors.newFixedThreadPool(_poolSize);
+                }
             }
 
+
             return _pool;
         }
     }
@@ -122,7 +140,7 @@
      * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
      * to zero, the executor service is shut down.
      */
-    void releaseExecutorService()
+    public void releaseExecutorService()
     {
         synchronized (_lock)
         {

Modified: incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java (original)
+++ incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java Thu Jun 19 02:01:59 2008
@@ -23,10 +23,11 @@
 import junit.framework.TestCase;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.DestNameExchange;
+import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueMBean;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -62,7 +63,8 @@
     {
         // If this test fails due to changes in the broker code,
         // then the constants in the Constants.java shoule be updated accordingly
-        AMQQueue queue = new AMQQueue(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost,
+                                                            null);
         AMQManagedObject mbean = new AMQQueueMBean(queue);
         MBeanInfo mbeanInfo = mbean.getMBeanInfo();
 
@@ -82,7 +84,7 @@
     {
         // If this test fails due to changes in the broker code,
         // then the constants in the Constants.java shoule be updated accordingly 
-        DestNameExchange exchange = new DestNameExchange();
+        DirectExchange exchange = new DirectExchange();
         exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
         AMQManagedObject mbean = (AMQManagedObject)exchange.getManagedObject();
         MBeanInfo mbeanInfo = mbean.getMBeanInfo();

Modified: incubator/qpid/trunk/qpid/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/pom.xml?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/systests/pom.xml Thu Jun 19 02:01:59 2008
@@ -151,7 +151,6 @@
                         <MessageReturnTest>-n MessageReturnTest org.apache.qpid.server.queue.MessageReturnTest </MessageReturnTest>
                         <QueueDepthWithSelectorTest>-n QueueDepthWithSelectorTest org.apache.qpid.server.queue.QueueDepthWithSelectorTest </QueueDepthWithSelectorTest>
                         <!--<SubscriptionManagerTest>-n SubscriptionManagerTest org.apache.qpid.server.queue.SubscriptionManagerTest </SubscriptionManagerTest>-->
-                        <SubscriptionSetTest>-n SubscriptionSetTest org.apache.qpid.server.queue.SubscriptionSetTest </SubscriptionSetTest>
                         <TimeToLiveTest>-n TimeToLiveTest org.apache.qpid.server.queue.TimeToLiveTest </TimeToLiveTest>
                         <TxnBufferTest>-n TxnBufferTest org.apache.qpid.server.txn.TxnBufferTest </TxnBufferTest>
                         <!--<TxnTest>-n TxnTest org.apache.qpid.server.txn.TxnTest </TxnTest>-->

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Thu Jun 19 02:01:59 2008
@@ -26,12 +26,16 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 
@@ -99,12 +103,16 @@
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 
-        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
         {
-            TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(),
+            TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
                                                                           _storeContext, null,
                                                                           new LinkedList<RequiredDeliveryException>()
             );
+            AMQQueue queue =
+                    AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()),
+                                                       null);
+
             for (int i = 0; i < messageCount; i++)
             {
                 long deliveryTag = i + 1;
@@ -138,8 +146,8 @@
                     }
                 };
 
-                TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
-                _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map));
+                TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+                _map.add(deliveryTag, queue.enqueue(new StoreContext(), message));
             }
             _acked = acked;
             _unacked = unacked;
@@ -154,7 +162,7 @@
         {
             for (long tag : tags)
             {
-                UnacknowledgedMessage u = _map.get(tag);
+                QueueEntry u = _map.get(tag);
                 assertTrue("Message not found for tag " + tag, u != null);
                 ((TestMessage) u.getMessage()).assertCountEquals(expected);
             }
@@ -175,7 +183,7 @@
             _op.consolidate();
             _op.undoPrepare();
 
-            assertCount(_acked, 1); //DTX Changed to 0, but that is wrong msg 5 is acked!
+            assertCount(_acked, 1);
             assertCount(_unacked, 0);
         }
 
@@ -195,34 +203,50 @@
         }
     }
 
+    private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+    {
+        final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+                                                                                                   null,
+                                                                                                   false);
+        try
+        {
+            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+                                                              publishBody,
+                                                              new ContentHeaderBody()
+            {
+                public int getSize()
+                {
+                    return 1;
+                }
+            });
+        }
+        catch (AMQException e)
+        {
+            // won't happen
+        }
+
+
+        return amqMessageHandle;
+    }
+
+
     private class TestMessage extends AMQMessage
     {
         private final long _tag;
         private int _count;
 
-        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+                throws AMQException
         {
-            super(messageId, publishBody, txnContext);
-            try
-            {
-                setContentHeaderBody(new ContentHeaderBody()
-                {
-                    public int getSize()
-                    {
-                        return 1;
-                    }
-                });
-            }
-            catch (AMQException e)
-            {
-                // won't happen
-            }
+            super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
             _tag = tag;
         }
 
-        public void incrementReference()
+
+        public boolean incrementReference()
         {
             _count++;
+            return true;
         }
 
         public void decrementReference(StoreContext context)

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Thu Jun 19 02:01:59 2008
@@ -24,18 +24,17 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.log4j.Logger;
 
 import java.util.*;
@@ -50,7 +49,7 @@
     /**
      * Not used in this test, just there to stub out the routing calls
      */
-    private MessageStore _store = new SkeletonMessageStore();
+    private MessageStore _store = new MemoryMessageStore();
 
     private StoreContext _storeContext = new StoreContext();
 
@@ -94,7 +93,11 @@
     protected void route(Message m) throws AMQException
     {
         m.route(exchange);
-        m.routingComplete(_store, _storeContext, _handleFactory);
+        m.getIncomingMessage().routingComplete(_store, _handleFactory);
+        if(m.getIncomingMessage().allContentReceived())
+        {
+            m.getIncomingMessage().deliverToQueues();
+        }
     }
 
     protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -122,12 +125,12 @@
             {
                 if (expected.contains(q))
                 {
-                    assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+                    assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
                     //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
                 }
                 else
                 {
-                    assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+                    assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
                     //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
                 }
             }
@@ -234,7 +237,7 @@
         return properties;
     }
 
-    static class TestQueue extends AMQQueue
+    static class TestQueue extends SimpleAMQQueue
     {
         final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
 
@@ -248,13 +251,167 @@
          * not invoked. It is unnecessary since for this test we only care to know whether the message was
          * sent to the queue; the queue processing logic is not being tested.
          * @param msg
-         * @param deliverFirst
          * @throws AMQException
          */
-        public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
+        @Override
+        public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+        {
+            messages.add( new HeadersExchangeTest.Message(msg));
+            return new QueueEntry()
+            {
+
+                public AMQQueue getQueue()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public AMQMessage getMessage()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public long getSize()
+                {
+                    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean getDeliveredToConsumer()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean expired() throws AMQException
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isAcquired()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean acquire()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean acquire(Subscription sub)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean delete()
+                {
+                    return false;
+                }
+
+                public boolean isDeleted()
+                {
+                    return false;
+                }
+
+                public boolean acquiredBySubscription()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void setDeliveredToSubscription()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void release()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public String debugIdentity()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean immediateAndNotDelivered()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void setRedelivered(boolean b)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public Subscription getDeliveredSubscription()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void reject()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void reject(Subscription subscription)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isRejectedBy(Subscription subscription)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void requeue(StoreContext storeContext) throws AMQException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void dispose(final StoreContext storeContext) throws MessageCleanupException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void restoreCredit()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isQueueDeleted()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void addStateChangeListener(StateChangeListener listener)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean removeStateChangeListener(StateChangeListener listener)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public int compareTo(final QueueEntry o)
+                {
+                    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+            };
+        }
+
+        boolean isInQueue(Message msg)
         {
-            messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
+            return messages.contains(msg);
         }
+
     }
 
     /**
@@ -262,10 +419,44 @@
      */
     static class Message extends AMQMessage
     {
-        private static MessageStore _messageStore = new MemoryMessageStore();
+        private class TestIncomingMessage extends IncomingMessage
+        {
+
+            public TestIncomingMessage(final long messageId,
+                                       final MessagePublishInfo info,
+                                       final TransactionalContext txnContext,
+                                       final AMQProtocolSession publisher)
+            {
+                super(messageId, info, txnContext, publisher);
+            }
+
+
+            public AMQMessage getUnderlyingMessage()
+            {
+                return Message.this;
+            }
+
+
+            public ContentHeaderBody getContentHeaderBody()
+            {
+                try
+                {
+                    return Message.this.getContentHeaderBody();
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        private IncomingMessage _incoming;
+
+        private static MessageStore _messageStore = new SkeletonMessageStore();
 
         private static StoreContext _storeContext = new StoreContext();
 
+
         private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
                                                                                       null,
                                                                          new LinkedList<RequiredDeliveryException>()
@@ -278,12 +469,47 @@
 
         Message(String id, FieldTable headers) throws AMQException
         {
-            this(getPublishRequest(id), getContentHeader(headers), null);
+            this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+        }
+
+        public IncomingMessage getIncomingMessage()
+        {
+            return _incoming;
+        }
+
+        private Message(long messageId,
+                        MessagePublishInfo publish,
+                        ContentHeaderBody header,
+                        List<ContentBody> bodies) throws AMQException
+        {
+            super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+
+
+            
+            _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+            _incoming.setContentHeaderBody(header);
+
+
         }
 
-        private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+        private static AMQMessageHandle createMessageHandle(final long messageId,
+                                                            final MessagePublishInfo publish,
+                                                            final ContentHeaderBody header)
         {
-            super(_messageStore.getNewMessageId(), publish, _txnContext, header);
+
+            final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+                                                                                                       _messageStore,
+                                                                                                       true);
+
+            try
+            {
+                amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+            }
+            catch (AMQException e)
+            {
+                
+            }
+            return amqMessageHandle;
         }
 
         private Message(AMQMessage msg) throws AMQException
@@ -291,15 +517,13 @@
             super(msg);
         }
 
+
+
         void route(Exchange exchange) throws AMQException
         {
-            exchange.route(this);
+            exchange.route(_incoming);
         }
 
-        boolean isInQueue(TestQueue queue)
-        {
-            return queue.messages.contains(this);
-        }
 
         public int hashCode()
         {

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Jun 19 02:01:59 2008
@@ -28,16 +28,15 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
 
-
 /**
  * Test class to test MBean operations for AMQMinaProtocolSession.
  */
@@ -56,13 +55,12 @@
         // check the channel count is correct
         int channelCount = _mbean.channels().size();
         assertTrue(channelCount == 1);
-        AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
                                                                    false,
                                                                    new AMQShortString("test"),
                                                                    true,
-                                                                   _protocolSession.getVirtualHost());
-        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
-	
+                                                                   _protocolSession.getVirtualHost(), null);
+        AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
         channel.setDefaultQueue(queue);
         _protocolSession.addChannel(channel);
         channelCount = _mbean.channels().size();