You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/29 21:45:21 UTC

svn commit: r1177411 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java

Author: tabish
Date: Thu Sep 29 19:45:21 2011
New Revision: 1177411

URL: http://svn.apache.org/viewvc?rev=1177411&view=rev
Log:
Use Wait conditions instead of hand-rolled sleep/timeout polling loops to detect when all connections have been interrupted and resumed.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?rev=1177411&r1=1177410&r2=1177411&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java Thu Sep 29 19:45:21 2011
@@ -37,12 +37,10 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.mock.MockTransport;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * 
- */
 public class ReconnectTest extends TestCase {
 
     public static final int MESSAGES_PER_ITTERATION = 10;
@@ -67,10 +65,10 @@ public class ReconnectTest extends TestC
         private final String name;
 
         public Worker(final String name) throws URISyntaxException, JMSException {
-            this.name=name;
+            this.name = name;
             URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false");
             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
-            connection = (ActiveMQConnection)factory.createConnection();
+            connection = (ActiveMQConnection) factory.createConnection();
             connection.addTransportListener(new TransportListener() {
                 public void onCommand(Object command) {
                 }
@@ -80,12 +78,12 @@ public class ReconnectTest extends TestC
                 }
 
                 public void transportInterupted() {
-                    LOG.info("Worker "+name+" was interrupted...");
+                    LOG.info("Worker " + name + " was interrupted...");
                     interruptedCount.incrementAndGet();
                 }
 
                 public void transportResumed() {
-                    LOG.info("Worker "+name+" was resummed...");
+                    LOG.info("Worker " + name + " was resummed...");
                     resumedCount.incrementAndGet();
                 }
             });
@@ -117,7 +115,7 @@ public class ReconnectTest extends TestC
 
         public void run() {
             try {
-                ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name);
+                ActiveMQQueue queue = new ActiveMQQueue("FOO_" + name);
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(queue);
                 MessageProducer producer = session.createProducer(queue);
@@ -150,31 +148,29 @@ public class ReconnectTest extends TestC
         public synchronized void assertNoErrors() {
             if (error != null) {
                 error.printStackTrace();
-                fail("Worker "+name+" got Exception: " + error);
+                fail("Worker " + name + " got Exception: " + error);
             }
         }
-
     }
 
     public void testReconnects() throws Exception {
 
         for (int k = 1; k < 10; k++) {
-
             LOG.info("Test run: " + k);
 
             // Wait for at least one iteration to occur...
             for (int i = 0; i < WORKER_COUNT; i++) {
-               int c=0;
+                int c = 0;
                 for (int j = 0; j < 30; j++) {
-                       c = workers[i].iterations.getAndSet(0);
-                       if( c != 0 ) {
-                               break;
-                       }
+                    c = workers[i].iterations.getAndSet(0);
+                    if (c != 0) {
+                        break;
+                    }
                     workers[i].assertNoErrors();
-                    LOG.info("Test run "+k+": Waiting for worker " + i + " to finish an iteration.");
+                    LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
                     Thread.sleep(1000);
                 }
-                assertTrue("Test run "+k+": Worker " + i + " never completed an interation.", c != 0);
+                assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
                 workers[i].assertNoErrors();
             }
 
@@ -185,26 +181,20 @@ public class ReconnectTest extends TestC
                 workers[i].failConnection();
             }
 
-            long start;
-            // Wait for the connections to get interrupted...
-            start = System.currentTimeMillis();
-            while (interruptedCount.get() < WORKER_COUNT) {
-               if( System.currentTimeMillis()-start > 1000*60 ) {
-                      fail("Timed out waiting for all connections to be interrupted.");
-               }
-                LOG.info("Test run "+k+": Waiting for connections to get interrupted.. at: " + interruptedCount.get());
-                Thread.sleep(1000);
-            }
+            assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition(){
+                public boolean isSatisified() throws Exception {
+                    LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
+                    return interruptedCount.get() == WORKER_COUNT;
+                }
+            }, TimeUnit.SECONDS.toMillis(60)));
 
             // Wait for the connections to re-establish...
-            start = System.currentTimeMillis();
-            while (resumedCount.get() < WORKER_COUNT) {
-               if( System.currentTimeMillis()-start > 1000*60 ) {
-                       fail("Timed out waiting for all connections to be resumed.");
-               }
-                LOG.info("Test run "+k+": Waiting for connections to get resumed.. at: " + resumedCount.get());
-                Thread.sleep(1000);
-            }
+            assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition(){
+                public boolean isSatisified() throws Exception {
+                    LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
+                    return resumedCount.get() >= WORKER_COUNT;
+                }
+            }, TimeUnit.SECONDS.toMillis(60)));
 
             // Reset the counters..
             interruptedCount.set(0);
@@ -212,11 +202,9 @@ public class ReconnectTest extends TestC
             for (int i = 0; i < WORKER_COUNT; i++) {
                 workers[i].iterations.set(0);
             }
-            
-            Thread.sleep(1000);
 
+            Thread.sleep(1000);
         }
-
     }
 
     @Override
@@ -227,13 +215,11 @@ public class ReconnectTest extends TestC
         TransportConnector connector = bs.addConnector("tcp://localhost:0");
         bs.start();
         tcpUri = connector.getConnectUri();
-
         workers = new Worker[WORKER_COUNT];
         for (int i = 0; i < WORKER_COUNT; i++) {
-            workers[i] = new Worker(""+i);
+            workers[i] = new Worker("" + i);
             workers[i].start();
         }
-
     }
 
     @Override
@@ -243,5 +229,4 @@ public class ReconnectTest extends TestC
         }
         new ServiceStopper().stop(bs);
     }
-
 }