You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/21 15:38:40 UTC

svn commit: r966223 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/cursors/ src/test/java/org/apache/activemq/store/ src/test/java/org/apache/activemq/store/jdbc/

Author: dejanb
Date: Wed Jul 21 13:38:40 2010
New Revision: 966223

URL: http://svn.apache.org/viewvc?rev=966223&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2789 and https://issues.apache.org/activemq/browse/AMQ-2843 - adds tests for cursors and stores and fixes a PrioritizedPendingList bug

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Jul 21 13:38:40 2010
@@ -506,6 +506,9 @@
             <!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
             <exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
             
+            <!-- exclude until implemented -->
+            <exclude>**/JDBCMessagePriorityTest.*</exclude>
+            
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java Wed Jul 21 13:38:40 2010
@@ -82,7 +82,7 @@ public class PrioritizedPendingList impl
     protected int getPriority(MessageReference message) {
         int priority = javax.jms.Message.DEFAULT_PRIORITY;
         if (message.getMessageId() != null) {
-            Math.max(message.getMessage().getPriority(), 0);
+            priority = Math.max(message.getMessage().getPriority(), 0);
             priority = Math.min(priority, 9);
         }
         return priority;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Wed Jul 21 13:38:40 2010
@@ -18,19 +18,28 @@
 package org.apache.activemq.store;
 
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
-
-import junit.framework.TestCase;
+import javax.jms.TopicSubscriber;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-abstract public class MessagePriorityTest extends TestCase {
+abstract public class MessagePriorityTest extends CombinationTestSupport {
+    
+    private static final Log LOG = LogFactory.getLog(MessagePriorityTest.class);
 
     BrokerService broker;
     PersistenceAdapter adapter;
@@ -39,6 +48,12 @@ abstract public class MessagePriorityTes
     Connection conn;
     Session sess;
     
+    public boolean useCache;
+    
+    int MSG_NUM = 1000;
+    int HIGH_PRI = 7;
+    int LOW_PRI = 3;
+    
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
     
     protected void setUp() throws Exception {
@@ -48,6 +63,7 @@ abstract public class MessagePriorityTes
         broker.setPersistenceAdapter(adapter);
         PolicyEntry policy = new PolicyEntry();
         policy.setPrioritizedMessages(true);
+        policy.setUseCache(useCache);
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(policy);
         broker.setDestinationPolicy(policyMap);
@@ -56,6 +72,8 @@ abstract public class MessagePriorityTes
         
         factory = new ActiveMQConnectionFactory("vm://priorityTest");
         conn = factory.createConnection();
+        conn.setClientID("priority");
+        conn.start();
         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
     
@@ -75,7 +93,7 @@ abstract public class MessagePriorityTes
         MessageProducer topicProducer = sess.createProducer(topic);
         
         
-        Thread.sleep(100); // get it all propagated
+        Thread.sleep(500); // get it all propagated
         
         assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
         assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
@@ -85,4 +103,84 @@ abstract public class MessagePriorityTes
         
     }
     
+    class ProducerThread extends Thread {
+
+        int priority;
+        int messageCount;
+        ActiveMQDestination dest;
+        
+        public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
+            this.messageCount = messageCount;
+            this.priority = priority;
+            this.dest = dest;
+        }
+        
+        public void run() {
+            try {
+                MessageProducer producer = sess.createProducer(dest);
+                producer.setPriority(priority);
+                for (int i = 0; i < messageCount; i++) {
+                    producer.send(sess.createTextMessage("message priority: " + priority));
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        
+    }
+    
+    public void initCombosForTestQueues() {
+        addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
+    }
+    
+    public void testQueues() throws Exception {
+        ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
+
+        ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
+        ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
+        
+        lowPri.start();
+        highPri.start();
+        
+        lowPri.join();
+        highPri.join();
+        
+        MessageConsumer queueConsumer = sess.createConsumer(queue);
+        for (int i = 0; i < MSG_NUM * 2; i++) {
+            Message msg = queueConsumer.receive(1000);
+            assertNotNull(msg);
+            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+        }
+    }
+    
+    protected Message createMessage(int priority) throws Exception {
+        final String text = "Message with priority " + priority;
+        Message msg = sess.createTextMessage(text);
+        LOG.info("Sending  " + text);
+        return msg;
+    }
+    
+    public void testDurableSubs() throws Exception {
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
+        sub.close();
+        
+        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+        
+        lowPri.start();
+        highPri.start();
+        
+        lowPri.join();
+        highPri.join();
+        
+        sub = sess.createDurableSubscriber(topic, "priority");
+        for (int i = 0; i < MSG_NUM * 2; i++) {
+            Message msg = sub.receive(1000);
+            assertNotNull(msg);
+            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+        }
+        
+    }
+    
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Wed Jul 21 13:38:40 2010
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.store.jdbc;
 
+import junit.framework.Test;
+
 import org.apache.activemq.store.MessagePriorityTest;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -30,7 +32,12 @@ public class JDBCMessagePriorityTest ext
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");
         jdbc.setDataSource(dataSource);
+        jdbc.deleteAllMessages();
         return jdbc;
     }
+    
+    public static Test suite() {
+        return suite(JDBCMessagePriorityTest.class);
+    }
 
 }