You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ac...@apache.org on 2006/01/09 10:39:33 UTC

svn commit: r367253 - in /incubator/activemq/trunk: activemq-core/src/test/java/org/apache/activemq/ assembly/src/test/java/org/apache/activemq/usecases/

Author: aco
Date: Mon Jan  9 01:39:12 2006
New Revision: 367253

URL: http://svn.apache.org/viewcvs?rev=367253&view=rev
Log:
- Added test cases for multiple brokers

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,367 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.*;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ConnectionClosedException;
+import org.springframework.core.io.Resource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Collection;
+import java.net.URI;
+
+/**
+ * Test case support that allows the easy management and connection of several brokers.
+ *
+ * @version $Revision$
+ */
+public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+    public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
+    public static int MAX_SETUP_TIME = 5000;
+
+    protected Map brokers;
+    protected Map destinations;
+
+    protected int messageSize = 1;
+
+    protected boolean verbose = false;
+
+    protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        BrokerService localBroker  = ((BrokerItem)brokers.get(localBrokerName)).broker;
+        BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
+
+        bridgeBrokers(localBroker, remoteBroker);
+    }
+
+    // Overwrite this method to specify how you want to bridge the two brokers
+    // By default, bridge them using add network connector of the local broker and the first connector of the remote broker
+    protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+        List transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI;
+        if (!transportConnectors.isEmpty()) {
+            remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
+            localBroker.addNetworkConnector("static:" + remoteURI);
+        } else {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+
+        MAX_SETUP_TIME = 2000;
+    }
+
+    // This will interconnect all brokes using multicast
+    protected void bridgeAllBrokers() throws Exception {
+        bridgeAllBrokers("default");
+    }
+
+    protected void bridgeAllBrokers(String groupName) throws Exception {
+        Collection brokerList = brokers.values();
+        for (Iterator i=brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = ((BrokerItem)i.next()).broker;
+            List transportConnectors = broker.getTransportConnectors();
+
+            if (transportConnectors.isEmpty()) {
+                broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
+                transportConnectors = broker.getTransportConnectors();
+            }
+
+            TransportConnector transport = (TransportConnector)transportConnectors.get(0);
+            transport.setDiscoveryUri(new URI("multicast://" + groupName));
+            broker.addNetworkConnector("multicast://" + groupName);
+        }
+
+        // Multicasting may take longer to setup
+        MAX_SETUP_TIME = 8000;
+    }
+
+    protected void startAllBrokers() throws Exception {
+        Collection brokerList = brokers.values();
+        for (Iterator i=brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = ((BrokerItem)i.next()).broker;
+            broker.start();
+        }
+
+        Thread.sleep(MAX_SETUP_TIME);
+    }
+
+    protected BrokerService createBroker(String brokerName) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        brokers.put(brokerName, new BrokerItem(broker));
+
+        return broker;
+    }
+
+    protected BrokerService createBroker(URI brokerUri) throws Exception {
+        BrokerService broker = BrokerFactory.createBroker(brokerUri);
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+        return broker;
+    }
+
+    protected BrokerService createBroker(Resource configFile) throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
+        brokerFactory.afterPropertiesSet();
+
+        BrokerService broker = brokerFactory.getBroker();
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+        return broker;
+    }
+
+    protected Connection createConnection(String brokerName) throws Exception {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.createConnection();
+        }
+        return null;
+    }
+
+    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.createConsumer(dest);
+        }
+        return null;
+    }
+
+    protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.createDurableSubscriber(dest, name);
+        }
+        return null;
+    }
+
+    protected MessageIdList getBrokerMessages(String brokerName) {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.getAllMessages();
+        }
+        return null;
+    }
+
+    protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.getConsumerMessages(consumer);
+        }
+        return null;
+    }
+
+    protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+
+        Connection conn = brokerItem.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = brokerItem.createProducer(destination, sess);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
+            producer.send(msg);
+        }
+
+        producer.close();
+        sess.close();
+        conn.close();
+        brokerItem.connections.remove(conn);
+    }
+
+    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
+        TextMessage msg = session.createTextMessage();
+
+        // Pad message text
+        if (initText.length() < messageSize) {
+            char[] data = new char[messageSize - initText.length()];
+            Arrays.fill(data, '*');
+            String str = new String(data);
+            msg.setText(initText + str);
+
+        // Do not pad message text
+        } else {
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+    protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException {
+        Destination dest;
+        if (topic) {
+            dest = new ActiveMQTopic(name);
+            destinations.put(name, dest);
+            return (ActiveMQDestination)dest;
+        } else {
+            dest = new ActiveMQQueue(name);
+            destinations.put(name, dest);
+            return (ActiveMQDestination)dest;
+        }
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        brokers = new HashMap();
+        destinations = new HashMap();
+    }
+
+    protected void tearDown() throws Exception {
+        destroyAllBrokers();
+        super.tearDown();
+    }
+
+    protected void destroyBroker(String brokerName) throws Exception {
+        BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName);
+
+        if (brokerItem != null) {
+            brokerItem.destroy();
+        }
+    }
+
+    protected void destroyAllBrokers() throws Exception {
+        for (Iterator i=brokers.values().iterator(); i.hasNext();) {
+            BrokerItem brokerItem = (BrokerItem)i.next();
+            brokerItem.destroy();
+        }
+        brokers.clear();
+    }
+
+
+    // Class to group broker components together
+    protected class BrokerItem {
+        public BrokerService broker;
+        public ActiveMQConnectionFactory factory;
+        public List connections;
+        public Map consumers;
+        public MessageIdList allMessages = new MessageIdList();
+
+        private IdGenerator id;
+
+        public boolean persistent = false;
+
+        public BrokerItem(BrokerService broker) throws Exception {
+            this.broker = broker;
+
+            factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+            consumers = Collections.synchronizedMap(new HashMap());
+            connections = Collections.synchronizedList(new ArrayList());
+            allMessages.setVerbose(verbose);
+            id = new IdGenerator(broker.getBrokerName() + ":");
+        }
+
+        public Connection createConnection() throws Exception {
+            Connection conn = factory.createConnection();
+            conn.setClientID(id.generateId());
+
+            connections.add(conn);
+            return conn;
+        }
+
+        public MessageConsumer createConsumer(Destination dest) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createConsumer(dest, s);
+        }
+
+        public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception {
+            MessageConsumer client = sess.createConsumer(dest);
+            MessageIdList messageIdList = new MessageIdList();
+            messageIdList.setParent(allMessages);
+            client.setMessageListener(messageIdList);
+            consumers.put(client, messageIdList);
+
+            return client;
+        }
+
+        public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createDurableSubscriber(dest, s, name);
+        }
+
+        public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
+            MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name);
+            MessageIdList messageIdList = new MessageIdList();
+            messageIdList.setParent(allMessages);
+            client.setMessageListener(messageIdList);
+            consumers.put(client, messageIdList);
+
+            return client;
+        }
+
+        public MessageIdList getAllMessages() {
+            return allMessages;
+        }
+
+        public MessageIdList getConsumerMessages(MessageConsumer consumer) {
+            return (MessageIdList)consumers.get(consumer);
+        }
+
+        public MessageProducer createProducer(Destination dest) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createProducer(dest, s);
+        }
+
+        public MessageProducer createProducer(Destination dest, Session sess) throws Exception {
+            MessageProducer client = sess.createProducer(dest);
+            client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            return client;
+        }
+
+        public void destroy() throws Exception {
+            while (!connections.isEmpty()) {
+                Connection c = (Connection)connections.remove(0);
+                try {
+                    c.close();
+                } catch (ConnectionClosedException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            broker.stop();
+            consumers.clear();
+
+            broker = null;
+            connections = null;
+            consumers = null;
+            factory = null;
+        }
+    }
+
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,119 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import java.util.Map;
+import java.util.HashMap;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport {
+    public static final int BROKER_COUNT   = 5;  // number of brokers to network
+    public static final int CONSUMER_COUNT = 3;  // consumers per broker
+    public static final int PRODUCER_COUNT = 3;  // producers per broker
+    public static final int MESSAGE_COUNT  = 10; // messages per producer
+
+    protected Map consumerMap;
+
+    public void testTopicAllConnected() throws Exception {
+        bridgeAllBrokers();
+        startAllBrokers();
+
+
+        // Setup topic destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<CONSUMER_COUNT; j++) {
+                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+            }
+        }
+
+        // Send messages
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<PRODUCER_COUNT; j++) {
+                sendMessages("Broker" + i, dest, MESSAGE_COUNT);
+            }
+        }
+
+        // Get message count
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<CONSUMER_COUNT; j++) {
+                MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
+                msgs.waitForMessagesToArrive(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
+                assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount());
+            }
+        }
+    }
+
+    public void testQueueAllConnected() throws Exception {
+        bridgeAllBrokers();
+
+        startAllBrokers();
+
+        // Setup topic destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<CONSUMER_COUNT; j++) {
+                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+            }
+        }
+
+        // Send messages
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<PRODUCER_COUNT; j++) {
+                sendMessages("Broker" + i, dest, MESSAGE_COUNT);
+            }
+        }
+
+        // Wait for messages to be delivered
+        Thread.sleep(2000);
+
+        // Get message count
+        int totalMsg = 0;
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=0; j<CONSUMER_COUNT; j++) {
+                MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
+                totalMsg += msgs.getMessageCount();
+            }
+        }
+
+        assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+
+        // Setup n brokers
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
+        }
+
+        consumerMap = new HashMap();
+    }
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClientsTest {
+    protected List bridges = new ArrayList();
+
+    protected void bridgeAllBrokers(String groupName) throws Exception {
+        for (int i=1; i<=BROKER_COUNT; i++) {
+            for (int j=1; j<=BROKER_COUNT; j++) {
+                if (i != j) {
+                    bridgeBrokers("Broker" + i, "Broker" + j);
+                }
+            }
+        }
+
+        MAX_SETUP_TIME = 5000;
+    }
+
+    protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+        List remoteTransports = remoteBroker.getTransportConnectors();
+        List localTransports  = localBroker.getTransportConnectors();
+
+        URI remoteURI, localURI;
+        if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
+            remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
+            localURI  = ((TransportConnector)localTransports.get(0)).getConnectUri();
+
+            // Ensure that we are connecting using tcp
+            if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
+                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                                                                           TransportFactory.connect(remoteURI));
+                bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
+                bridges.add(bridge);
+
+                bridge.start();
+            } else {
+                throw new Exception("Remote broker or local broker is not using tcp connectors");
+            }
+        } else {
+            throw new Exception("Remote broker or local broker has no registered connectors.");
+        }
+    }
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // Assign a tcp connector to each broker
+        int j=0;
+        for (Iterator i=brokers.values().iterator(); i.hasNext();) {
+            ((BrokerItem)i.next()).broker.addConnector("tcp://localhost:" + (61616 + j++));
+        }
+    }
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,195 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.util.MessageIdList;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void test_AB_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+
+        // Let's try to wait for any messages. Should be none.
+        Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+        assertEquals(0, msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA <- BrokerB -> BrokerC
+     */
+    public void test_BA_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerB", dest, 10);
+
+        // Let's try to wait for any messages.
+        Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        // Total received should be 10
+        assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA -> BrokerB <- BrokerC
+     */
+    public void test_AB_CB_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerC", "BrokerB");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+
+        msgsB.waitForMessagesToArrive(20);
+
+        assertEquals(20, msgsB.getMessageCount());
+    }
+
+    /**
+     * BrokerA <-> BrokerB <-> BrokerC
+     */
+    public void testAllConnectedBrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerB");
+        bridgeBrokers("BrokerA", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerA");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Let's try to wait for any messages.
+        Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA <-> BrokerB <-> BrokerC
+     */
+    public void testAllConnectedUsingMulticast() throws Exception {
+        // Setup broker networks
+        bridgeAllBrokers();
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Let's try to wait for any messages.
+        Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+    }
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,60 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetworkTest {
+    protected List bridges = new ArrayList();
+
+    protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+        List remoteTransports = remoteBroker.getTransportConnectors();
+        List localTransports  = localBroker.getTransportConnectors();
+
+        URI remoteURI, localURI;
+        if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
+            remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
+            localURI  = ((TransportConnector)localTransports.get(0)).getConnectUri();
+
+            // Ensure that we are connecting using tcp
+            if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
+                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                                                                           TransportFactory.connect(remoteURI));
+                bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
+                bridges.add(bridge);
+
+                bridge.start();
+            } else {
+                throw new Exception("Remote broker or local broker is not using tcp connectors");
+            }
+        } else {
+            throw new Exception("Remote broker or local broker has no registered connectors.");
+        }
+
+        MAX_SETUP_TIME = 2000;
+    }
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,226 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Destination;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void test_AB_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(10);
+        msgsB.waitForMessagesToArrive(20);
+        msgsC.waitForMessagesToArrive(20);
+
+        assertEquals(10, msgsA.getMessageCount());
+        assertEquals(20, msgsB.getMessageCount());
+        assertEquals(20, msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA <- BrokerB -> BrokerC
+     */
+    public void test_BA_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(20);
+        msgsB.waitForMessagesToArrive(10);
+        msgsC.waitForMessagesToArrive(20);
+
+        assertEquals(20, msgsA.getMessageCount());
+        assertEquals(10, msgsB.getMessageCount());
+        assertEquals(20, msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA -> BrokerB <- BrokerC
+     */
+    public void test_AB_CB_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerC", "BrokerB");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(10);
+        msgsB.waitForMessagesToArrive(30);
+        msgsC.waitForMessagesToArrive(10);
+
+        assertEquals(10, msgsA.getMessageCount());
+        assertEquals(30, msgsB.getMessageCount());
+        assertEquals(10, msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA <-> BrokerB <-> BrokerC
+     */
+    public void testAllConnectedBrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerB");
+        bridgeBrokers("BrokerA", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerA");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(30);
+        msgsB.waitForMessagesToArrive(30);
+        msgsC.waitForMessagesToArrive(30);
+
+        assertEquals(30, msgsA.getMessageCount());
+        assertEquals(30, msgsB.getMessageCount());
+        assertEquals(30, msgsC.getMessageCount());
+    }
+
+    /**
+     * BrokerA <-> BrokerB <-> BrokerC
+     */
+    public void testAllConnectedUsingMulticast() throws Exception {
+        // Setup broker networks
+        bridgeAllBrokers();
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerC", dest, 10);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(30);
+        msgsB.waitForMessagesToArrive(30);
+        msgsC.waitForMessagesToArrive(30);
+
+        assertEquals(30, msgsA.getMessageCount());
+        assertEquals(30, msgsB.getMessageCount());
+        assertEquals(30, msgsC.getMessageCount());
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+    }
+}

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Mon Jan  9 01:39:12 2006
@@ -0,0 +1,60 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetworkTest {
+    protected List bridges = new ArrayList();
+
+    protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+        List remoteTransports = remoteBroker.getTransportConnectors();
+        List localTransports  = localBroker.getTransportConnectors();
+
+        URI remoteURI, localURI;
+        if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
+            remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
+            localURI  = ((TransportConnector)localTransports.get(0)).getConnectUri();
+
+            // Ensure that we are connecting using tcp
+            if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
+                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                                                                           TransportFactory.connect(remoteURI));
+                bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
+                bridges.add(bridge);
+
+                bridge.start();
+            } else {
+                throw new Exception("Remote broker or local broker is not using tcp connectors");
+            }
+        } else {
+            throw new Exception("Remote broker or local broker has no registered connectors.");
+        }
+
+        MAX_SETUP_TIME = 2000;
+    }
+}