You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/10 21:01:24 UTC

svn commit: r1371809 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java

Author: tabish
Date: Fri Aug 10 19:01:24 2012
New Revision: 1371809

URL: http://svn.apache.org/viewvc?rev=1371809&view=rev
Log:
Test didn't need to depend on hard coded port so use :0 instead.  Add some additional test cases.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java?rev=1371809&r1=1371808&r2=1371809&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java Fri Aug 10 19:01:24 2012
@@ -1,10 +1,8 @@
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.net.URI;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
@@ -18,77 +16,63 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.management.ObjectName;
 
-import junit.framework.Assert;
-
-import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageConsumer;
-import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.Wait;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test for AMQ-3965.
- * A consumer may be stalled in case it uses optimizeAcknowledge and receives 
- * a number of messages that expire before being dispatched to application code. 
- * See AMQ-3965 for more details.
+ * A consumer may be stalled in case it uses optimizeAcknowledge and receives
+ * a number of messages that expire before being dispatched to application code.
+ * See for more details.
+ *
  */
 public class OptimizeAcknowledgeWithExpiredMsgsTest {
-	
+
     private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
 
-    private static BrokerService broker = null; 
-    protected static final String DATA_DIR = "target/activemq-data/";
-    public final String brokerUrl =  "tcp://localhost:61614";
-
-   
-    /** 
-     * Creates a broker instance and starts it.
-     * 
+    private BrokerService broker = null;
+
+    private String connectionUri;
+
+    /**
+     * Creates a broker instance but does not start it.
+     *
      * @param brokerUri - transport uri of broker
      * @param brokerName - name for the broker
      * @return a BrokerService instance with transport uri and broker name set
      * @throws Exception
      */
-    protected BrokerService createBroker(URI brokerUri, String brokerName) throws Exception {
-        BrokerService broker = BrokerFactory.createBroker(brokerUri);
-        broker.setBrokerName(brokerName);
-        broker.setBrokerId(brokerName);
-        broker.setDataDirectory(DATA_DIR);
-        broker.setEnableStatistics(true);
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.setUseJmx(false);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
         return broker;
     }
-    
-    
+
     @Before
     public void setUp() throws Exception {
-        final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
-        
-        broker = createBroker(new URI("broker:(" + brokerUrl + ")" + options), "localhost");
+        broker = createBroker();
         broker.start();
-    	broker.waitUntilStarted();
-    	
+        broker.waitUntilStarted();
     }
-    
-    
+
     @After
-    public void tearDown() throws Exception {    
-        if (broker != null)
-    		broker.stop();
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
     }
-	
 
     /**
      * Tests for AMQ-3965
@@ -96,136 +80,217 @@ public class OptimizeAcknowledgeWithExpi
      * Creates producer and consumer. Producer sends 45 msgs that will expire
      * at consumer (but before being dispatched to app code).
      * Producer then sends 60 msgs without expiry.
-     * 
+     *
      * Consumer receives msgs using a MessageListener and increments a counter.
-     * Main thread sleeps for 5 seconds and checks the counter value. 
+     * Main thread sleeps for 5 seconds and checks the counter value.
      * If counter != 60 msgs (the number of msgs that should get dispatched
-     * to consumer) the test fails. 
+     * to consumer) the test fails.
      */
     @Test
     public void testOptimizedAckWithExpiredMsgs() throws Exception
     {
-    	
-    	ActiveMQConnectionFactory connectionFactory = 
-    			new ActiveMQConnectionFactory(brokerUrl + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
 
         // Create JMS resources
         Connection connection = connectionFactory.createConnection();
-        connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = session.createQueue("TEST.FOO");
 
-        // ***** Consumer code ***** 
-        MessageConsumer consumer = session.createConsumer(destination); 
-                 
-        MyMessageListener listener = new MyMessageListener();
+        // ***** Consumer code *****
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final MyMessageListener listener = new MyMessageListener();
         connection.setExceptionListener((ExceptionListener) listener);
-                
+
         // ***** Producer Code *****
         MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
         String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
-        TextMessage message; 
-        
+        TextMessage message;
+
+        // Produce msgs that will expire quickly
+        for (int i=0; i<45; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,100);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 10 msec");
+        }
+        // Produce msgs that don't expire
+        for (int i=0; i<60; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,60000);
+            // producer.send(message);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 30 sec");
+        }
         consumer.setMessageListener(listener);
-        listener.setDelay(100);
-        
+
+        sleep(1000);  // let the batch of 45 expire.
+
+        connection.start();
+
+        assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getCounter() == 60;
+            }
+        }));
+
+        LOG.info("Received all expected messages with counter at: " + listener.getCounter());
+
+        // Cleanup
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+    }
+
+    @Test
+    public void testOptimizedAckWithExpiredMsgsSync() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+        // Create JMS resources
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST.FOO");
+
+        // ***** Consumer code *****
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // ***** Producer Code *****
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+        TextMessage message;
+
         // Produce msgs that will expire quickly
         for (int i=0; i<45; i++) {
             message = session.createTextMessage(text);
-            producer.send(message,1,1,30);
-            LOG.trace("Sent message: "+ message.getJMSMessageID() + 
-            	" with expiry 30 msec");
+            producer.send(message,1,1,10);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 10 msec");
         }
         // Produce msgs that don't expire
         for (int i=0; i<60; i++) {
             message = session.createTextMessage(text);
-            producer.send(message);
-            LOG.trace("Sent message: "+ message.getJMSMessageID() + 
-            	" with no expiry.");               	
-        }
-        listener.setDelay(0);       
-
-        // set exit condition
-        TestExitCondition cond = new TestExitCondition(listener);
-        Wait.waitFor(cond, 5000);
-        
-        Assert.assertTrue("Error: Some non-expired messages were not received.", listener.getCounter() >= 60);
-        
-        LOG.info("Received all expected messages with counter at " + listener.getCounter());
-        
+            producer.send(message,1,1,30000);
+            // producer.send(message);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 30 sec");
+        }
+        sleep(200);
+
+        int counter = 1;
+        for (; counter <= 60; ++counter) {
+            assertNotNull(consumer.receive(2000));
+            LOG.info("counter at " + counter);
+        }
+        LOG.info("Received all expected messages with counter at: " + counter);
+
+        // Cleanup
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+    }
+
+    @Test
+    public void testOptimizedAckWithExpiredMsgsSync2() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+        // Create JMS resources
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST.FOO");
+
+        // ***** Consumer code *****
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // ***** Producer Code *****
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+        TextMessage message;
+
+        // Produce msgs that don't expire
+        for (int i=0; i<56; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,30000);
+            // producer.send(message);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 30 sec");
+        }
+        // Produce msgs that will expire quickly
+        for (int i=0; i<44; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,10);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 10 msec");
+        }
+        // Produce some moremsgs that don't expire
+        for (int i=0; i<4; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,30000);
+            // producer.send(message);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 30 sec");
+        }
+
+        sleep(200);
+
+        int counter = 1;
+        for (; counter <= 60; ++counter) {
+            assertNotNull(consumer.receive(2000));
+            LOG.info("counter at " + counter);
+        }
+        LOG.info("Received all expected messages with counter at: " + counter);
+
         // Cleanup
