You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/05 14:33:10 UTC

svn commit: r1154189 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Author: tabish
Date: Fri Aug  5 12:33:10 2011
New Revision: 1154189

URL: http://svn.apache.org/viewvc?rev=1154189&view=rev
Log:
Allows the class to set a field that isn't public but has a setter method.  Fixes the compile warnings as well.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1154189&r1=1154188&r2=1154189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Fri Aug  5 12:33:10 2011
@@ -41,19 +41,20 @@ import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
 
+    private final ActiveMQDestination destination = new ActiveMQQueue("test");
+
+    private boolean optimizedDispatch = true;
+    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
 
-    BrokerService broker;
-    Connection connection;
-    Session session;
-    MessageProducer producer;
-    public ActiveMQDestination destination = new ActiveMQQueue("test");
-    public boolean optimizedDispatch = true;
-    public PendingQueueMessageStoragePolicy pendingQueuePolicy;
+    private BrokerService broker;
+    private String connectionUri;
+    private Connection connection;
+    private Session session;
+    private MessageProducer producer;
 
     public static Test suite() {
         return suite(ExpiredMessagesWithNoConsumerTest.class);
@@ -76,11 +77,11 @@ public class ExpiredMessagesWithNoConsum
         broker.setBrokerName("localhost");
         broker.setUseJmx(true);
         broker.setDeleteAllMessagesOnStartup(true);
-        broker.addConnector("tcp://localhost:61616");
+        broker.addConnector("tcp://localhost:0");
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
-        defaultEntry.setOptimizedDispatch(optimizedDispatch );
+        defaultEntry.setOptimizedDispatch(optimizedDispatch);
         defaultEntry.setExpireMessagesPeriod(800);
         defaultEntry.setMaxExpirePageSize(800);
 
@@ -93,11 +94,12 @@ public class ExpiredMessagesWithNoConsum
         }
 
         policyMap.setDefaultEntry(defaultEntry);
-        broker.setDestinationPolicy(policyMap);
 
+        broker.setDestinationPolicy(policyMap);
         broker.start();
-
         broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
     public void initCombosForTestExpiredMessagesWithNoConsumer() {
@@ -109,7 +111,7 @@ public class ExpiredMessagesWithNoConsum
 
         createBrokerWithMemoryLimit();
 
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
         connection = factory.createConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         producer = session.createProducer(destination);
@@ -139,7 +141,7 @@ public class ExpiredMessagesWithNoConsum
 
         assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                producingThread.join(1000);
+                producingThread.join(TimeUnit.SECONDS.toMillis(1000));
                 return !producingThread.isAlive();
             }
         }));
@@ -157,15 +159,16 @@ public class ExpiredMessagesWithNoConsum
                 + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
                 + ", size= " + view.getQueueSize());
 
-        assertEquals("All sent have expired", sendCount, view.getExpiredCount());
-        assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
+        assertEquals("Not all sent messages have expired", sendCount, view.getExpiredCount());
+        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
     }
 
     // first ack delivered after expiry
     public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
         createBroker();
         final long queuePrefetch = 600;
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
         connection = factory.createConnection();
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         producer = session.createProducer(destination);
@@ -183,7 +186,7 @@ public class ExpiredMessagesWithNoConsum
                 try {
                     LOG.info("Got my message: " + message);
                     receivedOneCondition.countDown();
-                    waitCondition.await(60, TimeUnit.SECONDS);
+                    waitCondition.await(6, TimeUnit.MINUTES);
                     LOG.info("acking message: " + message);
                     message.acknowledge();
                 } catch (Exception e) {
@@ -195,7 +198,6 @@ public class ExpiredMessagesWithNoConsum
 
         connection.start();
 
-
         final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
@@ -222,7 +224,7 @@ public class ExpiredMessagesWithNoConsum
                 producingThread.join(1000);
                 return !producingThread.isAlive();
             }
-        }, Wait.MAX_WAIT_MILLIS * 2));
+        }, Wait.MAX_WAIT_MILLIS * 10));
 
         final DestinationViewMBean view = createView(destination);
 
@@ -231,7 +233,7 @@ public class ExpiredMessagesWithNoConsum
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+        assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return sendCount == view.getExpiredCount();
             }
@@ -255,10 +257,10 @@ public class ExpiredMessagesWithNoConsum
                 + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
                 + ", size= " + view.getQueueSize());
 
-
-        assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
-        assertEquals("size gets back to 0 ", 0, view.getQueueSize());
-        assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
+        assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
+                     (queuePrefetch/2) -1, view.getInFlightCount());
+        assertEquals("size didn't get back to 0 ", 0, view.getQueueSize());
+        assertEquals("dequeues didn't match sent/expired ", sendCount, view.getDequeueCount());
 
         consumer.close();
 
