You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [12/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/...

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java Wed Feb 14 12:02:03 2007
@@ -65,29 +65,35 @@
         _connection.close();
     }
 
-    public void onMessage(Message response)
+    public synchronized void onMessage(Message response)
     {
+       
         System.out.println("Received " + (++_count) + " of " + _expected  + " responses.");
         if(_count == _expected)
         {
-            synchronized(this)
-            {
-                notifyAll();
-            }
+
+            notifyAll();
         }
+
+
     }
 
-    synchronized void waitUntilComplete() throws InterruptedException
+    synchronized void waitUntilComplete() throws Exception
     {
-        while(_count < _expected)
+
+        if(_count < _expected)
+        {
+            wait(10000L);
+        }
+        if(_count < _expected)
         {
-            wait();
+            throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);    
         }
     }
 
     static AMQConnection connect(String broker) throws Exception
     {
-        return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+        return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
     }
 
     public static void main(String[] argv) throws Exception

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java Wed Feb 14 12:02:03 2007
@@ -73,7 +73,7 @@
 
     static AMQConnection connect(String broker) throws Exception
     {
-        return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+        return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
     }
 
 //    public static void main(String[] argv) throws Exception

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java Wed Feb 14 12:02:03 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.test.unit.client.forwardall;
 
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
 
 /**
  * Queue that allows several private queues to be registered and bound
@@ -29,15 +30,19 @@
  */
 class SpecialQueue extends AMQQueue
 {
-    private final String name;
+    private final AMQShortString name;
 
     SpecialQueue(String name)
     {
+        this(new AMQShortString(name));
+    }
+    SpecialQueue(AMQShortString name)
+    {
         super(name, true);
         this.name = name;
     }
 
-    public String getRoutingKey()
+    public AMQShortString getRoutingKey()
     {
         return name;
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java Wed Feb 14 12:02:03 2007
@@ -54,7 +54,7 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path");
+        connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test");
         destination = new AMQQueue(randomize("LatencyTest"), true);
         session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
 
@@ -101,6 +101,7 @@
         }
         catch (Exception e)
         {
+            e.printStackTrace();
             fail("This Test should succeed but failed due to: " + e);
         }
         finally
@@ -236,7 +237,7 @@
 
     public void onMessage(Message message)
     {
-        received++;
+
         try
         {
             if (message instanceof ObjectMessage)
@@ -255,13 +256,11 @@
             items.add(e);
         }
 
-        if (waiting)
-        {
             synchronized(this)
             {
+                received++;
                 notify();
             }
-        }
     }
 
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.mina.common.IoSession;
 
 import junit.framework.TestCase;
@@ -45,7 +46,7 @@
             return (TestIoSession) _minaProtocolSession;
         }
 
-        public String genQueueName()
+        public AMQShortString genQueueName()
         {
             return generateQueueName();
         }
@@ -80,26 +81,26 @@
 
     public void testGenerateQueueName()
     {
-        String testAddress;
+        AMQShortString testAddress;
 
-        //test address with / and ; chars which generateQueueName should remove
+        //test address with / and ; chars which generateQueueName should remove
         _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
         _testSession.getMinaProtocolSession().setLocalPort(_port);
 
         testAddress = _testSession.genQueueName();
-        assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
+        assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
 
         //test empty address
         _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
 
         testAddress = _testSession.genQueueName();
-        assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
+        assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
 
         //test address with no special chars
         _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
 
         testAddress = _testSession.genQueueName();
-        assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
+        assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
 
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Wed Feb 14 12:02:03 2007
@@ -28,7 +28,7 @@
     protected Connection createConnection() throws AMQException, URLSyntaxException
     {
         return new AMQConnection(_broker, "guest", "guest",
-                                                      "fred", "/test");
+                                                      "fred", "test");
     }
 
     public void testTempoaryQueue() throws Exception

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java Wed Feb 14 12:02:03 2007
@@ -51,7 +51,7 @@
 
     public void testAllMethodsThrowAfterConnectionClose() throws Exception
     {
-        AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path");
+        AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test");
 
         Topic destination1 = new AMQTopic("t1");
         TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java Wed Feb 14 12:02:03 2007
@@ -11,6 +11,7 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.*;
 
@@ -39,12 +40,12 @@
 
     public void testJMSDestination() throws Exception
     {
-        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = new AMQQueue("someQ", "someQ", false, true);
+        Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
 
-        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
         Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(queue);
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Wed Feb 14 12:02:03 2007
@@ -1,11 +1,23 @@
-/**
- * User: Robert Greig
- * Date: 12-Dec-2006
- ******************************************************************************
- * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
- * this program may be photocopied reproduced or translated to another
- * program language without prior written consent of JP Morgan Chase Ltd
- ******************************************************************************/
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.test.unit.message;
 
 import junit.framework.TestCase;
@@ -47,7 +59,7 @@
 
     public void testStreamMessageEOF() throws Exception
     {
-        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
@@ -60,7 +72,7 @@
         //force synch to ensure the consumer has resulted in a bound queue
         ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
 
-        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
 
         AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
@@ -101,7 +113,7 @@
 
     public void testModifyReceivedMessageExpandsBuffer() throws Exception
     {
-        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         AMQQueue queue = new AMQQueue("testQ");
         MessageConsumer consumer = consumerSession.createConsumer(queue);
@@ -123,7 +135,7 @@
                 }
             }
         });
