You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/08/09 12:05:55 UTC
svn commit: r983571 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/store/
Author: robbie
Date: Mon Aug 9 10:05:54 2010
New Revision: 983571
URL: http://svn.apache.org/viewvc?rev=983571&view=rev
Log:
QPID-2787: Add test for persistence of Conflation/LastValue queues
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Mon Aug 9 10:05:54 2010
@@ -33,10 +33,9 @@ import java.util.HashMap;
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
-
- private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
- private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
private abstract static class QueueProperty
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Mon Aug 9 10:05:54 2010
@@ -39,5 +39,9 @@ public class ConflationQueue extends Sim
super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
}
+ public String getConflationKey()
+ {
+ return ((ConflationQueueList) _entries).getConflationKey();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Mon Aug 9 10:05:54 2010
@@ -21,19 +21,13 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.message.ServerMessage;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+
public class ConflationQueueList extends SimpleQueueEntryList
{
@@ -47,6 +41,11 @@ public class ConflationQueueList extends
_conflationKey = conflationKey;
}
+ public String getConflationKey()
+ {
+ return _conflationKey;
+ }
+
@Override
protected ConflationQueueEntry createQueueEntry(ServerMessage message)
{
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Mon Aug 9 10:05:54 2010
@@ -47,6 +47,7 @@ import org.apache.qpid.server.queue.AMQP
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.ConflationQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -67,6 +68,7 @@ public class MessageStoreTest extends In
{
public static final int DEFAULT_PRIORTY_LEVEL = 5;
public static final String SELECTOR_VALUE = "Test = 'MST'";
+ public static final String LVQ_KEY = "MST-LVQ-KEY";
AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
@@ -79,6 +81,7 @@ public class MessageStoreTest extends In
AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+ AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
AMQShortString queueName = new AMQShortString("MST-Queue");
@@ -180,7 +183,7 @@ public class MessageStoreTest extends In
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered",
- 9, _virtualHost.getQueueRegistry().getQueues().size());
+ 10, _virtualHost.getQueueRegistry().getQueues().size());
}
/**
@@ -212,7 +215,7 @@ public class MessageStoreTest extends In
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
assertEquals("Incorrect number of queues registered after recovery",
- 5, queueRegistry.getQueues().size());
+ 6, queueRegistry.getQueues().size());
//clear the queue
queueRegistry.getQueue(durableQueueName).clearQueue();
@@ -246,7 +249,7 @@ public class MessageStoreTest extends In
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
assertEquals("Incorrect number of queues registered after recovery",
- 5, queueRegistry.getQueues().size());
+ 6, queueRegistry.getQueues().size());
//Validate the non-Durable Queues were not recovered.
assertNull("Non-Durable queue still registered:" + priorityQueueName,
@@ -280,7 +283,7 @@ public class MessageStoreTest extends In
public void testDurableQueueRemoval() throws Exception
{
//Register Durable Queue
- createQueue(durableQueueName, false, true, false);
+ createQueue(durableQueueName, false, true, false, false);
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
assertEquals("Incorrect number of queues registered before recovery",
@@ -401,7 +404,7 @@ public class MessageStoreTest extends In
//create durable queue and exchange, bind them
Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
- createQueue(durableQueueName, false, true, false);
+ createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
assertEquals("Incorrect number of bindings registered before recovery",
@@ -460,7 +463,7 @@ public class MessageStoreTest extends In
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- assertEquals("There should be 5 (durable) queues following recovery", 5, queueRegistry.getQueues().size());
+ assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size());
validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
@@ -515,24 +518,35 @@ public class MessageStoreTest extends In
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false);
- validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false);
- validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false);
- validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false);
- validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true);
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false);
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false);
+ validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false);
+ validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false);
+ validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false);
+ validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true);
}
- private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive)
+ private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
+ if(usePriority || lastValueQueue)
+ {
+ assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
+ }
+
if (usePriority)
{
assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
assertEquals("Priority Queue does not have set priorities",
DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
}
+ else if (lastValueQueue)
+ {
+ assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass());
+ assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey());
+ }
else
{
- assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass());
+ assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
}
assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
@@ -628,46 +642,60 @@ public class MessageStoreTest extends In
private void createAllQueues()
{
//Register Durable Priority Queue
- createQueue(durablePriorityQueueName, true, true, false);
+ createQueue(durablePriorityQueueName, true, true, false, false);
//Register Durable Simple Queue
- createQueue(durableQueueName, false, true, false);
+ createQueue(durableQueueName, false, true, false, false);
//Register Durable Exclusive Simple Queue
- createQueue(durableExclusiveQueueName, false, true, true);
+ createQueue(durableExclusiveQueueName, false, true, true, false);
+
+ //Register Durable LastValue Queue
+ createQueue(durableLastValueQueueName, false, true, true, true);
//Register NON-Durable Priority Queue
- createQueue(priorityQueueName, true, false, false);
+ createQueue(priorityQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(queueName, false, false, false);
+ createQueue(queueName, false, false, false, false);
}
private void createAllTopicQueues()
{
//Register Durable Priority Queue
- createQueue(durablePriorityTopicQueueName, true, true, false);
+ createQueue(durablePriorityTopicQueueName, true, true, false, false);
//Register Durable Simple Queue
- createQueue(durableTopicQueueName, false, true, false);
+ createQueue(durableTopicQueueName, false, true, false, false);
//Register NON-Durable Priority Queue
- createQueue(priorityTopicQueueName, true, false, false);
+ createQueue(priorityTopicQueueName, true, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(topicQueueName, false, false, false);
+ createQueue(topicQueueName, false, false, false, false);
}
- private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive)
+ private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
FieldTable queueArguments = null;
+
+ if(usePriority || lastValueQueue)
+ {
+ assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
+ }
if (usePriority)
{
queueArguments = new FieldTable();
queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
}
+
+ if (lastValueQueue)
+ {
+ queueArguments = new FieldTable();
+ queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY);
+ }
AMQQueue queue = null;
@@ -677,7 +705,7 @@ public class MessageStoreTest extends In
queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive,
_virtualHost, queueArguments);
- validateQueueProperties(queue, usePriority, durable, exclusive);
+ validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
if (queue.isDurable() && !queue.isAutoDelete())
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org