You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/05 17:43:18 UTC
svn commit: r1442657 -
/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
Author: tabish
Date: Tue Feb 5 16:43:18 2013
New Revision: 1442657
URL: http://svn.apache.org/viewvc?rev=1442657&view=rev
Log:
Thin out the amount of synchronized code used in the test to make things simpler.
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=1442657&r1=1442656&r2=1442657&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java Tue Feb 5 16:43:18 2013
@@ -16,8 +16,8 @@
*/
package org.apache.activemq;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@@ -36,6 +36,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,11 @@ import org.slf4j.LoggerFactory;
public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class);
- private TestMutex testMutex;
- protected Connection connection;
- protected ConnectionConsumer connectionConsumer;
- protected Queue queue;
- protected CountDownLatch messageTwoDelay = new CountDownLatch(1);
+ private Connection connection;
+ private ConnectionConsumer connectionConsumer;
+ private Queue queue;
+ private final AtomicBoolean completed = new AtomicBoolean();
+ private final AtomicBoolean success = new AtomicBoolean();
public void testPrefetchExtension() throws Exception {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -60,23 +61,21 @@ public class OnePrefetchAsyncConsumerTes
producer.send(session.createTextMessage("Msg2"));
// Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
- // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription
+ // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the
+ // PrefetchSubscription
producer.send(session.createTextMessage("Msg3"));
session.commit();
- // wait for test to complete and the test result to get set
- // this happens asynchronously since the messages are delivered asynchronously
- long done = System.currentTimeMillis() + getMaxTestTime();
- synchronized (testMutex) {
- while (!testMutex.testCompleted && System.currentTimeMillis() < done) {
- testMutex.wait(TimeUnit.SECONDS.toMillis(10));
- }
- }
+ assertTrue("test completed on time", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return completed.get();
+ }
+ }));
- assertTrue("completed on time", testMutex.testCompleted);
- //test completed, result is ready
- assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful);
+ assertTrue("Attempted to retrieve more than one ServerSession at a time", success.get());
}
@Override
@@ -90,12 +89,10 @@ public class OnePrefetchAsyncConsumerTes
bindAddress = "tcp://localhost:0";
super.setUp();
- testMutex = new TestMutex();
connection = createConnection();
queue = createQueue();
// note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription
- connectionConsumer = connection.createConnectionConsumer(
- queue, null, new TestServerSessionPool(connection), 1);
+ connectionConsumer = connection.createConnectionConsumer(queue, null, new TestServerSessionPool(connection), 1);
connection.start();
}
@@ -124,108 +121,87 @@ public class OnePrefetchAsyncConsumerTes
// simulates a ServerSessionPool with only 1 ServerSession
private class TestServerSessionPool implements ServerSessionPool {
- Connection connection;
- TestServerSession serverSession;
- boolean serverSessionInUse = false;
-
- public TestServerSessionPool(Connection connection) throws JMSException {
- this.connection = connection;
- serverSession = new TestServerSession(this);
- }
+ Connection connection;
+ TestServerSession serverSession;
+ boolean serverSessionInUse = false;
+
+ public TestServerSessionPool(Connection connection) throws JMSException {
+ this.connection = connection;
+ this.serverSession = new TestServerSession(this);
+ }
- @Override
+ @Override
public ServerSession getServerSession() throws JMSException {
- synchronized (this) {
- if (serverSessionInUse) {
- LOG.info("asked for session while in use, not serialised delivery");
- synchronized (testMutex) {
- testMutex.testSuccessful = false;
- testMutex.testCompleted = true;
- }
- }
- serverSessionInUse = true;
- return serverSession;
- }
- }
+ synchronized (this) {
+ if (serverSessionInUse) {
+ LOG.info("asked for session while in use, not serialised delivery");
+ success.set(false);
+ completed.set(true);
+ }
+ serverSessionInUse = true;
+ return serverSession;
+ }
+ }
}
private class TestServerSession implements ServerSession {
- TestServerSessionPool pool;
- Session session;
+ TestServerSessionPool pool;
+ Session session;
- public TestServerSession(TestServerSessionPool pool) throws JMSException {
- this.pool = pool;
- session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- session.setMessageListener(new TestMessageListener());
- }
+ public TestServerSession(TestServerSessionPool pool) throws JMSException {
+ this.pool = pool;
+ session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ session.setMessageListener(new TestMessageListener());
+ }
- @Override
+ @Override
public Session getSession() throws JMSException {
- return session;
- }
+ return session;
+ }
- @Override
+ @Override
public void start() throws JMSException {
- // use a separate thread to process the message asynchronously
- new Thread() {
- @Override
+ // use a separate thread to process the message asynchronously
+ new Thread() {
+ @Override
public void run() {
- // let the session deliver the message
- session.run();
+ // let the session deliver the message
+ session.run();
- // commit the tx and
- // return ServerSession to pool
- synchronized (pool) {
+ // commit the tx and return ServerSession to pool
+ synchronized (pool) {
try {
session.commit();
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
}
pool.serverSessionInUse = false;
- }
-
- // let the test check if the test was completed
- synchronized (testMutex) {
- testMutex.notifyAll();
- }
- }
- }.start();
- }
+ }
+ }
+ }.start();
+ }
}
private class TestMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
- String text = ((TextMessage)message).getText();
- LOG.info("got message: " + text);
- if (text.equals("Msg3")) {
- // if we get here, Exception in getServerSession() was not thrown, test is successful
- // this obviously doesn't happen now,
- // need to fix prefetchExtension computation logic in PrefetchSubscription to get here
- synchronized (testMutex) {
- if (!testMutex.testCompleted) {
- testMutex.testSuccessful = true;
- testMutex.testCompleted = true;
- testMutex.notifyAll();
- }
- }
- }
- else if (text.equals("Msg2")) {
- // simulate long message processing so that Msg3 comes when Msg2 is still being processed
- // and thus the single ServerSession is in use
- TimeUnit.SECONDS.sleep(4);
- }
- }
- catch (JMSException e) {
- }
- catch (InterruptedException e) {
+ String text = ((TextMessage) message).getText();
+ LOG.info("got message: " + text);
+ if (text.equals("Msg3")) {
+ // if we get here, the Exception in getServerSession() was not thrown and the test is
+ // successful. This obviously doesn't happen now; need to fix the prefetchExtension
+ // computation logic in PrefetchSubscription to get here
+ success.set(true);
+ completed.set(true);
+ } else if (text.equals("Msg2")) {
+ // simulate long message processing so that Msg3 comes when Msg2 is still being
+ // processed and thus the single ServerSession is in use
+ TimeUnit.SECONDS.sleep(4);
+ }
+ } catch (JMSException e) {
+ } catch (InterruptedException e) {
}
}
}
- private class TestMutex {
- boolean testCompleted = false;
- boolean testSuccessful = true;
- }
}