You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/11/29 12:26:31 UTC

svn commit: r1415127 - in /qpid/trunk/qpid/java: bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java

Author: robbie
Date: Thu Nov 29 11:26:30 2012
New Revision: 1415127

URL: http://svn.apache.org/viewvc?rev=1415127&view=rev
Log:
QPID-4441: add system test verification of the priority queue and queue with DLQ following the store upgrade

Modified:
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1415127&r1=1415126&r2=1415127&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Thu Nov 29 11:26:30 2012
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store.ber
 
 
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
@@ -32,6 +34,7 @@ import java.io.File;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -43,7 +46,10 @@ import javax.jms.TopicConnection;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
 
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -311,6 +317,104 @@ public class BDBUpgradeTest extends Qpid
         }
     }
 
+    /**
+     * Tests store upgrade has maintained the priority queue configuration,
+     * such that sending messages with priorities out-of-order and then consuming
+     * them gets the messages back in priority order.
+     */
+    public void testPriorityQueue() throws Exception
+    {
+        // Create a connection and start it
+        Connection connection = getConnection();
+        connection.start();
+
+        // send some messages to the priority queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+        MessageProducer producer = session.createProducer(queue);
+
+        producer.setPriority(4);
+        producer.send(createMessage(1, false, session, producer));
+        producer.setPriority(1);
+        producer.send(createMessage(2, false, session, producer));
+        producer.setPriority(9);
+        producer.send(createMessage(3, false, session, producer));
+        session.close();
+
+        //consume the messages, expected order: msg 3, msg 1, msg 2.
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message msg = consumer.receive(1500);
+        assertNotNull("expected message was not received", msg);
+        assertEquals(3, msg.getIntProperty("msg"));
+        msg = consumer.receive(1500);
+        assertNotNull("expected message was not received", msg);
+        assertEquals(1, msg.getIntProperty("msg"));
+        msg = consumer.receive(1500);
+        assertNotNull("expected message was not received", msg);
+        assertEquals(2, msg.getIntProperty("msg"));
+    }
+
+    /**
+     * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+     * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+     * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+     *
+     * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+     * that turned it on for this specific queue.
+     */
+    public void testRecoveryOfQueueWithDLQ() throws Exception
+    {
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            fail("Unable to establish JMX connection, test cannot proceed");
+        }
+
+        try
+        {
+            //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+            ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE");
+            assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+            TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+            assertEquals(1, bindings.size());
+            for(Object o : bindings.values())
+            {
+                CompositeData binding = (CompositeData) o;
+
+                String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+                String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+                //Because its a fanout exchange, we just return a single '*' key with all bound queues
+                assertEquals("unexpected binding key", "*", bindingKey);
+                assertEquals("unexpected number of queues bound", 1, queueNames.length);
+                assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]);
+            }
+
+            //verify the queue exists, has the expected alternate exchange and max delivery count
+            ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+            assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange());
+            assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+            ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ");
+            assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange());
+            assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount());
+
+            String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ");
+            assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
     private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
     {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -388,4 +492,11 @@ public class BDBUpgradeTest extends Qpid
         session.close();
     }
 
+    private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+    {
+        Message send = producerSession.createTextMessage("Message: " + msgId);
+        send.setIntProperty("msg", msgId);
+
+        return send;
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1415127&r1=1415126&r2=1415127&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java Thu Nov 29 11:26:30 2012
@@ -286,9 +286,7 @@ public class JMXTestUtils
     public ObjectName getQueueObjectName(String virtualHostName, String queue)
     {
         // Get the name of the test manager
-        String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
-                       + ObjectName.quote(virtualHostName) + ",name="
-                       + ObjectName.quote(queue) + ",*";
+        String query = getQueueObjectNameString(virtualHostName, queue);
 
         Set<ObjectName> objectNames = queryObjects(query);
 
@@ -301,6 +299,12 @@ public class JMXTestUtils
         return objectName;
     }
 
+	public String getQueueObjectNameString(String virtualHostName, String queue) {
+	    return "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
+                       + ObjectName.quote(virtualHostName) + ",name="
+                       + ObjectName.quote(queue) + ",*";
+    }
+
     /**
      * Generate the ObjectName for the given Exchange on a VirtualHost.
      */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org