You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/27 15:15:50 UTC
svn commit: r1151454 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Author: tabish
Date: Wed Jul 27 13:15:50 2011
New Revision: 1151454
URL: http://svn.apache.org/viewvc?rev=1151454&view=rev
Log:
Update the assertion messages to reflect the actual failure condition.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1151454&r1=1151453&r2=1151454&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Wed Jul 27 13:15:50 2011
@@ -46,15 +46,15 @@ public class ExpiredMessagesWithNoConsum
private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
-
- BrokerService broker;
- Connection connection;
- Session session;
- MessageProducer producer;
- public ActiveMQDestination destination = new ActiveMQQueue("test");
+
+ BrokerService broker;
+ Connection connection;
+ Session session;
+ MessageProducer producer;
+ public ActiveMQDestination destination = new ActiveMQQueue("test");
public boolean optimizedDispatch = true;
public PendingQueueMessageStoragePolicy pendingQueuePolicy;
-
+
public static Test suite() {
return suite(ExpiredMessagesWithNoConsumerTest.class);
}
@@ -62,15 +62,15 @@ public class ExpiredMessagesWithNoConsum
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
-
+
protected void createBrokerWithMemoryLimit() throws Exception {
doCreateBroker(true);
}
-
+
protected void createBroker() throws Exception {
doCreateBroker(false);
}
-
+
private void doCreateBroker(boolean memoryLimit) throws Exception {
broker = new BrokerService();
broker.setBrokerName("localhost");
@@ -83,7 +83,7 @@ public class ExpiredMessagesWithNoConsum
defaultEntry.setOptimizedDispatch(optimizedDispatch );
defaultEntry.setExpireMessagesPeriod(800);
defaultEntry.setMaxExpirePageSize(800);
-
+
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
if (memoryLimit) {
@@ -99,51 +99,51 @@ public class ExpiredMessagesWithNoConsum
broker.waitUntilStarted();
}
-
+
public void initCombosForTestExpiredMessagesWithNoConsumer() {
addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
}
-
- public void testExpiredMessagesWithNoConsumer() throws Exception {
-
- createBrokerWithMemoryLimit();
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- connection = factory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(destination);
- producer.setTimeToLive(1000);
- connection.start();
- final long sendCount = 2000;
-
- final Thread producingThread = new Thread("Producing Thread") {
+
+ public void testExpiredMessagesWithNoConsumer() throws Exception {
+
+ createBrokerWithMemoryLimit();
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setTimeToLive(1000);
+ connection.start();
+ final long sendCount = 2000;
+
+ final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
- int i = 0;
- long tStamp = System.currentTimeMillis();
- while (i++ < sendCount) {
- producer.send(session.createTextMessage("test"));
- if (i%100 == 0) {
- LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
- tStamp = System.currentTimeMillis() ;
- }
- }
+ int i = 0;
+ long tStamp = System.currentTimeMillis();
+ while (i++ < sendCount) {
+ producer.send(session.createTextMessage("test"));
+ if (i%100 == 0) {
+ LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
+ tStamp = System.currentTimeMillis() ;
+ }
+ }
} catch (Throwable ex) {
ex.printStackTrace();
}
}
- };
-
- producingThread.start();
-
- assertTrue("producer completed within time", Wait.waitFor(new Wait.Condition() {
+ };
+
+ producingThread.start();
+
+ assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
producingThread.join(1000);
return !producingThread.isAlive();
}
- }));
-
+ }));
+
final DestinationViewMBean view = createView(destination);
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
@@ -156,14 +156,14 @@ public class ExpiredMessagesWithNoConsum
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
-
+
assertEquals("All sent have expired", sendCount, view.getExpiredCount());
assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
- }
-
- // first ack delivered after expiry
+ }
+
+ // first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
- createBroker();
+ createBroker();
final long queuePrefetch = 600;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
@@ -171,11 +171,11 @@ public class ExpiredMessagesWithNoConsum
producer = session.createProducer(destination);
final int ttl = 4000;
producer.setTimeToLive(ttl);
-
- final long sendCount = 1500;
+
+ final long sendCount = 1500;
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
final CountDownLatch waitCondition = new CountDownLatch(1);
-
+
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@@ -189,13 +189,13 @@ public class ExpiredMessagesWithNoConsum
} catch (Exception e) {
e.printStackTrace();
fail(e.toString());
- }
- }
+ }
+ }
});
-
+
connection.start();
-
-
+
+
final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
@@ -213,19 +213,19 @@ public class ExpiredMessagesWithNoConsum
}
}
};
-
+
producingThread.start();
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
-
- assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
+
+ assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
producingThread.join(1000);
return !producingThread.isAlive();
- }
+ }
}, Wait.MAX_WAIT_MILLIS * 2));
-
+
final DestinationViewMBean view = createView(destination);
-
+
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return queuePrefetch == view.getDispatchCount();
@@ -235,15 +235,15 @@ public class ExpiredMessagesWithNoConsum
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
- }));
-
+ }));
+
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
-
+
// let the ack happen
waitCondition.countDown();
-
+
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
// consumer ackLater(delivery ack for expired messages) is based on half the prefetch value
@@ -254,21 +254,21 @@ public class ExpiredMessagesWithNoConsum
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
-
-
+
+
assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
-
+
consumer.close();
-
+
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
});
assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
-
+
LOG.info("done: " + getName());
}
@@ -328,7 +328,7 @@ public class ExpiredMessagesWithNoConsum
producingThread.start();
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
- assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
+ assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
producingThread.join(1000);
return !producingThread.isAlive();
@@ -463,7 +463,7 @@ public class ExpiredMessagesWithNoConsum
- protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+ protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
@@ -475,13 +475,13 @@ public class ExpiredMessagesWithNoConsum
true);
}
- protected void tearDown() throws Exception {
- connection.stop();
- broker.stop();
- broker.waitUntilStopped();
- }
+ protected void tearDown() throws Exception {
+ connection.stop();
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+
-
-
}