-        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
         AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         MessageProducer mandatoryProducer = producerSession.createProducer(queue);
         con.start();

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Feb 14 12:02:03 2007
@@ -25,7 +25,7 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -39,10 +39,23 @@
 
 public class DurableSubscriptionTest extends TestCase
 {
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
     public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
     {
         AMQTopic topic = new AMQTopic("MyTopic");
-        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
         Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         MessageConsumer consumer1 = session1.createConsumer(topic);
         MessageProducer producer = session1.createProducer(topic);
@@ -83,7 +96,7 @@
     public void testDurability() throws AMQException, JMSException, URLSyntaxException
     {
         AMQTopic topic = new AMQTopic("MyTopic");
-        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
         Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         MessageConsumer consumer1 = session1.createConsumer(topic);
         MessageProducer producer = session1.createProducer(topic);
@@ -128,6 +141,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(DurableSubscriptionTest.class));
+        return new junit.framework.TestSuite(DurableSubscriptionTest.class);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java Wed Feb 14 12:02:03 2007
@@ -48,7 +48,7 @@
     public void testUnidentifiedProducer() throws Exception
     {
         AMQTopic topic = new AMQTopic("MyTopic");
-        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(null);
         MessageConsumer consumer1 = session1.createConsumer(topic);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Wed Feb 14 12:02:03 2007
@@ -51,7 +51,7 @@
     public void testTopicSubscriptionUnsubscription() throws Exception
     {
         AMQTopic topic = new AMQTopic("MyTopic");
-        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
         TopicPublisher publisher = session1.createPublisher(topic);
@@ -97,7 +97,7 @@
     {
         AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
         AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
-        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
         TopicPublisher publisher = session1.createPublisher(null);
@@ -112,7 +112,7 @@
         {
             session1.close();
             con.close();
-            con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+            con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
             con.start();
             session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
             publisher = session1.createPublisher(null);
@@ -134,11 +134,11 @@
     public void testUnsubscriptionAfterConnectionClose() throws Exception
     {
         AMQTopic topic = new AMQTopic("MyTopic3");
-        AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
 
-        AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+        AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
         TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
 
@@ -149,7 +149,7 @@
         assertNotNull(tm);
         con2.close();
         publisher.publish(session1.createTextMessage("Hello2"));
-        con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+        con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
         session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         sub = session2.createDurableSubscriber(topic, "subscription0");
         con2.start();
@@ -163,14 +163,14 @@
     public void testTextMessageCreation() throws Exception
     {
         AMQTopic topic = new AMQTopic("MyTopic4");
-        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
         MessageConsumer consumer1 = session1.createConsumer(topic);
         con.start();
         TextMessage tm = session1.createTextMessage("Hello");
         publisher.publish(tm);
-        tm = (TextMessage) consumer1.receive(2000);
+        tm = (TextMessage) consumer1.receive(200000L);
         assertNotNull(tm);
         String msgText = tm.getText();
         assertEquals("Hello", msgText);
@@ -178,7 +178,7 @@
         msgText = tm.getText();
         assertNull(msgText);
         publisher.publish(tm);
-        tm = (TextMessage) consumer1.receive(2000);
+        tm = (TextMessage) consumer1.receive(20000000L);
         assertNotNull(tm);
         msgText = tm.getText();
         assertNull(msgText);
@@ -202,7 +202,7 @@
 
     public void testSendingSameMessage() throws Exception
     {
-        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);
@@ -219,12 +219,13 @@
         assertNotNull(receivedMessage);
         assertEquals(sentMessage.getText(),receivedMessage.getText());
 
+        conn.close();
 
     }
 
     public void testTemporaryTopic() throws Exception
     {
-        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Wed Feb 14 12:02:03 2007
@@ -23,8 +23,11 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
 import org.apache.log4j.Logger;
 
 import javax.jms.*;
@@ -54,10 +57,11 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        queue1 = new AMQQueue("Q1", "Q1", false, true);
+        TransportConnection.createVMBroker(1);
+        queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
         queue2 = new AMQQueue("Q2", false);
 
-        con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test");
+        con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
         session = con.createSession(true, 0);
         consumer1 = session.createConsumer(queue1);
         //Dummy just to create the queue. 
@@ -66,16 +70,26 @@
         producer2 = session.createProducer(queue2);
         con.start();
 
-        prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test");
+        prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
         prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         prepProducer1 = prepSession.createProducer(queue1);
         prepCon.start();
+
+//         //add some messages
+//         prepProducer1.send(prepSession.createTextMessage("A"));
+//         prepProducer1.send(prepSession.createTextMessage("B"));
+//         prepProducer1.send(prepSession.createTextMessage("C"));
+// 
+//         testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+//         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+//         testConsumer2 = testSession.createConsumer(queue2);
     }
 
     protected void tearDown() throws Exception
     {
         con.close();
         prepCon.close();
+        TransportConnection.killAllVMBrokers();
         super.tearDown();
     }
 
@@ -96,9 +110,9 @@
 
         //commit
         session.commit();
-
+        testCon.start();
         //ensure sent messages can be received and received messages are gone
-        testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+        testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         testConsumer1 = testSession.createConsumer(queue1);
         testConsumer2 = testSession.createConsumer(queue2);
@@ -108,6 +122,7 @@
         expect("Y", testConsumer2.receive(1000));
         expect("Z", testConsumer2.receive(1000));
 
+        testConsumer1 = testSession.createConsumer(queue1);
         assertTrue(null == testConsumer1.receive(1000));
         assertTrue(null == testConsumer2.receive(1000));
         testCon.close();
@@ -141,11 +156,12 @@
         expect("A", consumer1.receive(1000));
         expect("B", consumer1.receive(1000));
         expect("C", consumer1.receive(1000));
-
+        testCon.start();
+        testConsumer1 = testSession.createConsumer(queue1);
         //commit
         session.commit();
 
-        testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+        testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         testConsumer1 = testSession.createConsumer(queue1);
         testConsumer2 = testSession.createConsumer(queue2);
@@ -164,7 +180,7 @@
 
     public void testResendsMsgsAfterSessionClose() throws Exception
     {
-        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
 
         Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
         AMQQueue queue3 = new AMQQueue("Q3", false);
@@ -172,7 +188,7 @@
         //force synch to ensure the consumer has resulted in a bound queue
         ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
         Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(queue3);
 
@@ -234,6 +250,7 @@
 
         con.close();
         con2.close();
+
     }
     
     // This checks that queue Q1 is in fact empty and does not have any stray
@@ -251,6 +268,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+        return new junit.framework.TestSuite(TransactedTest.class);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Wed Feb 14 12:02:03 2007
@@ -32,6 +32,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.framing.ConnectionSecureBody;
@@ -128,9 +129,9 @@
 
     class ConnectionTuneHandler extends ConnectionTuneMethodHandler
     {
-        protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist)
+        protected AMQMethodBody createConnectionOpenMethodBody(AMQProtocolSession protocolSession, AMQShortString path, AMQShortString capabilities, boolean insist)
         {
-            return super.createConnectionOpenMethodBody(path, ClusterCapability.add(capabilities, _identity), insist);
+            return super.createConnectionOpenMethodBody(protocolSession, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java Wed Feb 14 12:02:03 2007
@@ -46,8 +46,8 @@
 
     ServerHandlerRegistry getHandlerRegistry()
     {
-        // TODO - FIX THIS!
-        return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
+        // TODO - FIX THIS! These cannot be null to work correctly
+        return new ServerHandlerRegistry(getHandlerFactory(), null, null);
     }
 
     private MethodHandlerFactory getHandlerFactory()

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.cluster;
 
+import org.apache.qpid.framing.AMQShortString;
+
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -28,22 +30,22 @@
     public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
     public static final String PEER = "cluster_peer";
 
-    public static String add(String original, MemberHandle identity)
+    public static AMQShortString add(AMQShortString original, MemberHandle identity)
     {
-        return original == null ? peer(identity) : original + " " + peer(identity);
+        return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity));
     }
 
-    private static String peer(MemberHandle identity)
+    private static AMQShortString peer(MemberHandle identity)
     {
-        return PEER + "=" + identity.getDetails();
+        return new AMQShortString(PEER + "=" + identity.getDetails());
     }
 
-    public static boolean contains(String in)
+    public static boolean contains(AMQShortString in)
     {
-        return in != null && in.contains(in);
+        return in != null; // && in.contains(in);
     }
 
-    public static MemberHandle getPeer(String in)
+    public static MemberHandle getPeer(AMQShortString in)
     {
         Matcher matcher = Pattern.compile(PATTERN).matcher(in);
         if (matcher.matches())

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -38,6 +38,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.net.InetSocketAddress;
 
@@ -55,13 +56,8 @@
     }
 
     public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
-    {
-        this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
-    }
-
-    public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
-    {
-        super(queueRegistry, exchangeRegistry);
+    {        
+        super(registry);
         ClusterBuilder builder = new ClusterBuilder(address);
         _groupMgr = builder.getGroupManager();
         _handlers = builder.getHandlerRegistry();
@@ -74,9 +70,9 @@
         _handlers = handler._handlers;
     }
 
-    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
+    protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
     {
-        new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
+        new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession));
     }
 
     void connect(String join) throws Exception
@@ -176,12 +172,12 @@
 
     private boolean isMembershipAnnouncement(Object msg)
     {
-        return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+        return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
     }
 
     private boolean isBufferable(Object msg)
     {
-        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
     }
 
     private boolean isBuffereable(AMQBody body)

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -24,7 +24,10 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -32,6 +35,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
 
@@ -39,10 +43,12 @@
 {
     private MemberHandle _peer;
 
-    public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
-            throws AMQException
+    public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+//    public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
+//        ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
     {
-        super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+        super(session, virtualHostRegistry, codecFactory, stateManager);
+//        super(session, queueRegistry, exchangeRegistry, codecFactory);
     }
 
     public boolean isPeerSession()
@@ -65,7 +71,8 @@
         AMQChannel channel = super.getChannel(channelId);
         if (isPeerSession() && channel == null)
         {
-            channel = new OneUseChannel(channelId, this, getStateManager());
+            VirtualHost virtualHost = getVirtualHost();
+            channel = new OneUseChannel(channelId, this, virtualHost.getMessageStore(),virtualHost.getExchangeRegistry(), getStateManager());
             addChannel(channel);
         }
         return channel;
@@ -101,19 +108,12 @@
      */
     private class OneUseChannel extends AMQChannel
     {
-        public OneUseChannel(int channelId, AMQProtocolSession session,
-            AMQMethodListener methodListener)
-        {
-            this(channelId, session, ApplicationRegistry.getInstance(), methodListener);
-        }
-
-        public OneUseChannel(int channelId, AMQProtocolSession session, IApplicationRegistry registry,
-                             AMQMethodListener methodListener)
+        public OneUseChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
         {
             super(channelId,
                   session,
-                  registry.getMessageStore(),
-                  registry.getExchangeRegistry(),
+                  messageStore,
+                  exchanges,
                   methodListener);
         }
 

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Wed Feb 14 12:02:03 2007
@@ -22,16 +22,10 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.cluster.policy.StandardPolicies;
 import org.apache.qpid.server.cluster.replay.ReplayManager;
 import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
 
 import java.util.List;
 
@@ -96,7 +90,7 @@
         Broker destination = findBroker(broker);
         if(destination == null)
         {
-            _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));            
+            _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
         }
         else
         {
@@ -114,12 +108,15 @@
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        ClusterPingBody ping = new ClusterPingBody((byte)0, (byte)9);
-        ping.broker = _group.getLocal().getDetails();
-        ping.responseRequired = true;
-        ping.load = _loadTable.getLocalLoad();
+        ClusterPingBody ping = new ClusterPingBody((byte)0,
+                                                   (byte)9,
+                                                   ClusterPingBody.getClazz((byte)0, (byte)9),
+                                                   ClusterPingBody.getMethod((byte)0, (byte)9),
+                                                   _group.getLocal().getDetails(),
+                                                   _loadTable.getLocalLoad(),
+                                                   true);
         BlockingHandler handler = new BlockingHandler();
-        send(getLeader(), new SimpleSendable(ping), handler);
+        send(getLeader(), new SimpleBodySendable(ping), handler);
         handler.waitForCompletion();
         if (handler.failed())
         {
@@ -162,9 +159,13 @@
         _logger.info(new LogMessage("Connected to {0}. joining", leader));
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        ClusterJoinBody join = new ClusterJoinBody((byte)0, (byte)9);
-        join.broker = _group.getLocal().getDetails();
-        send(leader, new SimpleSendable(join));
+        ClusterJoinBody join = new ClusterJoinBody((byte)0,
+                                                   (byte)9,
+                                                   ClusterJoinBody.getClazz((byte)0, (byte)9),
+                                                   ClusterJoinBody.getMethod((byte)0, (byte)9),
+                                                   _group.getLocal().getDetails());
+
+        send(leader, new SimpleBodySendable(join));
     }
 
     private Broker connectToLeader(MemberHandle member) throws AMQException
@@ -183,9 +184,13 @@
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        ClusterLeaveBody leave = new ClusterLeaveBody((byte)0, (byte)9);
-        leave.broker = _group.getLocal().getDetails();
-        send(getLeader(), new SimpleSendable(leave));
+        ClusterLeaveBody leave = new ClusterLeaveBody((byte)0,
+                                                      (byte)9,
+                                                      ClusterLeaveBody.getClazz((byte)0, (byte)9),
+                                                      ClusterLeaveBody.getMethod((byte)0, (byte)9),
+                                                      _group.getLocal().getDetails());
+
+        send(getLeader(), new SimpleBodySendable(leave));
     }
 
     private void suspect(MemberHandle broker) throws AMQException
@@ -206,9 +211,13 @@
         {
             // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            ClusterSuspectBody suspect = new ClusterSuspectBody((byte)0, (byte)9);
-            suspect.broker = broker.getDetails();
-            send(getLeader(), new SimpleSendable(suspect));
+            ClusterSuspectBody suspect = new ClusterSuspectBody((byte)0,
+                                                                (byte)9,
+                                                                ClusterSuspectBody.getClazz((byte)0, (byte)9),
+                                                                ClusterSuspectBody.getMethod((byte)0, (byte)9),
+                                                                broker.getDetails());
+
+            send(getLeader(), new SimpleBodySendable(suspect));
         }
     }
 
@@ -230,10 +239,13 @@
             //pass request on to leader:
             // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            ClusterJoinBody request = new ClusterJoinBody((byte)0, (byte)9);
-            request.broker = member.getDetails();
+            ClusterJoinBody request = new ClusterJoinBody((byte)0, (byte)9,
+                                                          ClusterJoinBody.getClazz((byte)0, (byte)9),
+                                                          ClusterJoinBody.getMethod((byte)0, (byte)9),
+                                                          member.getDetails());
+            
             Broker leader = getLeader();
-            send(leader, new SimpleSendable(request));
+            send(leader, new SimpleBodySendable(request));
             _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
         }
     }
@@ -277,9 +289,12 @@
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        ClusterMembershipBody announce = new ClusterMembershipBody((byte)0, (byte)9);
-        //TODO: revise this way of converting String to bytes...
-        announce.members = membership.getBytes();
+        ClusterMembershipBody announce = new ClusterMembershipBody((byte)0, (byte)9,
+                                                                   ClusterMembershipBody.getClazz((byte)0, (byte)9),
+                                                                   ClusterMembershipBody.getMethod((byte)0, (byte)9),
+                                                                   membership.getBytes());
+
+        
         return announce;
     }
 
@@ -287,7 +302,7 @@
     {
         String membership = SimpleMemberHandle.membersToString(_group.getMembers());
         ClusterMembershipBody announce = createAnnouncement(membership);
-        broadcast(new SimpleSendable(announce));
+        broadcast(new SimpleBodySendable(announce));
         _logger.info(new LogMessage("Membership announcement sent: {0}", membership));
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java Wed Feb 14 12:02:03 2007
@@ -83,7 +83,7 @@
             // implementation provided by MINA
             if (connectorConfig.enableExecutorPool)
             {
-                sconfig.setThreadModel(new ReadWriteThreadModel());
+                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
             }
 
             String host = InetAddress.getLocalHost().getHostName();

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.cluster;
 
+import org.apache.qpid.framing.AMQShortString;
+
 public interface MemberHandle
 {
     public String getHost();
@@ -30,5 +32,5 @@
 
     public boolean matches(String host, int port);
 
-    public String getDetails();
+    public AMQShortString getDetails();
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Wed Feb 14 12:02:03 2007
@@ -65,7 +65,6 @@
     {
         super(host, port);
         _local = local;
-        // TODO - FIX THIS
         _legacyHandler = new ClientHandlerRegistry(local, null);
     }
 
@@ -183,7 +182,7 @@
         {
             //signal redirection to waiting thread
             ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
-            String[] parts = redirect.host.split(":");
+            String[] parts = redirect.host.toString().split(":");
             _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
         }
         else
@@ -203,15 +202,15 @@
 
     private void handleFrame(AMQFrame frame) throws AMQException
     {
-        AMQBody body = frame.bodyFrame;
+        AMQBody body = frame.getBodyFrame();
         if (body instanceof AMQRequestBody)
         {
-            handleMethod(frame.channel, ((AMQRequestBody)body).getMethodPayload(),
+            handleMethod(frame.getChannel(), ((AMQRequestBody)body).getMethodPayload(),
                 ((AMQRequestBody)body).getRequestId());
         }
         else if (body instanceof AMQResponseBody)
         {
-            handleMethod(frame.channel, ((AMQResponseBody)body).getMethodPayload(),
+            handleMethod(frame.getChannel(), ((AMQResponseBody)body).getMethodPayload(),
                 ((AMQRequestBody)body).getRequestId());
         }
         else

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Wed Feb 14 12:02:03 2007
@@ -28,8 +28,8 @@
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.cluster.util.LogMessage;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,23 +43,20 @@
     private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
     private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
 
-    ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
-        AMQProtocolSession protocolSession)
+    ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
+        super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession);
     }
 
-    ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
-        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+    ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
     {
-        this(queueRegistry, exchangeRegistry, protocolSession);
+        this(virtualHostRegistry, protocolSession);
         _handlers.putAll(s._handlers);
     }
 
-    ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
-        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+    ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
     {
-        this(queueRegistry, exchangeRegistry, protocolSession);
+        this(virtualHostRegistry, protocolSession);
         init(factory);
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.cluster;
 
+import org.apache.qpid.framing.AMQShortString;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -37,6 +39,11 @@
         _port = port;
     }
 
+    public SimpleMemberHandle(AMQShortString details)
+    {
+        this(details.toString());
+    }
+
     public SimpleMemberHandle(String details)
     {
         String[] parts = details.split(":");
@@ -84,14 +91,14 @@
         return _host.equals(host) && _port == port;
     }
 
-    public String getDetails()
+    public AMQShortString getDetails()
     {
-        return _host + ":" + _port;
+        return new AMQShortString(_host + ":" + _port);
     }
 
     public String toString()
     {
-        return getDetails();
+        return getDetails().toString();
     }
 
     static List<MemberHandle> stringToMembers(String membership)

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java Wed Feb 14 12:02:03 2007
@@ -21,36 +21,24 @@
 package org.apache.qpid.server.cluster;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.Iterator;
 
 public class SimpleSendable implements Sendable
 {
-    private final List<AMQBody> _bodies;
+    private final AMQMessage _message;
 
-    public SimpleSendable(AMQBody body)
+    public SimpleSendable(AMQMessage message)
     {
-        this(Arrays.asList(body));
-    }
-
-    public SimpleSendable(List<AMQBody> bodies)
-    {
-        _bodies = bodies;
+        _message = message;
     }
 
     public void send(int channel, Member member) throws AMQException
     {
-        for (AMQBody body : _bodies)
-        {
-            member.send(new AMQFrame(channel, body));
-        }
-    }
-
-    public String toString()
-    {
-        return _bodies.toString();
+        // TODO: If refs are used, this will not work!
+        member.send(new AMQFrame(channel, _message.getTransferBody()));
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 
@@ -54,19 +55,19 @@
         }
     }
 
-    protected final void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
         for(ClusterMethodHandler<A> handler : _handlers)
         {
-            handler.peer(session, evt);
+            handler.peer(stateMgr, evt);
         }
     }
 
-    protected final void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    protected final void client(AMQStateManager stateMgr,  AMQMethodEvent<A> evt) throws AMQException
     {
         for(ClusterMethodHandler<A> handler : _handlers)
         {
-            handler.client(session, evt);
+            handler.client(stateMgr, evt);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java Wed Feb 14 12:02:03 2007
@@ -27,8 +27,10 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueBindBody;
 import org.apache.qpid.framing.QueueDeleteBody;
@@ -46,7 +48,7 @@
 class ChannelQueueManager
 {
     private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
-    private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>();
+    private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>();
 
     ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
     {
@@ -68,37 +70,37 @@
         return new MessageConsumeHandler();
     }
 
-    private void set(int channel, String queue)
+    private void set(int channel, AMQShortString queue)
     {
         _channelQueues.put(channel, queue);
         _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
     }
 
-    private String get(int channel)
+    private AMQShortString get(int channel)
     {
-        String queue = _channelQueues.get(channel);
+        AMQShortString queue = _channelQueues.get(channel);
         _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
         return queue;
     }
 
     private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
     {
-        protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+        protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
         {
         }
 
-        protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+        protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
         {
             set(evt.getChannelId(), evt.getMethod().queue);
         }
     }
     private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
     {
-        protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+        protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
         {
         }
 
-        protected void client(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+        protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
         {
             if(evt.getMethod().queue == null)
             {
@@ -108,11 +110,11 @@
     }
     private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
     {
-        protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+        protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
         {
         }
 
-        protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+        protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
         {
             if(evt.getMethod().queue == null)
             {
@@ -123,11 +125,11 @@
 
     private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody>
     {
-        protected void peer(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+        protected void peer(AMQStateManager stateMgr,  AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
         {
         }
 
-        protected void client(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+        protected void client(AMQStateManager stateMgr, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
         {
             if(evt.getMethod().queue == null)
             {

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -32,18 +32,20 @@
 
 public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
 {
-    public final void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
+        AMQProtocolSession session = stateMgr.getProtocolSession();
+
         if (ClusteredProtocolSession.isPeerSession(session))
         {
-            peer(session, evt);
+            peer(stateMgr, evt);
         }
         else
         {
-            client(session, evt);
+            client(stateMgr,  evt);
         }
     }
 
-    protected abstract void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
-    protected abstract void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+    protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
+    protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.MessageTransferBody;
@@ -162,17 +163,15 @@
 
     private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
     {
-        public void methodReceived(AMQProtocolSession session,
-                                   AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
         {
-            _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
+            _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession()));
         }
     }
 
     private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
     {
-        public void methodReceived(AMQProtocolSession session,
-                                   AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
         {
             _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
         }
@@ -180,8 +179,7 @@
 
     private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
     {
-        public void methodReceived(AMQProtocolSession protocolSession,
-                                   AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
         {
             _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
         }
@@ -189,8 +187,7 @@
 
     private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
     {
-        public void methodReceived(AMQProtocolSession protocolSession,
-                                   AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
         {
             _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
         }
@@ -198,8 +195,7 @@
 
     private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
     {
-        public void methodReceived(AMQProtocolSession session,
-                                   AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
         {
             ClusterMembershipBody body = evt.getMethod();
             _groupMgr.handleMembershipAnnouncement(new String(body.members));
@@ -208,15 +204,14 @@
 
     private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
     {
-        public void methodReceived(AMQProtocolSession session,
-                                   AMQMethodEvent<ClusterPingBody> evt) throws AMQException
+        public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException
         {
             MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
             _groupMgr.handlePing(peer, evt.getMethod().load);
             if (evt.getMethod().responseRequired)
             {
                 evt.getMethod().load = _loadTable.getLocalLoad();
-                session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
+                stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
             }
         }
     }
@@ -228,12 +223,12 @@
             super(ConnectionOpenMethodHandler.getInstance());
         }
 
-        void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
+        void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt)
         {
-            String capabilities = evt.getMethod().capabilities;
+            AMQShortString capabilities = evt.getMethod().capabilities;
             if (ClusterCapability.contains(capabilities))
             {
-                ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
+                ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities));
             }
             else
             {
@@ -249,9 +244,9 @@
             super(ConnectionCloseMethodHandler.getInstance());
         }
 
-        void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
+        void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt)
         {
-            if (!ClusteredProtocolSession.isPeerSession(session))
+            if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession()))
             {
                 _loadTable.decrementLocalLoad();
             }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java Wed Feb 14 12:02:03 2007
@@ -38,18 +38,18 @@
         _base = base;
     }
 
-    public void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateMgr,  AMQMethodEvent<A> evt) throws AMQException
     {
-        preHandle(session, evt);
-        _base.methodReceived(session, evt);
-        postHandle(session, evt);
+        preHandle(stateMgr, evt);
+        _base.methodReceived(stateMgr, evt);
+        postHandle(stateMgr, evt);
     }
 
-    void preHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
     }
 
-    void postHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.cluster.ClusteredProtocolSession;
 import org.apache.qpid.server.cluster.GroupManager;
 import org.apache.qpid.server.cluster.util.LogMessage;
@@ -34,6 +35,7 @@
 import org.apache.qpid.server.queue.PrivateQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.RemoteQueueProxy;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class LocalQueueDeclareHandler extends QueueDeclareHandler
 {
@@ -45,12 +47,12 @@
         _groupMgr = groupMgr;
     }
 
-    protected String createName()
+    protected AMQShortString createName()
     {
-        return super.createName() + "@" + _groupMgr.getLocal().getDetails();
+        return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
     }
 
-    protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
+    protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException
     {
         //is it private or shared:
         if (body.exclusive)
@@ -60,18 +62,18 @@
                 //need to get peer from the session...
                 MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
                 _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
-                return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry);
+                return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost);
             }
             else
             {
                 _logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
-                return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
+                return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost);
             }
         }
         else
         {
             _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
-            return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
+            return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java Wed Feb 14 12:02:03 2007
@@ -31,7 +31,7 @@
 
 public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
 {
-    public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
     {
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java Wed Feb 14 12:02:03 2007
@@ -47,14 +47,14 @@
         _client = client;
     }
 
-    protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
-        _peer.methodReceived(session, evt);
+        _peer.methodReceived(stateMgr, evt);
     }
 
-    protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+    protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
     {
-        _client.methodReceived(session, evt);
+        _client.methodReceived(stateMgr, evt);
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java Wed Feb 14 12:02:03 2007
@@ -41,11 +41,11 @@
         _handler = handler;
     }
 
-    protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+    protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
     {
     }
 
-    protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+    protected void client(AMQStateManager stateMgr,  AMQMethodEvent<QueueDeclareBody> evt)
             throws AMQException
     {
         setName(evt.getMethod());//need to set the name before propagating this method

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java Wed Feb 14 12:02:03 2007
@@ -32,15 +32,21 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class RemoteCancelHandler implements StateAwareMethodListener<MessageCancelBody>
 {
     private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
 
-    public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
         //By convention, consumers setup between brokers use the queue name as the consumer tag:
-        AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().getDestination());
+        AMQQueue queue = queueRegistry.getQueue(evt.getMethod().getDestination());
         if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));