You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/05 17:43:18 UTC

svn commit: r1442657 - /activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java

Author: tabish
Date: Tue Feb  5 16:43:18 2013
New Revision: 1442657

URL: http://svn.apache.org/viewvc?rev=1442657&view=rev
Log:
Thin out the amount of synchronized code used in the test to make things simpler. 

Modified:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=1442657&r1=1442656&r2=1442657&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java Tue Feb  5 16:43:18 2013
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
@@ -36,6 +36,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +44,11 @@ import org.slf4j.LoggerFactory;
 public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class);
 
-    private TestMutex testMutex;
-    protected Connection connection;
-    protected ConnectionConsumer connectionConsumer;
-    protected Queue queue;
-    protected CountDownLatch messageTwoDelay = new CountDownLatch(1);
+    private Connection connection;
+    private ConnectionConsumer connectionConsumer;
+    private Queue queue;
+    private final AtomicBoolean completed = new AtomicBoolean();
+    private final AtomicBoolean success = new AtomicBoolean();
 
     public void testPrefetchExtension() throws Exception {
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -60,23 +61,21 @@ public class OnePrefetchAsyncConsumerTes
         producer.send(session.createTextMessage("Msg2"));
 
         // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
-        // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription
+        // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the
+        // PrefetchSubscription
         producer.send(session.createTextMessage("Msg3"));
 
         session.commit();
 
-        // wait for test to complete and the test result to get set
-        // this happens asynchronously since the messages are delivered asynchronously
-        long done = System.currentTimeMillis() + getMaxTestTime();
-        synchronized (testMutex) {
-           while (!testMutex.testCompleted && System.currentTimeMillis() < done) {
-              testMutex.wait(TimeUnit.SECONDS.toMillis(10));
-           }
-        }
+        assertTrue("test completed on time", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return completed.get();
+            }
+        }));
 
-         assertTrue("completed on time", testMutex.testCompleted);
-        //test completed, result is ready
-        assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful);
+        assertTrue("Attempted to retrieve more than one ServerSession at a time", success.get());
     }
 
     @Override
@@ -90,12 +89,10 @@ public class OnePrefetchAsyncConsumerTes
         bindAddress = "tcp://localhost:0";
         super.setUp();
 
-        testMutex = new TestMutex();
         connection = createConnection();
         queue = createQueue();
         // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription
-        connectionConsumer = connection.createConnectionConsumer(
-           queue, null, new TestServerSessionPool(connection), 1);
+        connectionConsumer = connection.createConnectionConsumer(queue, null, new TestServerSessionPool(connection), 1);
         connection.start();
     }
 
@@ -124,108 +121,87 @@ public class OnePrefetchAsyncConsumerTes
 
     // simulates a ServerSessionPool with only 1 ServerSession
     private class TestServerSessionPool implements ServerSessionPool {
-         Connection connection;
-         TestServerSession serverSession;
-         boolean serverSessionInUse = false;
-
-         public TestServerSessionPool(Connection connection) throws JMSException {
-             this.connection = connection;
-             serverSession = new TestServerSession(this);
-         }
+        Connection connection;
+        TestServerSession serverSession;
+        boolean serverSessionInUse = false;
+
+        public TestServerSessionPool(Connection connection) throws JMSException {
+            this.connection = connection;
+            this.serverSession = new TestServerSession(this);
+        }
 
-         @Override
+        @Override
         public ServerSession getServerSession() throws JMSException {
-             synchronized (this) {
-                 if (serverSessionInUse) {
-                     LOG.info("asked for session while in use, not serialised delivery");
-                     synchronized (testMutex) {
-                        testMutex.testSuccessful = false;
-                        testMutex.testCompleted = true;
-                     }
-                 }
-                 serverSessionInUse = true;
-                 return serverSession;
-             }
-         }
+            synchronized (this) {
+                if (serverSessionInUse) {
+                    LOG.info("asked for session while in use, not serialised delivery");
+                    success.set(false);
+                    completed.set(true);
+                }
+                serverSessionInUse = true;
+                return serverSession;
+            }
+        }
     }
 
     private class TestServerSession implements ServerSession {
-         TestServerSessionPool pool;
-         Session session;
+        TestServerSessionPool pool;
+        Session session;
 
-         public TestServerSession(TestServerSessionPool pool) throws JMSException {
-             this.pool = pool;
-             session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-             session.setMessageListener(new TestMessageListener());
-         }
+        public TestServerSession(TestServerSessionPool pool) throws JMSException {
+            this.pool = pool;
+            session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            session.setMessageListener(new TestMessageListener());
+        }
 
-         @Override
+        @Override
         public Session getSession() throws JMSException {
-             return session;
-         }
+            return session;
+        }
 
-         @Override
+        @Override
         public void start() throws JMSException {
-             // use a separate thread to process the message asynchronously
-             new Thread() {
-                 @Override
+            // use a separate thread to process the message asynchronously
+            new Thread() {
+                @Override
                 public void run() {
-                     // let the session deliver the message
-                     session.run();
+                    // let the session deliver the message
+                    session.run();
 
-                     // commit the tx and
-                     // return ServerSession to pool
-                     synchronized (pool) {
+                    // commit the tx and return ServerSession to pool
+                    synchronized (pool) {
                         try {
                             session.commit();
-                        }
-                        catch (JMSException e) {
+                        } catch (JMSException e) {
                         }
                         pool.serverSessionInUse = false;
-                     }
-
-                     // let the test check if the test was completed
-                     synchronized (testMutex) {
-                         testMutex.notifyAll();
-                     }
-                 }
-              }.start();
-         }
+                    }
+                }
+            }.start();
+        }
     }
 
     private class TestMessageListener implements MessageListener {
         @Override
         public void onMessage(Message message) {
             try {
-               String text = ((TextMessage)message).getText();
-               LOG.info("got message: " + text);
-               if (text.equals("Msg3")) {
-                  // if we get here, Exception in getServerSession() was not thrown, test is successful
-                  // this obviously doesn't happen now,
-                  // need to fix prefetchExtension computation logic in PrefetchSubscription to get here
-                  synchronized (testMutex) {
-                      if (!testMutex.testCompleted) {
-                          testMutex.testSuccessful = true;
-                          testMutex.testCompleted = true;
-                          testMutex.notifyAll();
-                      }
-                  }
-               }
-               else if (text.equals("Msg2")) {
-                  // simulate long message processing so that Msg3 comes when Msg2 is still being processed
-                  // and thus the single ServerSession is in use
-                  TimeUnit.SECONDS.sleep(4);
-               }
-            }
-            catch (JMSException e) {
-            }
-            catch (InterruptedException e) {
+                String text = ((TextMessage) message).getText();
+                LOG.info("got message: " + text);
+                if (text.equals("Msg3")) {
+                    // if we get here, Exception in getServerSession() was not thrown, test is
+                    // successful this obviously doesn't happen now, need to fix prefetchExtension
+                    // computation logic in PrefetchSubscription to get here
+                    success.set(true);
+                    completed.set(true);
+                } else if (text.equals("Msg2")) {
+                    // simulate long message processing so that Msg3 comes when Msg2 is still being
+                    // processed and thus the single ServerSession is in use
+                    TimeUnit.SECONDS.sleep(4);
+                }
+            } catch (JMSException e) {
+            } catch (InterruptedException e) {
             }
         }
     }
 
-    private class TestMutex {
-        boolean testCompleted = false;
-        boolean testSuccessful = true;
-    }
 }