You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [12/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/...
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java Wed Feb 14 12:02:03 2007
@@ -65,29 +65,35 @@
_connection.close();
}
- public void onMessage(Message response)
+ public synchronized void onMessage(Message response)
{
+
System.out.println("Received " + (++_count) + " of " + _expected + " responses.");
if(_count == _expected)
{
- synchronized(this)
- {
- notifyAll();
- }
+
+ notifyAll();
}
+
+
}
- synchronized void waitUntilComplete() throws InterruptedException
+ synchronized void waitUntilComplete() throws Exception
{
- while(_count < _expected)
+
+ if(_count < _expected)
+ {
+ wait(10000L);
+ }
+ if(_count < _expected)
{
- wait();
+ throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
}
}
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
public static void main(String[] argv) throws Exception
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java Wed Feb 14 12:02:03 2007
@@ -73,7 +73,7 @@
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
// public static void main(String[] argv) throws Exception
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java Wed Feb 14 12:02:03 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
/**
* Queue that allows several private queues to be registered and bound
@@ -29,15 +30,19 @@
*/
class SpecialQueue extends AMQQueue
{
- private final String name;
+ private final AMQShortString name;
SpecialQueue(String name)
{
+ this(new AMQShortString(name));
+ }
+ SpecialQueue(AMQShortString name)
+ {
super(name, true);
this.name = name;
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return name;
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java Wed Feb 14 12:02:03 2007
@@ -54,7 +54,7 @@
protected void setUp() throws Exception
{
super.setUp();
- connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path");
+ connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test");
destination = new AMQQueue(randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -101,6 +101,7 @@
}
catch (Exception e)
{
+ e.printStackTrace();
fail("This Test should succeed but failed due to: " + e);
}
finally
@@ -236,7 +237,7 @@
public void onMessage(Message message)
{
- received++;
+
try
{
if (message instanceof ObjectMessage)
@@ -255,13 +256,11 @@
items.add(e);
}
- if (waiting)
- {
synchronized(this)
{
+ received++;
notify();
}
- }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.mina.common.IoSession;
import junit.framework.TestCase;
@@ -45,7 +46,7 @@
return (TestIoSession) _minaProtocolSession;
}
- public String genQueueName()
+ public AMQShortString genQueueName()
{
return generateQueueName();
}
@@ -80,26 +81,26 @@
public void testGenerateQueueName()
{
- String testAddress;
+ AMQShortString testAddress;
- //test address with / and ; chars which generateQueueName should remove
+ //test address with / and ; chars which generateQueueName should removeKey
_testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
_testSession.getMinaProtocolSession().setLocalPort(_port);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
_testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
+ assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
_testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Wed Feb 14 12:02:03 2007
@@ -28,7 +28,7 @@
protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java Wed Feb 14 12:02:03 2007
@@ -51,7 +51,7 @@
public void testAllMethodsThrowAfterConnectionClose() throws Exception
{
- AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path");
+ AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test");
Topic destination1 = new AMQTopic("t1");
TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java Wed Feb 14 12:02:03 2007
@@ -11,6 +11,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -39,12 +40,12 @@
public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Wed Feb 14 12:02:03 2007
@@ -1,11 +1,23 @@
-/**
- * User: Robert Greig
- * Date: 12-Dec-2006
- ******************************************************************************
- * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
- * this program may be photocopied reproduced or translated to another
- * program language without prior written consent of JP Morgan Chase Ltd
- ******************************************************************************/
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.unit.message;
import junit.framework.TestCase;
@@ -47,7 +59,7 @@
public void testStreamMessageEOF() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -60,7 +72,7 @@
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -101,7 +113,7 @@
public void testModifyReceivedMessageExpandsBuffer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("testQ");
MessageConsumer consumer = consumerSession.createConsumer(queue);
@@ -123,7 +135,7 @@
}
}
});
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
con.start();
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Feb 14 12:02:03 2007
@@ -25,7 +25,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -39,10 +39,23 @@
public class DurableSubscriptionTest extends TestCase
{
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -83,7 +96,7 @@
public void testDurability() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -128,6 +141,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(DurableSubscriptionTest.class));
+ return new junit.framework.TestSuite(DurableSubscriptionTest.class);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java Wed Feb 14 12:02:03 2007
@@ -48,7 +48,7 @@
public void testUnidentifiedProducer() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(null);
MessageConsumer consumer1 = session1.createConsumer(topic);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Wed Feb 14 12:02:03 2007
@@ -51,7 +51,7 @@
public void testTopicSubscriptionUnsubscription() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -97,7 +97,7 @@
{
AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -112,7 +112,7 @@
{
session1.close();
con.close();
- con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
con.start();
session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
@@ -134,11 +134,11 @@
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic3");
- AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
- AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
@@ -149,7 +149,7 @@
assertNotNull(tm);
con2.close();
publisher.publish(session1.createTextMessage("Hello2"));
- con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
@@ -163,14 +163,14 @@
public void testTextMessageCreation() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic4");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
con.start();
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(200000L);
assertNotNull(tm);
String msgText = tm.getText();
assertEquals("Hello", msgText);
@@ -178,7 +178,7 @@
msgText = tm.getText();
assertNull(msgText);
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(20000000L);
assertNotNull(tm);
msgText = tm.getText();
assertNull(msgText);
@@ -202,7 +202,7 @@
public void testSendingSameMessage() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
@@ -219,12 +219,13 @@
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(),receivedMessage.getText());
+ conn.close();
}
public void testTemporaryTopic() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Wed Feb 14 12:02:03 2007
@@ -23,8 +23,11 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
import org.apache.log4j.Logger;
import javax.jms.*;
@@ -54,10 +57,11 @@
protected void setUp() throws Exception
{
super.setUp();
- queue1 = new AMQQueue("Q1", "Q1", false, true);
+ TransportConnection.createVMBroker(1);
+ queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
queue2 = new AMQQueue("Q2", false);
- con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
session = con.createSession(true, 0);
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
@@ -66,16 +70,26 @@
producer2 = session.createProducer(queue2);
con.start();
- prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test");
+ prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
+
+// //add some messages
+// prepProducer1.send(prepSession.createTextMessage("A"));
+// prepProducer1.send(prepSession.createTextMessage("B"));
+// prepProducer1.send(prepSession.createTextMessage("C"));
+//
+// testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+// testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+// testConsumer2 = testSession.createConsumer(queue2);
}
protected void tearDown() throws Exception
{
con.close();
prepCon.close();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
@@ -96,9 +110,9 @@
//commit
session.commit();
-
+ testCon.start();
//ensure sent messages can be received and received messages are gone
- testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -108,6 +122,7 @@
expect("Y", testConsumer2.receive(1000));
expect("Z", testConsumer2.receive(1000));
+ testConsumer1 = testSession.createConsumer(queue1);
assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
testCon.close();
@@ -141,11 +156,12 @@
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
-
+ testCon.start();
+ testConsumer1 = testSession.createConsumer(queue1);
//commit
session.commit();
- testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -164,7 +180,7 @@
public void testResendsMsgsAfterSessionClose() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue3 = new AMQQueue("Q3", false);
@@ -172,7 +188,7 @@
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue3);
@@ -234,6 +250,7 @@
con.close();
con2.close();
+
}
// This checks that queue Q1 is in fact empty and does not have any stray
@@ -251,6 +268,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+ return new junit.framework.TestSuite(TransactedTest.class);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Wed Feb 14 12:02:03 2007
@@ -32,6 +32,7 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
@@ -128,9 +129,9 @@
class ConnectionTuneHandler extends ConnectionTuneMethodHandler
{
- protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist)
+ protected AMQMethodBody createConnectionOpenMethodBody(AMQProtocolSession protocolSession, AMQShortString path, AMQShortString capabilities, boolean insist)
{
- return super.createConnectionOpenMethodBody(path, ClusterCapability.add(capabilities, _identity), insist);
+ return super.createConnectionOpenMethodBody(protocolSession, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java Wed Feb 14 12:02:03 2007
@@ -46,8 +46,8 @@
ServerHandlerRegistry getHandlerRegistry()
{
- // TODO - FIX THIS!
- return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
+ // TODO - FIX THIS! These cannot be null to work correctly
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null);
}
private MethodHandlerFactory getHandlerFactory()
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -28,22 +30,22 @@
public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
public static final String PEER = "cluster_peer";
- public static String add(String original, MemberHandle identity)
+ public static AMQShortString add(AMQShortString original, MemberHandle identity)
{
- return original == null ? peer(identity) : original + " " + peer(identity);
+ return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity));
}
- private static String peer(MemberHandle identity)
+ private static AMQShortString peer(MemberHandle identity)
{
- return PEER + "=" + identity.getDetails();
+ return new AMQShortString(PEER + "=" + identity.getDetails());
}
- public static boolean contains(String in)
+ public static boolean contains(AMQShortString in)
{
- return in != null && in.contains(in);
+ return in != null; // && in.contains(in);
}
- public static MemberHandle getPeer(String in)
+ public static MemberHandle getPeer(AMQShortString in)
{
Matcher matcher = Pattern.compile(PATTERN).matcher(in);
if (matcher.matches())
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -38,6 +38,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
@@ -55,13 +56,8 @@
}
public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
- {
- this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
- }
-
- public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
- {
- super(queueRegistry, exchangeRegistry);
+ {
+ super(registry);
ClusterBuilder builder = new ClusterBuilder(address);
_groupMgr = builder.getGroupManager();
_handlers = builder.getHandlerRegistry();
@@ -74,9 +70,9 @@
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
+ new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession));
}
void connect(String join) throws Exception
@@ -176,12 +172,12 @@
private boolean isMembershipAnnouncement(Object msg)
{
- return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
}
private boolean isBufferable(Object msg)
{
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
}
private boolean isBuffereable(AMQBody body)
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -24,7 +24,10 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
@@ -32,6 +35,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
@@ -39,10 +43,12 @@
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
+// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
{
- super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+ super(session, virtualHostRegistry, codecFactory, stateManager);
+// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
public boolean isPeerSession()
@@ -65,7 +71,8 @@
AMQChannel channel = super.getChannel(channelId);
if (isPeerSession() && channel == null)
{
- channel = new OneUseChannel(channelId, this, getStateManager());
+ VirtualHost virtualHost = getVirtualHost();
+ channel = new OneUseChannel(channelId, this, virtualHost.getMessageStore(),virtualHost.getExchangeRegistry(), getStateManager());
addChannel(channel);
}
return channel;
@@ -101,19 +108,12 @@
*/
private class OneUseChannel extends AMQChannel
{
- public OneUseChannel(int channelId, AMQProtocolSession session,
- AMQMethodListener methodListener)
- {
- this(channelId, session, ApplicationRegistry.getInstance(), methodListener);
- }
-
- public OneUseChannel(int channelId, AMQProtocolSession session, IApplicationRegistry registry,
- AMQMethodListener methodListener)
+ public OneUseChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
{
super(channelId,
session,
- registry.getMessageStore(),
- registry.getExchangeRegistry(),
+ messageStore,
+ exchanges,
methodListener);
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Wed Feb 14 12:02:03 2007
@@ -22,16 +22,10 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.cluster.policy.StandardPolicies;
import org.apache.qpid.server.cluster.replay.ReplayManager;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
import java.util.List;
@@ -96,7 +90,7 @@
Broker destination = findBroker(broker);
if(destination == null)
{
- _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
+ _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
}
else
{
@@ -114,12 +108,15 @@
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterPingBody ping = new ClusterPingBody((byte)0, (byte)9);
- ping.broker = _group.getLocal().getDetails();
- ping.responseRequired = true;
- ping.load = _loadTable.getLocalLoad();
+ ClusterPingBody ping = new ClusterPingBody((byte)0,
+ (byte)9,
+ ClusterPingBody.getClazz((byte)0, (byte)9),
+ ClusterPingBody.getMethod((byte)0, (byte)9),
+ _group.getLocal().getDetails(),
+ _loadTable.getLocalLoad(),
+ true);
BlockingHandler handler = new BlockingHandler();
- send(getLeader(), new SimpleSendable(ping), handler);
+ send(getLeader(), new SimpleBodySendable(ping), handler);
handler.waitForCompletion();
if (handler.failed())
{
@@ -162,9 +159,13 @@
_logger.info(new LogMessage("Connected to {0}. joining", leader));
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody join = new ClusterJoinBody((byte)0, (byte)9);
- join.broker = _group.getLocal().getDetails();
- send(leader, new SimpleSendable(join));
+ ClusterJoinBody join = new ClusterJoinBody((byte)0,
+ (byte)9,
+ ClusterJoinBody.getClazz((byte)0, (byte)9),
+ ClusterJoinBody.getMethod((byte)0, (byte)9),
+ _group.getLocal().getDetails());
+
+ send(leader, new SimpleBodySendable(join));
}
private Broker connectToLeader(MemberHandle member) throws AMQException
@@ -183,9 +184,13 @@
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterLeaveBody leave = new ClusterLeaveBody((byte)0, (byte)9);
- leave.broker = _group.getLocal().getDetails();
- send(getLeader(), new SimpleSendable(leave));
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)0,
+ (byte)9,
+ ClusterLeaveBody.getClazz((byte)0, (byte)9),
+ ClusterLeaveBody.getMethod((byte)0, (byte)9),
+ _group.getLocal().getDetails());
+
+ send(getLeader(), new SimpleBodySendable(leave));
}
private void suspect(MemberHandle broker) throws AMQException
@@ -206,9 +211,13 @@
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterSuspectBody suspect = new ClusterSuspectBody((byte)0, (byte)9);
- suspect.broker = broker.getDetails();
- send(getLeader(), new SimpleSendable(suspect));
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)0,
+ (byte)9,
+ ClusterSuspectBody.getClazz((byte)0, (byte)9),
+ ClusterSuspectBody.getMethod((byte)0, (byte)9),
+ broker.getDetails());
+
+ send(getLeader(), new SimpleBodySendable(suspect));
}
}
@@ -230,10 +239,13 @@
//pass request on to leader:
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)0, (byte)9);
- request.broker = member.getDetails();
+ ClusterJoinBody request = new ClusterJoinBody((byte)0, (byte)9,
+ ClusterJoinBody.getClazz((byte)0, (byte)9),
+ ClusterJoinBody.getMethod((byte)0, (byte)9),
+ member.getDetails());
+
Broker leader = getLeader();
- send(leader, new SimpleSendable(request));
+ send(leader, new SimpleBodySendable(request));
_logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
}
}
@@ -277,9 +289,12 @@
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)0, (byte)9);
- //TODO: revise this way of converting String to bytes...
- announce.members = membership.getBytes();
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)0, (byte)9,
+ ClusterMembershipBody.getClazz((byte)0, (byte)9),
+ ClusterMembershipBody.getMethod((byte)0, (byte)9),
+ membership.getBytes());
+
+
return announce;
}
@@ -287,7 +302,7 @@
{
String membership = SimpleMemberHandle.membersToString(_group.getMembers());
ClusterMembershipBody announce = createAnnouncement(membership);
- broadcast(new SimpleSendable(announce));
+ broadcast(new SimpleBodySendable(announce));
_logger.info(new LogMessage("Membership announcement sent: {0}", membership));
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java Wed Feb 14 12:02:03 2007
@@ -83,7 +83,7 @@
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
String host = InetAddress.getLocalHost().getHostName();
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
public interface MemberHandle
{
public String getHost();
@@ -30,5 +32,5 @@
public boolean matches(String host, int port);
- public String getDetails();
+ public AMQShortString getDetails();
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Wed Feb 14 12:02:03 2007
@@ -65,7 +65,6 @@
{
super(host, port);
_local = local;
- // TODO - FIX THIS
_legacyHandler = new ClientHandlerRegistry(local, null);
}
@@ -183,7 +182,7 @@
{
//signal redirection to waiting thread
ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
- String[] parts = redirect.host.split(":");
+ String[] parts = redirect.host.toString().split(":");
_connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
}
else
@@ -203,15 +202,15 @@
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.bodyFrame;
+ AMQBody body = frame.getBodyFrame();
if (body instanceof AMQRequestBody)
{
- handleMethod(frame.channel, ((AMQRequestBody)body).getMethodPayload(),
+ handleMethod(frame.getChannel(), ((AMQRequestBody)body).getMethodPayload(),
((AMQRequestBody)body).getRequestId());
}
else if (body instanceof AMQResponseBody)
{
- handleMethod(frame.channel, ((AMQResponseBody)body).getMethodPayload(),
+ handleMethod(frame.getChannel(), ((AMQResponseBody)body).getMethodPayload(),
((AMQRequestBody)body).getRequestId());
}
else
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Wed Feb 14 12:02:03 2007
@@ -28,8 +28,8 @@
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.HashMap;
import java.util.Map;
@@ -43,23 +43,20 @@
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
+ super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
init(factory);
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -37,6 +39,11 @@
_port = port;
}
+ public SimpleMemberHandle(AMQShortString details)
+ {
+ this(details.toString());
+ }
+
public SimpleMemberHandle(String details)
{
String[] parts = details.split(":");
@@ -84,14 +91,14 @@
return _host.equals(host) && _port == port;
}
- public String getDetails()
+ public AMQShortString getDetails()
{
- return _host + ":" + _port;
+ return new AMQShortString(_host + ":" + _port);
}
public String toString()
{
- return getDetails();
+ return getDetails().toString();
}
static List<MemberHandle> stringToMembers(String membership)
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java Wed Feb 14 12:02:03 2007
@@ -21,36 +21,24 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
-import java.util.Arrays;
-import java.util.List;
+import java.util.Iterator;
public class SimpleSendable implements Sendable
{
- private final List<AMQBody> _bodies;
+ private final AMQMessage _message;
- public SimpleSendable(AMQBody body)
+ public SimpleSendable(AMQMessage message)
{
- this(Arrays.asList(body));
- }
-
- public SimpleSendable(List<AMQBody> bodies)
- {
- _bodies = bodies;
+ _message = message;
}
public void send(int channel, Member member) throws AMQException
{
- for (AMQBody body : _bodies)
- {
- member.send(new AMQFrame(channel, body));
- }
- }
-
- public String toString()
- {
- return _bodies.toString();
+ // TODO: If refs are used, this will not work!
+ member.send(new AMQFrame(channel, _message.getTransferBody()));
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -54,19 +55,19 @@
}
}
- protected final void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.peer(session, evt);
+ handler.peer(stateMgr, evt);
}
}
- protected final void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.client(session, evt);
+ handler.client(stateMgr, evt);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java Wed Feb 14 12:02:03 2007
@@ -27,8 +27,10 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeleteBody;
@@ -46,7 +48,7 @@
class ChannelQueueManager
{
private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
- private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>();
+ private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>();
ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
{
@@ -68,37 +70,37 @@
return new MessageConsumeHandler();
}
- private void set(int channel, String queue)
+ private void set(int channel, AMQShortString queue)
{
_channelQueues.put(channel, queue);
_logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
}
- private String get(int channel)
+ private AMQShortString get(int channel)
{
- String queue = _channelQueues.get(channel);
+ AMQShortString queue = _channelQueues.get(channel);
_logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
return queue;
}
private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
{
- protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
set(evt.getChannelId(), evt.getMethod().queue);
}
}
private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
{
- protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -108,11 +110,11 @@
}
private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
{
- protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -123,11 +125,11 @@
private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody>
{
- protected void peer(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -32,18 +32,20 @@
public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
{
- public final void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
+ AMQProtocolSession session = stateMgr.getProtocolSession();
+
if (ClusteredProtocolSession.isPeerSession(session))
{
- peer(session, evt);
+ peer(stateMgr, evt);
}
else
{
- client(session, evt);
+ client(stateMgr, evt);
}
}
- protected abstract void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageTransferBody;
@@ -162,17 +163,15 @@
private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
{
- public void methodReceived(AMQProtocolSession session,
- AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
{
- _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
+ _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession()));
}
}
private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
{
- public void methodReceived(AMQProtocolSession session,
- AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
{
_groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -180,8 +179,7 @@
private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
{
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
{
_groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -189,8 +187,7 @@
private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
{
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
{
_groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -198,8 +195,7 @@
private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
{
- public void methodReceived(AMQProtocolSession session,
- AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
{
ClusterMembershipBody body = evt.getMethod();
_groupMgr.handleMembershipAnnouncement(new String(body.members));
@@ -208,15 +204,14 @@
private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
{
- public void methodReceived(AMQProtocolSession session,
- AMQMethodEvent<ClusterPingBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException
{
MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
_groupMgr.handlePing(peer, evt.getMethod().load);
if (evt.getMethod().responseRequired)
{
evt.getMethod().load = _loadTable.getLocalLoad();
- session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
+ stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
}
}
}
@@ -228,12 +223,12 @@
super(ConnectionOpenMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt)
{
- String capabilities = evt.getMethod().capabilities;
+ AMQShortString capabilities = evt.getMethod().capabilities;
if (ClusterCapability.contains(capabilities))
{
- ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
+ ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities));
}
else
{
@@ -249,9 +244,9 @@
super(ConnectionCloseMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt)
{
- if (!ClusteredProtocolSession.isPeerSession(session))
+ if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession()))
{
_loadTable.decrementLocalLoad();
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java Wed Feb 14 12:02:03 2007
@@ -38,18 +38,18 @@
_base = base;
}
- public void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- preHandle(session, evt);
- _base.methodReceived(session, evt);
- postHandle(session, evt);
+ preHandle(stateMgr, evt);
+ _base.methodReceived(stateMgr, evt);
+ postHandle(stateMgr, evt);
}
- void preHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
- void postHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.util.LogMessage;
@@ -34,6 +35,7 @@
import org.apache.qpid.server.queue.PrivateQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.RemoteQueueProxy;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class LocalQueueDeclareHandler extends QueueDeclareHandler
{
@@ -45,12 +47,12 @@
_groupMgr = groupMgr;
}
- protected String createName()
+ protected AMQShortString createName()
{
- return super.createName() + "@" + _groupMgr.getLocal().getDetails();
+ return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException
{
//is it private or shared:
if (body.exclusive)
@@ -60,18 +62,18 @@
//need to get peer from the session...
MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
_logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry);
+ return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost);
}
else
{
_logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
- return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
+ return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost);
}
}
else
{
_logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
- return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
+ return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java Wed Feb 14 12:02:03 2007
@@ -31,7 +31,7 @@
public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
{
- public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
{
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java Wed Feb 14 12:02:03 2007
@@ -47,14 +47,14 @@
_client = client;
}
- protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _peer.methodReceived(session, evt);
+ _peer.methodReceived(stateMgr, evt);
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _client.methodReceived(session, evt);
+ _client.methodReceived(stateMgr, evt);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java Wed Feb 14 12:02:03 2007
@@ -41,11 +41,11 @@
_handler = handler;
}
- protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt)
throws AMQException
{
setName(evt.getMethod());//need to set the name before propagating this method
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java Wed Feb 14 12:02:03 2007
@@ -32,15 +32,21 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class RemoteCancelHandler implements StateAwareMethodListener<MessageCancelBody>
{
private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
- public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
//By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().getDestination());
+ AMQQueue queue = queueRegistry.getQueue(evt.getMethod().getDestination());
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));