You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/16 05:47:35 UTC

svn commit: r386258 - /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java

Author: chirino
Date: Wed Mar 15 20:47:34 2006
New Revision: 386258

URL: http://svn.apache.org/viewcvs?rev=386258&view=rev
Log:
Made the peer test a little more reliable by replacing the fixed 10-second sleep with consumer advisory messages, which indicate when the peers have fully connected in the cluster.

Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java?rev=386258&r1=386257&r2=386258&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java Wed Mar 15 20:47:34 2006
@@ -17,9 +17,13 @@
 
 package org.apache.activemq.transport.peer;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +32,7 @@
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -40,7 +45,7 @@
  */
 public class PeerTransportTest extends TestCase {
     protected Log log = LogFactory.getLog(getClass());
-    protected Destination destination;
+    protected ActiveMQDestination destination;
     protected boolean topic = true;
     protected static int MESSAGE_COUNT = 50;
     protected static int NUMBER_IN_CLUSTER = 3;
@@ -54,10 +59,11 @@
         connections = new Connection[NUMBER_IN_CLUSTER];
         producers = new MessageProducer[NUMBER_IN_CLUSTER];
         messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
-        Destination destination = createDestination();
+        ActiveMQDestination destination = createDestination();
 
         String root = System.getProperty("activemq.store.dir");
 
+        
         for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
             connections[i] = createConnection(i);
             connections[i].setClientID("ClusterTest" + i);
@@ -70,9 +76,30 @@
             messageIdList[i] = new MessageIdList();
             consumer.setMessageListener(messageIdList[i]);
         }
-        System.out.println("Sleeping to ensure cluster is fully connected");
-        Thread.sleep(10000);
-        System.out.println("Finished sleeping");
+        
+        System.out.println("Waiting for cluster to be fully connected");
+        
+        // Each connection should see that NUMBER_IN_CLUSTER consumers get registered on the destination.
+        ActiveMQDestination advisoryDest = AdvisorySupport.getConsumerAdvisoryTopic(destination);
+        for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
+            Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = createMessageConsumer(session, advisoryDest);
+            
+            int j=0;
+            while(j < NUMBER_IN_CLUSTER) {
+                ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000);
+                if( message == null ) {
+                    fail("Connection "+i+" saw "+j+" consumers, expected: "+NUMBER_IN_CLUSTER);
+                }
+                if( message.getDataStructure()!=null && message.getDataStructure().getDataStructureType()==ConsumerInfo.DATA_STRUCTURE_TYPE ) {
+                    j++;
+                }
+            }
+            
+            session.close();
+        }
+        
+        System.out.println("Cluster is online.");
     }
 
     protected void tearDown() throws Exception {
@@ -93,11 +120,11 @@
         return fac.createConnection();
     }
 
-    protected Destination createDestination() {
+    protected ActiveMQDestination createDestination() {
         return createDestination(getClass().getName());
     }
 
-    protected Destination createDestination(String name) {
+    protected ActiveMQDestination createDestination(String name) {
         if (topic) {
             return new ActiveMQTopic(name);
         }