@@ -275,7 +277,8 @@ public class ExpiredMessagesWithNoConsum
     public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
         createBroker();
         final long queuePrefetch = 600;
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
         connection = factory.createConnection();
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         producer = session.createProducer(destination);
@@ -291,11 +294,15 @@ public class ExpiredMessagesWithNoConsum
 
             public void onMessage(Message message) {
                 try {
-                    LOG.info("Got my message: " + message);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Got my message: " + message);
+                    }
                     receivedOneCondition.countDown();
                     received.incrementAndGet();
-                    waitCondition.await(60, TimeUnit.SECONDS);
-                    LOG.info("acking message: " + message);
+                    waitCondition.await(5, TimeUnit.MINUTES);
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("acking message: " + message);
+                    }
                     message.acknowledge();
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -306,7 +313,6 @@ public class ExpiredMessagesWithNoConsum
 
         connection.start();
 
-
         final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
@@ -333,16 +339,16 @@ public class ExpiredMessagesWithNoConsum
                 producingThread.join(1000);
                 return !producingThread.isAlive();
             }
-        }));
+        }, Wait.MAX_WAIT_MILLIS * 10));
 
         final DestinationViewMBean view = createView(destination);
 
-        assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
+        assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+        assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return sendCount == view.getExpiredCount();
             }
@@ -366,16 +372,20 @@ public class ExpiredMessagesWithNoConsum
                 + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
                 + ", size= " + view.getQueueSize());
 
-
-        assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
-        assertEquals("size gets back to 0 ", 0, view.getQueueSize());
-        assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
-
+        assertEquals("inflight didn't reduce to half prefetch minus single delivered message",
+                     (queuePrefetch/2) -1, view.getInFlightCount());
+        assertEquals("size doesn't get back to 0 ", 0, view.getQueueSize());
+        assertEquals("dequeues don't match sent/expired ", sendCount, view.getDequeueCount());
 
         // produce some more
         producer.setTimeToLive(0);
+        long tStamp = System.currentTimeMillis();
         for (int i=0; i<sendCount; i++) {
             producer.send(session.createTextMessage("test-" + i));
+            if (i%100 == 0) {
+                LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100)  + "m/ms");
+                tStamp = System.currentTimeMillis() ;
+            }
         }
 
         Wait.waitFor(new Wait.Condition() {
@@ -391,15 +401,14 @@ public class ExpiredMessagesWithNoConsum
                 return 0 == view.getInFlightCount();
             }
         });
-        assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
+        assertEquals("inflight did not go to zeor on close", 0, view.getInFlightCount());
 
         LOG.info("done: " + getName());
     }
 
-
     public void testExpireMessagesForDurableSubscriber() throws Exception {
         createBroker();
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
         connection = factory.createConnection();
         connection.setClientID("myConnection");
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -420,20 +429,17 @@ public class ExpiredMessagesWithNoConsum
 
         DestinationViewMBean view = createView((ActiveMQTopic)destination);
 
-
         LOG.info("messages sent");
         LOG.info("expired=" + view.getExpiredCount() + " " +  view.getEnqueueCount());
         assertEquals(0, view.getExpiredCount());
         assertEquals(10, view.getEnqueueCount());
 
-
         Thread.sleep(4000);
 
         LOG.info("expired=" + view.getExpiredCount() + " " +  view.getEnqueueCount());
         assertEquals(10, view.getExpiredCount());
         assertEquals(0, view.getEnqueueCount());
 
-
         final AtomicLong received = new AtomicLong();
         sub = session.createDurableSubscriber(destination, "mySub");
         sub.setMessageListener(new MessageListener() {
@@ -445,7 +451,6 @@ public class ExpiredMessagesWithNoConsum
 
         LOG.info("Waiting for messages to arrive");
 
-
         Wait.waitFor(new Wait.Condition() {
              public boolean isSatisified() throws Exception {
                  return received.get() >= sendCount;
@@ -458,11 +463,8 @@ public class ExpiredMessagesWithNoConsum
         assertEquals(0, received.get());
         assertEquals(10, view.getExpiredCount());
         assertEquals(0, view.getEnqueueCount());
-
     }
 
-
-
     protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
         String domain = "org.apache.activemq";
         ObjectName name;
@@ -481,7 +483,19 @@ public class ExpiredMessagesWithNoConsum
         broker.waitUntilStopped();
     }
 
+    public boolean getOptimizedDispatch() {
+        return this.optimizedDispatch;
+    }
 
+    public void setOptimizedDispatch(boolean option) {
+        this.optimizedDispatch = option;
+    }
 
+    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
+        return this.pendingQueuePolicy;
+    }
 
+    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) {
+        this.pendingQueuePolicy = policy;
+    }
 }