You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/16 05:47:35 UTC
svn commit: r386258 -
/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
Author: chirino
Date: Wed Mar 15 20:47:34 2006
New Revision: 386258
URL: http://svn.apache.org/viewcvs?rev=386258&view=rev
Log:
Made the peer test more reliable: instead of sleeping for a fixed 10 seconds, it now uses consumer advisory messages to detect when the peers have fully connected in the cluster.
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java?rev=386258&r1=386257&r2=386258&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java Wed Mar 15 20:47:34 2006
@@ -17,9 +17,13 @@
package org.apache.activemq.transport.peer;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,6 +32,7 @@
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -40,7 +45,7 @@
*/
public class PeerTransportTest extends TestCase {
protected Log log = LogFactory.getLog(getClass());
- protected Destination destination;
+ protected ActiveMQDestination destination;
protected boolean topic = true;
protected static int MESSAGE_COUNT = 50;
protected static int NUMBER_IN_CLUSTER = 3;
@@ -54,10 +59,11 @@
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
- Destination destination = createDestination();
+ ActiveMQDestination destination = createDestination();
String root = System.getProperty("activemq.store.dir");
+
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
connections[i] = createConnection(i);
connections[i].setClientID("ClusterTest" + i);
@@ -70,9 +76,30 @@
messageIdList[i] = new MessageIdList();
consumer.setMessageListener(messageIdList[i]);
}
- System.out.println("Sleeping to ensure cluster is fully connected");
- Thread.sleep(10000);
- System.out.println("Finished sleeping");
+
+ System.out.println("Waiting for cluster to be fully connected");
+
+ // Each connection should see that NUMBER_IN_CLUSTER consumers get registered on the destination.
+ ActiveMQDestination advisoryDest = AdvisorySupport.getConsumerAdvisoryTopic(destination);
+ for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
+ Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = createMessageConsumer(session, advisoryDest);
+
+ int j=0;
+ while(j < NUMBER_IN_CLUSTER) {
+ ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000);
+ if( message == null ) {
+ fail("Connection "+i+" saw "+j+" consumers, expected: "+NUMBER_IN_CLUSTER);
+ }
+ if( message.getDataStructure()!=null && message.getDataStructure().getDataStructureType()==ConsumerInfo.DATA_STRUCTURE_TYPE ) {
+ j++;
+ }
+ }
+
+ session.close();
+ }
+
+ System.out.println("Cluster is online.");
}
protected void tearDown() throws Exception {
@@ -93,11 +120,11 @@
return fac.createConnection();
}
- protected Destination createDestination() {
+ protected ActiveMQDestination createDestination() {
return createDestination(getClass().getName());
}
- protected Destination createDestination(String name) {
+ protected ActiveMQDestination createDestination(String name) {
if (topic) {
return new ActiveMQTopic(name);
}