You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/08/09 12:05:55 UTC

svn commit: r983571 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/store/

Author: robbie
Date: Mon Aug  9 10:05:54 2010
New Revision: 983571

URL: http://svn.apache.org/viewvc?rev=983571&view=rev
Log:
QPID-2787: Add test for persistence of Conflation/LastValue queues

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Mon Aug  9 10:05:54 2010
@@ -33,10 +33,9 @@ import java.util.HashMap;
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
-
-    private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
-    private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
-    private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+    public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+    public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+    public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
 
     private abstract static class QueueProperty
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Mon Aug  9 10:05:54 2010
@@ -39,5 +39,9 @@ public class ConflationQueue extends Sim
         super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
     }
 
+    public String getConflationKey()
+    {
+        return ((ConflationQueueList) _entries).getConflationKey();
+    }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Mon Aug  9 10:05:54 2010
@@ -21,19 +21,13 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.message.ServerMessage;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+
 public class ConflationQueueList extends SimpleQueueEntryList
 {
 
@@ -47,6 +41,11 @@ public class ConflationQueueList extends
         _conflationKey = conflationKey;
     }
 
+    public String getConflationKey()
+    {
+        return _conflationKey;
+    }
+
     @Override
     protected ConflationQueueEntry createQueueEntry(ServerMessage message)
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=983571&r1=983570&r2=983571&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Mon Aug  9 10:05:54 2010
@@ -47,6 +47,7 @@ import org.apache.qpid.server.queue.AMQP
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.ConflationQueue;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -67,6 +68,7 @@ public class MessageStoreTest extends In
 {
     public static final int DEFAULT_PRIORTY_LEVEL = 5;
     public static final String SELECTOR_VALUE = "Test = 'MST'";
+    public static final String LVQ_KEY = "MST-LVQ-KEY";
 
     AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
     AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
@@ -79,6 +81,7 @@ public class MessageStoreTest extends In
 
     AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
     AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+    AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
     AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
     AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
     AMQShortString queueName = new AMQShortString("MST-Queue");
@@ -180,7 +183,7 @@ public class MessageStoreTest extends In
         validateMessageOnTopics(2, true);
 
         assertEquals("Not all queues correctly registered",
-                9, _virtualHost.getQueueRegistry().getQueues().size());
+                10, _virtualHost.getQueueRegistry().getQueues().size());
     }
 
     /**
@@ -212,7 +215,7 @@ public class MessageStoreTest extends In
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
         assertEquals("Incorrect number of queues registered after recovery", 
-                5,  queueRegistry.getQueues().size());
+                6,  queueRegistry.getQueues().size());
 
         //clear the queue
         queueRegistry.getQueue(durableQueueName).clearQueue();
@@ -246,7 +249,7 @@ public class MessageStoreTest extends In
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
         assertEquals("Incorrect number of queues registered after recovery", 
-                5,  queueRegistry.getQueues().size());
+                6,  queueRegistry.getQueues().size());
 
         //Validate the non-Durable Queues were not recovered.
         assertNull("Non-Durable queue still registered:" + priorityQueueName, 
@@ -280,7 +283,7 @@ public class MessageStoreTest extends In
     public void testDurableQueueRemoval() throws Exception
     {
         //Register Durable Queue
-        createQueue(durableQueueName, false, true, false);
+        createQueue(durableQueueName, false, true, false, false);
 
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
         assertEquals("Incorrect number of queues registered before recovery",
@@ -401,7 +404,7 @@ public class MessageStoreTest extends In
 
         //create durable queue and exchange, bind them
         Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
-        createQueue(durableQueueName, false, true, false);
+        createQueue(durableQueueName, false, true, false, false);
         bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
 
         assertEquals("Incorrect number of bindings registered before recovery", 
@@ -460,7 +463,7 @@ public class MessageStoreTest extends In
     {
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
-        assertEquals("There should be 5 (durable) queues following recovery", 5, queueRegistry.getQueues().size());
+        assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size());
 
         validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
         validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
@@ -515,24 +518,35 @@ public class MessageStoreTest extends In
     {
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
-        validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false);
-        validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false);
-        validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false);
-        validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false);
-        validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true);
+        validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false);
+        validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false);
+        validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false);
+        validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false);
+        validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false);
+        validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true);
     }
 
-    private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive)
+    private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
     {
+        if(usePriority || lastValueQueue)
+        {
+            assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
+        }
+        
         if (usePriority)
         {
             assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
             assertEquals("Priority Queue does not have set priorities",
                     DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
         }
+        else if (lastValueQueue)
+        {
+            assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass());
+            assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey());
+        }
         else
         {
-            assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass());
+            assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
         }
 
         assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
@@ -628,46 +642,60 @@ public class MessageStoreTest extends In
     private void createAllQueues()
     {
         //Register Durable Priority Queue
-        createQueue(durablePriorityQueueName, true, true, false);
+        createQueue(durablePriorityQueueName, true, true, false, false);
 
         //Register Durable Simple Queue
-        createQueue(durableQueueName, false, true, false);
+        createQueue(durableQueueName, false, true, false, false);
 
         //Register Durable Exclusive Simple Queue
-        createQueue(durableExclusiveQueueName, false, true, true);
+        createQueue(durableExclusiveQueueName, false, true, true, false);
+
+        //Register Durable LastValue Queue
+        createQueue(durableLastValueQueueName, false, true, true, true);
 
         //Register NON-Durable Priority Queue
-        createQueue(priorityQueueName, true, false, false);
+        createQueue(priorityQueueName, true, false, false, false);
 
         //Register NON-Durable Simple Queue
-        createQueue(queueName, false, false, false);
+        createQueue(queueName, false, false, false, false);
     }
 
     private void createAllTopicQueues()
     {
         //Register Durable Priority Queue
-        createQueue(durablePriorityTopicQueueName, true, true, false);
+        createQueue(durablePriorityTopicQueueName, true, true, false, false);
 
         //Register Durable Simple Queue
-        createQueue(durableTopicQueueName, false, true, false);
+        createQueue(durableTopicQueueName, false, true, false, false);
 
         //Register NON-Durable Priority Queue
-        createQueue(priorityTopicQueueName, true, false, false);
+        createQueue(priorityTopicQueueName, true, false, false, false);
 
         //Register NON-Durable Simple Queue
-        createQueue(topicQueueName, false, false, false);
+        createQueue(topicQueueName, false, false, false, false);
     }
 
-    private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive)
+    private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
     {
 
         FieldTable queueArguments = null;
+        
+        if(usePriority || lastValueQueue)
+        {
+            assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
+        }
 
         if (usePriority)
         {
             queueArguments = new FieldTable();
             queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
         }
+        
+        if (lastValueQueue)
+        {
+            queueArguments = new FieldTable();
+            queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY);
+        }
 
         AMQQueue queue = null;
 
@@ -677,7 +705,7 @@ public class MessageStoreTest extends In
             queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive,
                     _virtualHost, queueArguments);
 
-            validateQueueProperties(queue, usePriority, durable, exclusive);
+            validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
 
             if (queue.isDurable() && !queue.isAutoDelete())
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org