You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/05 14:33:10 UTC
svn commit: r1154189 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Author: tabish
Date: Fri Aug 5 12:33:10 2011
New Revision: 1154189
URL: http://svn.apache.org/viewvc?rev=1154189&view=rev
Log:
Allows the class to set a field that isn't public but has a setter method. Fixes the compile warnings as well.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1154189&r1=1154188&r2=1154189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Fri Aug 5 12:33:10 2011
@@ -41,19 +41,20 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
+ private final ActiveMQDestination destination = new ActiveMQQueue("test");
+
+ private boolean optimizedDispatch = true;
+ private PendingQueueMessageStoragePolicy pendingQueuePolicy;
- BrokerService broker;
- Connection connection;
- Session session;
- MessageProducer producer;
- public ActiveMQDestination destination = new ActiveMQQueue("test");
- public boolean optimizedDispatch = true;
- public PendingQueueMessageStoragePolicy pendingQueuePolicy;
+ private BrokerService broker;
+ private String connectionUri;
+ private Connection connection;
+ private Session session;
+ private MessageProducer producer;
public static Test suite() {
return suite(ExpiredMessagesWithNoConsumerTest.class);
@@ -76,11 +77,11 @@ public class ExpiredMessagesWithNoConsum
broker.setBrokerName("localhost");
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector("tcp://localhost:61616");
+ broker.addConnector("tcp://localhost:0");
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setOptimizedDispatch(optimizedDispatch );
+ defaultEntry.setOptimizedDispatch(optimizedDispatch);
defaultEntry.setExpireMessagesPeriod(800);
defaultEntry.setMaxExpirePageSize(800);
@@ -93,11 +94,12 @@ public class ExpiredMessagesWithNoConsum
}
policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
+ broker.setDestinationPolicy(policyMap);
broker.start();
-
broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void initCombosForTestExpiredMessagesWithNoConsumer() {
@@ -109,7 +111,7 @@ public class ExpiredMessagesWithNoConsum
createBrokerWithMemoryLimit();
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
@@ -139,7 +141,7 @@ public class ExpiredMessagesWithNoConsum
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
- producingThread.join(1000);
+ producingThread.join(TimeUnit.SECONDS.toMillis(1000));
return !producingThread.isAlive();
}
}));
@@ -157,15 +159,16 @@ public class ExpiredMessagesWithNoConsum
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
- assertEquals("All sent have expired", sendCount, view.getExpiredCount());
- assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
+ assertEquals("Not all sent messages have expired", sendCount, view.getExpiredCount());
+ assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
}
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();
final long queuePrefetch = 600;
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@@ -183,7 +186,7 @@ public class ExpiredMessagesWithNoConsum
try {
LOG.info("Got my message: " + message);
receivedOneCondition.countDown();
- waitCondition.await(60, TimeUnit.SECONDS);
+ waitCondition.await(6, TimeUnit.MINUTES);
LOG.info("acking message: " + message);
message.acknowledge();
} catch (Exception e) {
@@ -195,7 +198,6 @@ public class ExpiredMessagesWithNoConsum
connection.start();
-
final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
@@ -222,7 +224,7 @@ public class ExpiredMessagesWithNoConsum
producingThread.join(1000);
return !producingThread.isAlive();
}
- }, Wait.MAX_WAIT_MILLIS * 2));
+ }, Wait.MAX_WAIT_MILLIS * 10));
final DestinationViewMBean view = createView(destination);
@@ -231,7 +233,7 @@ public class ExpiredMessagesWithNoConsum
return queuePrefetch == view.getDispatchCount();
}
}));
- assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+ assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@@ -255,10 +257,10 @@ public class ExpiredMessagesWithNoConsum
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
-
- assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
- assertEquals("size gets back to 0 ", 0, view.getQueueSize());
- assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
+ assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
+ (queuePrefetch/2) -1, view.getInFlightCount());
+ assertEquals("size didn't get back to 0 ", 0, view.getQueueSize());
+ assertEquals("dequeues didn't match sent/expired ", sendCount, view.getDequeueCount());
consumer.close();
@@ -275,7 +277,8 @@ public class ExpiredMessagesWithNoConsum
public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
createBroker();
final long queuePrefetch = 600;
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@@ -291,11 +294,15 @@ public class ExpiredMessagesWithNoConsum
public void onMessage(Message message) {
try {
- LOG.info("Got my message: " + message);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got my message: " + message);
+ }
receivedOneCondition.countDown();
received.incrementAndGet();
- waitCondition.await(60, TimeUnit.SECONDS);
- LOG.info("acking message: " + message);
+ waitCondition.await(5, TimeUnit.MINUTES);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("acking message: " + message);
+ }
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
@@ -306,7 +313,6 @@ public class ExpiredMessagesWithNoConsum
connection.start();
-
final Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
@@ -333,16 +339,16 @@ public class ExpiredMessagesWithNoConsum
producingThread.join(1000);
return !producingThread.isAlive();
}
- }));
+ }, Wait.MAX_WAIT_MILLIS * 10));
final DestinationViewMBean view = createView(destination);
- assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
+ assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return queuePrefetch == view.getDispatchCount();
}
}));
- assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+ assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@@ -366,16 +372,20 @@ public class ExpiredMessagesWithNoConsum
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
-
- assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
- assertEquals("size gets back to 0 ", 0, view.getQueueSize());
- assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
-
+ assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
+ (queuePrefetch/2) -1, view.getInFlightCount());
+ assertEquals("size doesn't get back to 0 ", 0, view.getQueueSize());
+ assertEquals("dequeues don't match sent/expired ", sendCount, view.getDequeueCount());
// produce some more
producer.setTimeToLive(0);
+ long tStamp = System.currentTimeMillis();
for (int i=0; i<sendCount; i++) {
producer.send(session.createTextMessage("test-" + i));
+ if (i%100 == 0) {
+ LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
+ tStamp = System.currentTimeMillis() ;
+ }
}
Wait.waitFor(new Wait.Condition() {
@@ -391,15 +401,14 @@ public class ExpiredMessagesWithNoConsum
return 0 == view.getInFlightCount();
}
});
- assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
+ assertEquals("inflight did not go to zeor on close", 0, view.getInFlightCount());
LOG.info("done: " + getName());
}
-
public void testExpireMessagesForDurableSubscriber() throws Exception {
createBroker();
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
connection.setClientID("myConnection");
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -420,20 +429,17 @@ public class ExpiredMessagesWithNoConsum
DestinationViewMBean view = createView((ActiveMQTopic)destination);
-
LOG.info("messages sent");
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(0, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
-
Thread.sleep(4000);
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
-
final AtomicLong received = new AtomicLong();
sub = session.createDurableSubscriber(destination, "mySub");
sub.setMessageListener(new MessageListener() {
@@ -445,7 +451,6 @@ public class ExpiredMessagesWithNoConsum
LOG.info("Waiting for messages to arrive");
-
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
@@ -458,11 +463,8 @@ public class ExpiredMessagesWithNoConsum
assertEquals(0, received.get());
assertEquals(10, view.getExpiredCount());
assertEquals(0, view.getEnqueueCount());
-
}
-
-
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
@@ -481,7 +483,19 @@ public class ExpiredMessagesWithNoConsum
broker.waitUntilStopped();
}
+ public boolean getOptimizedDispatch() {
+ return this.optimizedDispatch;
+ }
+ public void setOptimizedDispatch(boolean option) {
+ this.optimizedDispatch = option;
+ }
+ public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
+ return this.pendingQueuePolicy;
+ }
+ public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) {
+ this.pendingQueuePolicy = policy;
+ }
}