-        LOG.info("Cleaning up.");
         producer.close();
         consumer.close();
         session.close();
         connection.close();
-        listener = null;
     }
-    
 
     private void sleep(int milliSecondTime) {
         try {
             Thread.sleep(milliSecondTime);
         } catch (InterruptedException igonred) {
-        }    
+        }
     }
-    
-    
-    /**
-     * Defines the exit condition for the test.
-     */
-    private class TestExitCondition implements Wait.Condition {
 
-    	private MyMessageListener listener;
-    	
-    	public TestExitCondition(MyMessageListener l) {
-    		this.listener = l;
-    	}
-    	
-		public boolean isSatisified() throws Exception {
-    		return listener.getCounter() == 36;
-		}
-    	
-    }
-    
-    
-    /** 
+    /**
      * Standard JMS MessageListener
      */
     private class MyMessageListener implements MessageListener, ExceptionListener {
-    	
-    	private AtomicInteger counter = new AtomicInteger(0);
-    	private int delay = 0;
-    	
-    	public void onMessage(final Message message) { 
-            try { 
-                LOG.trace("Got Message " + message.getJMSMessageID()); 
-                LOG.debug("counter at " + counter.incrementAndGet());
-                if (delay>0) {
-                	sleep(delay);
-                }
-            } catch (final Exception e) { 
-                e.printStackTrace(); 
+
+        private AtomicInteger counter = new AtomicInteger(0);
+
+        public void onMessage(final Message message) {
+            try {
+                LOG.trace("Got Message " + message.getJMSMessageID());
+                LOG.info("counter at " + counter.incrementAndGet());
+            } catch (final Exception e) {
             }
-        } 
-    	
-    	public int getCounter() {
-    		return counter.get();
-    	}
-    	
-    	public int getDelay() {
-    		return delay;
-    	}
-    	
-    	public void setDelay(int newDelay) {
-    		this.delay = newDelay;
-    	}
-    	
-    	public synchronized void onException(JMSException ex) {
+        }
+
+        public int getCounter() {
+            return counter.get();
+        }
+
+        public synchronized void onException(JMSException ex) {
             LOG.error("JMS Exception occured.  Shutting down client.");
         }
     }
 }
- 
\ No newline at end of file