You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/21 15:38:40 UTC
svn commit: r966223 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/broker/region/cursors/
src/test/java/org/apache/activemq/store/
src/test/java/org/apache/activemq/store/jdbc/
Author: dejanb
Date: Wed Jul 21 13:38:40 2010
New Revision: 966223
URL: http://svn.apache.org/viewvc?rev=966223&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2789 and https://issues.apache.org/activemq/browse/AMQ-2843 - adds tests for cursors and stores and fixes a PrioritizedPendingList bug: getPriority() discarded the result of Math.max(), so every message was given the default priority
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Jul 21 13:38:40 2010
@@ -506,6 +506,9 @@
<!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
<exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
+ <!-- exclude until implemented -->
+ <exclude>**/JDBCMessagePriorityTest.*</exclude>
+
</excludes>
</configuration>
</plugin>
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java Wed Jul 21 13:38:40 2010
@@ -82,7 +82,7 @@ public class PrioritizedPendingList impl
protected int getPriority(MessageReference message) {
int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessageId() != null) {
- Math.max(message.getMessage().getPriority(), 0);
+ priority = Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
}
return priority;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Wed Jul 21 13:38:40 2010
@@ -18,19 +18,28 @@
package org.apache.activemq.store;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
-
-import junit.framework.TestCase;
+import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-abstract public class MessagePriorityTest extends TestCase {
+abstract public class MessagePriorityTest extends CombinationTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(MessagePriorityTest.class);
BrokerService broker;
PersistenceAdapter adapter;
@@ -39,6 +48,12 @@ abstract public class MessagePriorityTes
Connection conn;
Session sess;
+ public boolean useCache;
+
+ int MSG_NUM = 1000;
+ int HIGH_PRI = 7;
+ int LOW_PRI = 3;
+
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
protected void setUp() throws Exception {
@@ -48,6 +63,7 @@ abstract public class MessagePriorityTes
broker.setPersistenceAdapter(adapter);
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(true);
+ policy.setUseCache(useCache);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
broker.setDestinationPolicy(policyMap);
@@ -56,6 +72,8 @@ abstract public class MessagePriorityTes
factory = new ActiveMQConnectionFactory("vm://priorityTest");
conn = factory.createConnection();
+ conn.setClientID("priority");
+ conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@@ -75,7 +93,7 @@ abstract public class MessagePriorityTes
MessageProducer topicProducer = sess.createProducer(topic);
- Thread.sleep(100); // get it all propagated
+ Thread.sleep(500); // get it all propagated
assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
@@ -85,4 +103,84 @@ abstract public class MessagePriorityTes
}
+ class ProducerThread extends Thread {
+
+ int priority;
+ int messageCount;
+ ActiveMQDestination dest;
+
+ public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
+ this.messageCount = messageCount;
+ this.priority = priority;
+ this.dest = dest;
+ }
+
+ public void run() {
+ try {
+ MessageProducer producer = sess.createProducer(dest);
+ producer.setPriority(priority);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(sess.createTextMessage("message priority: " + priority));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ public void initCombosForTestQueues() {
+ addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
+ }
+
+ public void testQueues() throws Exception {
+ ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
+
+ ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
+ ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
+
+ lowPri.start();
+ highPri.start();
+
+ lowPri.join();
+ highPri.join();
+
+ MessageConsumer queueConsumer = sess.createConsumer(queue);
+ for (int i = 0; i < MSG_NUM * 2; i++) {
+ Message msg = queueConsumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+ }
+ }
+
+ protected Message createMessage(int priority) throws Exception {
+ final String text = "Message with priority " + priority;
+ Message msg = sess.createTextMessage(text);
+ LOG.info("Sending " + text);
+ return msg;
+ }
+
+ public void testDurableSubs() throws Exception {
+ ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+ TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
+ sub.close();
+
+ ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+ ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+ lowPri.start();
+ highPri.start();
+
+ lowPri.join();
+ highPri.join();
+
+ sub = sess.createDurableSubscriber(topic, "priority");
+ for (int i = 0; i < MSG_NUM * 2; i++) {
+ Message msg = sub.receive(1000);
+ assertNotNull(msg);
+ assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+ }
+
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=966223&r1=966222&r2=966223&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Wed Jul 21 13:38:40 2010
@@ -17,6 +17,8 @@
package org.apache.activemq.store.jdbc;
+import junit.framework.Test;
+
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -30,7 +32,12 @@ public class JDBCMessagePriorityTest ext
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
+ jdbc.deleteAllMessages();
return jdbc;
}
+
+ public static Test suite() {
+ return suite(JDBCMessagePriorityTest.class);
+ }
}