You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ac...@apache.org on 2006/01/09 10:39:33 UTC
svn commit: r367253 - in /incubator/activemq/trunk:
activemq-core/src/test/java/org/apache/activemq/
assembly/src/test/java/org/apache/activemq/usecases/
Author: aco
Date: Mon Jan 9 01:39:12 2006
New Revision: 367253
URL: http://svn.apache.org/viewcvs?rev=367253&view=rev
Log:
- Added test cases for multiple brokers
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,367 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.*;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ConnectionClosedException;
+import org.springframework.core.io.Resource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Collection;
+import java.net.URI;
+
+/**
+ * Test case support that allows the easy management and connection of several brokers.
+ *
+ * @version $Revision$
+ */
+public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+ public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
+ public static int MAX_SETUP_TIME = 5000;
+
+ protected Map brokers;
+ protected Map destinations;
+
+ protected int messageSize = 1;
+
+ protected boolean verbose = false;
+
+ protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+ BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker;
+ BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
+
+ bridgeBrokers(localBroker, remoteBroker);
+ }
+
+ // Override this method to specify how you want to bridge the two brokers.
+ // By default, bridge them by adding a network connector on the local broker that points to the first transport connector of the remote broker.
+ protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+ List transportConnectors = remoteBroker.getTransportConnectors();
+ URI remoteURI;
+ if (!transportConnectors.isEmpty()) {
+ remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
+ localBroker.addNetworkConnector("static:" + remoteURI);
+ } else {
+ throw new Exception("Remote broker has no registered connectors.");
+ }
+
+ MAX_SETUP_TIME = 2000;
+ }
+
+ // This will interconnect all brokers using multicast
+ protected void bridgeAllBrokers() throws Exception {
+ bridgeAllBrokers("default");
+ }
+
+ protected void bridgeAllBrokers(String groupName) throws Exception {
+ Collection brokerList = brokers.values();
+ for (Iterator i=brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = ((BrokerItem)i.next()).broker;
+ List transportConnectors = broker.getTransportConnectors();
+
+ if (transportConnectors.isEmpty()) {
+ broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
+ transportConnectors = broker.getTransportConnectors();
+ }
+
+ TransportConnector transport = (TransportConnector)transportConnectors.get(0);
+ transport.setDiscoveryUri(new URI("multicast://" + groupName));
+ broker.addNetworkConnector("multicast://" + groupName);
+ }
+
+ // Multicasting may take longer to set up
+ MAX_SETUP_TIME = 8000;
+ }
+
+ protected void startAllBrokers() throws Exception {
+ Collection brokerList = brokers.values();
+ for (Iterator i=brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = ((BrokerItem)i.next()).broker;
+ broker.start();
+ }
+
+ Thread.sleep(MAX_SETUP_TIME);
+ }
+
+ protected BrokerService createBroker(String brokerName) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName(brokerName);
+ brokers.put(brokerName, new BrokerItem(broker));
+
+ return broker;
+ }
+
+ protected BrokerService createBroker(URI brokerUri) throws Exception {
+ BrokerService broker = BrokerFactory.createBroker(brokerUri);
+ brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+ return broker;
+ }
+
+ protected BrokerService createBroker(Resource configFile) throws Exception {
+ BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
+ brokerFactory.afterPropertiesSet();
+
+ BrokerService broker = brokerFactory.getBroker();
+ brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+ return broker;
+ }
+
+ protected Connection createConnection(String brokerName) throws Exception {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+ if (brokerItem != null) {
+ return brokerItem.createConnection();
+ }
+ return null;
+ }
+
+ protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+ if (brokerItem != null) {
+ return brokerItem.createConsumer(dest);
+ }
+ return null;
+ }
+
+ protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+ if (brokerItem != null) {
+ return brokerItem.createDurableSubscriber(dest, name);
+ }
+ return null;
+ }
+
+ protected MessageIdList getBrokerMessages(String brokerName) {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+ if (brokerItem != null) {
+ return brokerItem.getAllMessages();
+ }
+ return null;
+ }
+
+ protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+ if (brokerItem != null) {
+ return brokerItem.getConsumerMessages(consumer);
+ }
+ return null;
+ }
+
+ protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
+ BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+
+ Connection conn = brokerItem.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = brokerItem.createProducer(destination, sess);
+
+ for (int i = 0; i < count; i++) {
+ TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
+ producer.send(msg);
+ }
+
+ producer.close();
+ sess.close();
+ conn.close();
+ brokerItem.connections.remove(conn);
+ }
+
+ protected TextMessage createTextMessage(Session session, String initText) throws Exception {
+ TextMessage msg = session.createTextMessage();
+
+ // Pad message text
+ if (initText.length() < messageSize) {
+ char[] data = new char[messageSize - initText.length()];
+ Arrays.fill(data, '*');
+ String str = new String(data);
+ msg.setText(initText + str);
+
+ // Do not pad message text
+ } else {
+ msg.setText(initText);
+ }
+
+ return msg;
+ }
+
+ protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException {
+ Destination dest;
+ if (topic) {
+ dest = new ActiveMQTopic(name);
+ destinations.put(name, dest);
+ return (ActiveMQDestination)dest;
+ } else {
+ dest = new ActiveMQQueue(name);
+ destinations.put(name, dest);
+ return (ActiveMQDestination)dest;
+ }
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ brokers = new HashMap();
+ destinations = new HashMap();
+ }
+
+ protected void tearDown() throws Exception {
+ destroyAllBrokers();
+ super.tearDown();
+ }
+
+ protected void destroyBroker(String brokerName) throws Exception {
+ BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName);
+
+ if (brokerItem != null) {
+ brokerItem.destroy();
+ }
+ }
+
+ protected void destroyAllBrokers() throws Exception {
+ for (Iterator i=brokers.values().iterator(); i.hasNext();) {
+ BrokerItem brokerItem = (BrokerItem)i.next();
+ brokerItem.destroy();
+ }
+ brokers.clear();
+ }
+
+
+ // Class to group broker components together
+ protected class BrokerItem {
+ public BrokerService broker;
+ public ActiveMQConnectionFactory factory;
+ public List connections;
+ public Map consumers;
+ public MessageIdList allMessages = new MessageIdList();
+
+ private IdGenerator id;
+
+ public boolean persistent = false;
+
+ public BrokerItem(BrokerService broker) throws Exception {
+ this.broker = broker;
+
+ factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+ consumers = Collections.synchronizedMap(new HashMap());
+ connections = Collections.synchronizedList(new ArrayList());
+ allMessages.setVerbose(verbose);
+ id = new IdGenerator(broker.getBrokerName() + ":");
+ }
+
+ public Connection createConnection() throws Exception {
+ Connection conn = factory.createConnection();
+ conn.setClientID(id.generateId());
+
+ connections.add(conn);
+ return conn;
+ }
+
+ public MessageConsumer createConsumer(Destination dest) throws Exception {
+ Connection c = createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return createConsumer(dest, s);
+ }
+
+ public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception {
+ MessageConsumer client = sess.createConsumer(dest);
+ MessageIdList messageIdList = new MessageIdList();
+ messageIdList.setParent(allMessages);
+ client.setMessageListener(messageIdList);
+ consumers.put(client, messageIdList);
+
+ return client;
+ }
+
+ public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception {
+ Connection c = createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return createDurableSubscriber(dest, s, name);
+ }
+
+ public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
+ MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name);
+ MessageIdList messageIdList = new MessageIdList();
+ messageIdList.setParent(allMessages);
+ client.setMessageListener(messageIdList);
+ consumers.put(client, messageIdList);
+
+ return client;
+ }
+
+ public MessageIdList getAllMessages() {
+ return allMessages;
+ }
+
+ public MessageIdList getConsumerMessages(MessageConsumer consumer) {
+ return (MessageIdList)consumers.get(consumer);
+ }
+
+ public MessageProducer createProducer(Destination dest) throws Exception {
+ Connection c = createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return createProducer(dest, s);
+ }
+
+ public MessageProducer createProducer(Destination dest, Session sess) throws Exception {
+ MessageProducer client = sess.createProducer(dest);
+ client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ return client;
+ }
+
+ public void destroy() throws Exception {
+ while (!connections.isEmpty()) {
+ Connection c = (Connection)connections.remove(0);
+ try {
+ c.close();
+ } catch (ConnectionClosedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ broker.stop();
+ consumers.clear();
+
+ broker = null;
+ connections = null;
+ consumers = null;
+ factory = null;
+ }
+ }
+
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,119 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import java.util.Map;
+import java.util.HashMap;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport {
+ public static final int BROKER_COUNT = 5; // number of brokers to network
+ public static final int CONSUMER_COUNT = 3; // consumers per broker
+ public static final int PRODUCER_COUNT = 3; // producers per broker
+ public static final int MESSAGE_COUNT = 10; // messages per producer
+
+ protected Map consumerMap;
+
+ public void testTopicAllConnected() throws Exception {
+ bridgeAllBrokers();
+ startAllBrokers();
+
+
+ // Setup topic destination
+ Destination dest = createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<CONSUMER_COUNT; j++) {
+ consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+ }
+ }
+
+ // Send messages
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<PRODUCER_COUNT; j++) {
+ sendMessages("Broker" + i, dest, MESSAGE_COUNT);
+ }
+ }
+
+ // Get message count
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<CONSUMER_COUNT; j++) {
+ MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
+ msgs.waitForMessagesToArrive(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
+ assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount());
+ }
+ }
+ }
+
+ public void testQueueAllConnected() throws Exception {
+ bridgeAllBrokers();
+
+ startAllBrokers();
+
+ // Setup queue destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<CONSUMER_COUNT; j++) {
+ consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+ }
+ }
+
+ // Send messages
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<PRODUCER_COUNT; j++) {
+ sendMessages("Broker" + i, dest, MESSAGE_COUNT);
+ }
+ }
+
+ // Wait for messages to be delivered
+ Thread.sleep(2000);
+
+ // Get message count
+ int totalMsg = 0;
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=0; j<CONSUMER_COUNT; j++) {
+ MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
+ totalMsg += msgs.getMessageCount();
+ }
+ }
+
+ assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
+ }
+
+ public void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+
+ // Setup n brokers
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
+ }
+
+ consumerMap = new HashMap();
+ }
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClientsTest {
+ protected List bridges = new ArrayList();
+
+ protected void bridgeAllBrokers(String groupName) throws Exception {
+ for (int i=1; i<=BROKER_COUNT; i++) {
+ for (int j=1; j<=BROKER_COUNT; j++) {
+ if (i != j) {
+ bridgeBrokers("Broker" + i, "Broker" + j);
+ }
+ }
+ }
+
+ MAX_SETUP_TIME = 5000;
+ }
+
+ protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+ List remoteTransports = remoteBroker.getTransportConnectors();
+ List localTransports = localBroker.getTransportConnectors();
+
+ URI remoteURI, localURI;
+ if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
+ remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
+ localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
+
+ // Ensure that we are connecting using tcp
+ if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
+ DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ TransportFactory.connect(remoteURI));
+ bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
+ bridges.add(bridge);
+
+ bridge.start();
+ } else {
+ throw new Exception("Remote broker or local broker is not using tcp connectors");
+ }
+ } else {
+ throw new Exception("Remote broker or local broker has no registered connectors.");
+ }
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ // Assign a tcp connector to each broker
+ int j=0;
+ for (Iterator i=brokers.values().iterator(); i.hasNext();) {
+ ((BrokerItem)i.next()).broker.addConnector("tcp://localhost:" + (61616 + j++));
+ }
+ }
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,195 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.util.MessageIdList;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
+
+ /**
+ * BrokerA -> BrokerB -> BrokerC
+ */
+ public void test_AB_BC_BrokerNetwork() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 10);
+
+ // Let's try to wait for any messages. Should be none.
+ Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+ assertEquals(0, msgsC.getMessageCount());
+ }
+
+ /**
+ * BrokerA <- BrokerB -> BrokerC
+ */
+ public void test_BA_BC_BrokerNetwork() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerB", "BrokerA");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+ MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+ // Send messages
+ sendMessages("BrokerB", dest, 10);
+
+ // Let's try to wait for any messages.
+ Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ // Total received should be 10
+ assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount());
+ }
+
+ /**
+ * BrokerA -> BrokerB <- BrokerC
+ */
+ public void test_AB_CB_BrokerNetwork() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerC", "BrokerB");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientB = createConsumer("BrokerB", dest);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 10);
+ sendMessages("BrokerC", dest, 10);
+
+ // Get message count
+ MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+
+ msgsB.waitForMessagesToArrive(20);
+
+ assertEquals(20, msgsB.getMessageCount());
+ }
+
+ /**
+ * BrokerA <-> BrokerB <-> BrokerC <-> BrokerA (every pair bridged in both directions)
+ */
+ public void testAllConnectedBrokerNetwork() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerB", "BrokerA");
+ bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("BrokerC", "BrokerB");
+ bridgeBrokers("BrokerA", "BrokerC");
+ bridgeBrokers("BrokerC", "BrokerA");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+ MessageConsumer clientB = createConsumer("BrokerB", dest);
+ MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 10);
+ sendMessages("BrokerB", dest, 10);
+ sendMessages("BrokerC", dest, 10);
+
+ // Let's try to wait for any messages.
+ Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+ }
+
+ /**
+ * BrokerA <-> BrokerB <-> BrokerC
+ */
+ public void testAllConnectedUsingMulticast() throws Exception {
+ // Setup broker networks
+ bridgeAllBrokers();
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+ MessageConsumer clientB = createConsumer("BrokerB", dest);
+ MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 10);
+ sendMessages("BrokerB", dest, 10);
+ sendMessages("BrokerC", dest, 10);
+
+ // Let's try to wait for any messages.
+ Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+ }
+
+ public void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+ createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+ createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+ createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+ }
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,60 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetworkTest {
+ protected List bridges = new ArrayList();
+
+ protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+ List remoteTransports = remoteBroker.getTransportConnectors();
+ List localTransports = localBroker.getTransportConnectors();
+
+ URI remoteURI, localURI;
+ if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
+ remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
+ localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
+
+ // Ensure that we are connecting using tcp
+ if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
+ DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+ TransportFactory.connect(remoteURI));
+ bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
+ bridges.add(bridge);
+
+ bridge.start();
+ } else {
+ throw new Exception("Remote broker or local broker is not using tcp connectors");
+ }
+ } else {
+ throw new Exception("Remote broker or local broker has no registered connectors.");
+ }
+
+ MAX_SETUP_TIME = 2000;
+ }
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,226 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Destination;
+import java.net.URI;
+
+/**
+ * Checks message delivery on the destination TEST.FOO across three brokers
+ * (BrokerA, BrokerB and BrokerC) bridged in different topologies. Every test
+ * attaches one consumer to each broker, sends ten messages through each
+ * broker and asserts how many messages each consumer ends up with.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
+
+    /** Number of messages sent through each broker in every test. */
+    private static final int MESSAGES_PER_BROKER = 10;
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void test_AB_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        sendToAllAndAssertReceived(10, 20, 20);
+    }
+
+    /**
+     * BrokerA <- BrokerB -> BrokerC
+     */
+    public void test_BA_BC_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        sendToAllAndAssertReceived(20, 10, 20);
+    }
+
+    /**
+     * BrokerA -> BrokerB <- BrokerC
+     */
+    public void test_AB_CB_BrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerC", "BrokerB");
+
+        startAllBrokers();
+
+        sendToAllAndAssertReceived(10, 30, 10);
+    }
+
+    /**
+     * Fully connected: BrokerA <-> BrokerB, BrokerB <-> BrokerC and
+     * BrokerA <-> BrokerC.
+     */
+    public void testAllConnectedBrokerNetwork() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerB");
+        bridgeBrokers("BrokerA", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerA");
+
+        startAllBrokers();
+
+        sendToAllAndAssertReceived(30, 30, 30);
+    }
+
+    /**
+     * All brokers bridged through bridgeAllBrokers(); every consumer is
+     * expected to receive all thirty messages.
+     */
+    public void testAllConnectedUsingMulticast() throws Exception {
+        // Setup broker networks
+        bridgeAllBrokers();
+
+        startAllBrokers();
+
+        sendToAllAndAssertReceived(30, 30, 30);
+    }
+
+    /**
+     * Enables auto-fail, runs the parent set-up and then creates the three
+     * non-persistent, JMX-less brokers on tcp ports 61616, 61617 and 61618.
+     */
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+    }
+
+    /**
+     * Shared body of every test: creates the TEST.FOO destination, attaches
+     * one consumer per broker, sends MESSAGES_PER_BROKER messages through each
+     * broker, waits for the expected number of messages on each consumer and
+     * asserts the final counts. Brokers must already be bridged and started.
+     *
+     * @param expectedA number of messages the BrokerA consumer should receive
+     * @param expectedB number of messages the BrokerB consumer should receive
+     * @param expectedC number of messages the BrokerC consumer should receive
+     * @throws Exception if any of the inherited set-up or send helpers fails
+     */
+    private void sendToAllAndAssertReceived(int expectedA, int expectedB, int expectedC) throws Exception {
+        // Setup destination (the 'true' flag presumably selects a topic -- confirm in the parent class)
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+
+        // Send messages
+        sendMessages("BrokerA", dest, MESSAGES_PER_BROKER);
+        sendMessages("BrokerB", dest, MESSAGES_PER_BROKER);
+        sendMessages("BrokerC", dest, MESSAGES_PER_BROKER);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(expectedA);
+        msgsB.waitForMessagesToArrive(expectedB);
+        msgsC.waitForMessagesToArrive(expectedC);
+
+        assertEquals(expectedA, msgsA.getMessageCount());
+        assertEquals(expectedB, msgsB.getMessageCount());
+        assertEquals(expectedC, msgsC.getMessageCount());
+    }
+}
Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java?rev=367253&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Mon Jan 9 01:39:12 2006
@@ -0,0 +1,60 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.transport.TransportFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.net.URI;
+
+/**
+ * Extends ThreeBrokerTopicNetworkTest and bridges pairs of brokers with a
+ * DemandForwardingBridge built on transports connected to each broker's first
+ * transport connector, which must expose a tcp URI.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetworkTest {
+    /** Every bridge created by bridgeBrokers, in creation order. */
+    protected List bridges = new ArrayList();
+
+    /**
+     * Creates and starts a DemandForwardingBridge between the two brokers.
+     * The bridge is recorded in {@link #bridges} before it is started, and
+     * MAX_SETUP_TIME is set to 2000 once the bridge has been started
+     * (presumably milliseconds -- confirm against the declaring class).
+     *
+     * @param localBroker  broker whose first connector URI is used for the bridge's first transport
+     * @param remoteBroker broker whose first connector URI is used for the bridge's second transport
+     * @throws Exception if either broker has no transport connectors, if either
+     *                   connect URI does not start with "tcp:", or if connecting
+     *                   or starting the bridge fails
+     */
+    protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+        List remoteConnectors = remoteBroker.getTransportConnectors();
+        List localConnectors = localBroker.getTransportConnectors();
+
+        // Both brokers need at least one connector to bridge through.
+        if (remoteConnectors.isEmpty() || localConnectors.isEmpty()) {
+            throw new Exception("Remote broker or local broker has no registered connectors.");
+        }
+
+        // Only the first connector of each broker is considered.
+        TransportConnector remoteConnector = (TransportConnector) remoteConnectors.get(0);
+        URI remoteURI = remoteConnector.getConnectUri();
+        TransportConnector localConnector = (TransportConnector) localConnectors.get(0);
+        URI localURI = localConnector.getConnectUri();
+
+        // Ensure that we are connecting using tcp
+        if (!remoteURI.toString().startsWith("tcp:") || !localURI.toString().startsWith("tcp:")) {
+            throw new Exception("Remote broker or local broker is not using tcp connectors");
+        }
+
+        DemandForwardingBridge bridge = new DemandForwardingBridge(
+                TransportFactory.connect(localURI),
+                TransportFactory.connect(remoteURI));
+        String clientId = localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName();
+        bridge.setClientId(clientId);
+        bridges.add(bridge);
+
+        bridge.start();
+
+        MAX_SETUP_TIME = 2000;
+    }
+}