You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/12/31 18:33:58 UTC

svn commit: r491455 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq: JmsMultipleBrokersTestSupport.java usecases/MultiBrokersMultiClientsTest.java util/MessageIdList.java

Author: chirino
Date: Sun Dec 31 09:33:56 2006
New Revision: 491455

URL: http://svn.apache.org/viewvc?view=rev&rev=491455
Log:
Wait exactly as long as needed for message delivery by using count-down latches.

Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?view=diff&rev=491455&r1=491454&r2=491455
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Sun Dec 31 09:33:56 2006
@@ -43,6 +43,7 @@
 import java.util.Collections;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 import java.net.URI;
 
 /**
@@ -178,10 +179,14 @@
         return null;
     }
 
-    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
+    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {    	
+    	return createConsumer(brokerName, dest, null);
+    }
+    
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
         if (brokerItem != null) {
-            return brokerItem.createConsumer(dest);
+            return brokerItem.createConsumer(dest, latch);
         }
         return null;
     }
@@ -321,19 +326,26 @@
         }
 
         public MessageConsumer createConsumer(Destination dest) throws Exception {
+        	return createConsumer(dest, null);
+        }
+        
+        public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception {
             Connection c = createConnection();
             c.start();
             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            return createConsumer(dest, s);
+            return createConsumerWithSession(dest, s, latch);
         }
 
-        public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception {
+        public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
+        	return createConsumerWithSession(dest, sess, null);
+        }
+        public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception {
             MessageConsumer client = sess.createConsumer(dest);
             MessageIdList messageIdList = new MessageIdList();
+            messageIdList.setCountDownLatch(latch);
             messageIdList.setParent(allMessages);
             client.setMessageListener(messageIdList);
             consumers.put(client, messageIdList);
-
             return client;
         }
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java?view=diff&rev=491455&r1=491454&r2=491455
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java Sun Dec 31 09:33:56 2006
@@ -17,14 +17,17 @@
  */
 package org.apache.activemq.usecases;
 
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
-import java.util.Map;
-import java.util.HashMap;
-import java.net.URI;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.util.MessageIdList;
 
 /**
  * @version $Revision: 1.1.1.1 $
@@ -45,14 +48,20 @@
         // Setup topic destination
         Destination dest = createDestination("TEST.FOO", true);
 
+        CountDownLatch latch = new CountDownLatch(
+        		BROKER_COUNT * PRODUCER_COUNT * 
+        		BROKER_COUNT * CONSUMER_COUNT *
+        		MESSAGE_COUNT);
+        
         // Setup consumers
         for (int i=1; i<=BROKER_COUNT; i++) {
             for (int j=0; j<CONSUMER_COUNT; j++) {
-                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch));
             }
         }
+
         //wait for consumers to get propagated
-        Thread.sleep(2000);
+        Thread.sleep(5000);
 
         // Send messages
         for (int i=1; i<=BROKER_COUNT; i++) {
@@ -60,32 +69,37 @@
                 sendMessages("Broker" + i, dest, MESSAGE_COUNT);
             }
         }
+        
+        assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS));
 
         // Get message count
         for (int i=1; i<=BROKER_COUNT; i++) {
             for (int j=0; j<CONSUMER_COUNT; j++) {
                 MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
-                msgs.waitForMessagesToArrive(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
                 assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount());
             }
         }
+        
     }
 
     public void testQueueAllConnected() throws Exception {
         bridgeAllBrokers();
-
         startAllBrokers();
 
         // Setup topic destination
         Destination dest = createDestination("TEST.FOO", false);
 
+        CountDownLatch latch = new CountDownLatch(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
+        
         // Setup consumers
         for (int i=1; i<=BROKER_COUNT; i++) {
             for (int j=0; j<CONSUMER_COUNT; j++) {
-                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+                consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch));
             }
         }
-        Thread.sleep(2000);
+        
+        //wait for consumers to get propagated
+        Thread.sleep(5000);
 
         // Send messages
         for (int i=1; i<=BROKER_COUNT; i++) {
@@ -95,8 +109,8 @@
         }
 
         // Wait for messages to be delivered
-        Thread.sleep(2000);
-
+        assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS));
+        
         // Get message count
         int totalMsg = 0;
         for (int i=1; i<=BROKER_COUNT; i++) {
@@ -105,7 +119,6 @@
                 totalMsg += msgs.getMessageCount();
             }
         }
-
         assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java?view=diff&rev=491455&r1=491454&r2=491455
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java Sun Dec 31 09:33:56 2006
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -51,6 +52,8 @@
     private MessageListener parent;
     private long maximumDuration = 15000L;
 
+	private CountDownLatch countDownLatch;
+
     public MessageIdList() {
         this(new Object());
     }
@@ -99,6 +102,9 @@
     public void onMessage(Message message) {
         String id=null;
         try {
+        	if( countDownLatch != null )
+        		countDownLatch.countDown();
+        	
             id = message.getJMSMessageID();
             synchronized (semaphore) {
                 messageIds.add(id);
@@ -230,5 +236,9 @@
     public void setMaximumDuration(long maximumDuration){
         this.maximumDuration=maximumDuration;
     }
+
+	public void setCountDownLatch(CountDownLatch countDownLatch) {
+		this.countDownLatch = countDownLatch;
+	}
 
 }