You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/10/30 12:56:59 UTC

svn commit: r1635468 - /qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java

Author: orudyy
Date: Thu Oct 30 11:56:59 2014
New Revision: 1635468

URL: http://svn.apache.org/r1635468
Log:
QPID-6199: [Java Broker] Enhance BDBStoreUpgradeTestPreparer to create sorted queue, custom direct exchange and publish messages into priority queue

Modified:
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1635468&r1=1635467&r2=1635468&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Thu Oct 30 11:56:59 2014
@@ -78,10 +78,14 @@ public class BDBStoreUpgradeTestPreparer
     public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
     public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
     public static final String MISUSED_OWNER = "misused-owner-as-description";
+    private static final String VIRTUAL_HOST_NAME = "test";
+    private static final String SORTED_QUEUE_NAME = "mySortedQueue";
+    private static final String SORT_KEY = "mySortKey";
+    private static final String TEST_EXCHANGE_NAME = "myCustomExchange";
+    private static final String TEST_QUEUE_NAME = "myCustomQueue";
 
     private static AMQConnectionFactory _connFac;
-    private static final String CONN_URL =
-        "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+    private static final String CONN_URL = "amqp://guest:guest@clientid/" + VIRTUAL_HOST_NAME + "?brokerlist='tcp://localhost:5672'";
 
     /**
      * Create a BDBStoreUpgradeTestPreparer instance
@@ -145,7 +149,7 @@ public class BDBStoreUpgradeTestPreparer
         MessageConsumer messageConsumer = session.createConsumer(queue);
         messageConsumer.close();
 
-        // Create a Message producer
+        // Create a message producer for the queue
         MessageProducer messageProducer = session.createProducer(queue);
 
         // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
@@ -164,7 +168,18 @@ public class BDBStoreUpgradeTestPreparer
         // Create a priority queue on broker
         final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
         priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10);
-        createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+        Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+        MessageProducer priorityQueueProducer = session.createProducer(priorityQueue);
+
+        for (int msg = 0; msg < 5; msg++)
+        {
+            priorityQueueProducer.setPriority(msg % 10);
+            Message message = session.createTextMessage(generateString(256*1024));
+            message.setIntProperty("ID", msg);
+            priorityQueueProducer.send(message);
+        }
+        session.commit();
+        priorityQueueProducer.close();
 
         // Create a queue that has a DLQ
         final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>();
@@ -180,33 +195,99 @@ public class BDBStoreUpgradeTestPreparer
 
         // Create a queue with JMX specifying an owner, so it can later be moved into description
         createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments);
+
+        createExchange(TEST_EXCHANGE_NAME, "direct");
+        Queue customQueue = createAndBindQueueOnBroker(session, TEST_QUEUE_NAME, null, TEST_EXCHANGE_NAME, "direct");
+        MessageProducer customQueueMessageProducer = session.createProducer(customQueue);
+        sendMessages(session, customQueueMessageProducer, customQueue, DeliveryMode.PERSISTENT, 1*1024, 1);
+        session.commit();
+        customQueueMessageProducer.close();
+
+        prepareSortedQueue(session, SORTED_QUEUE_NAME, SORT_KEY);
+
         session.close();
         connection.close();
     }
 
-    private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+    private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+    {
+        return createAndBindQueueOnBroker(session, queueName, arguments, "amq.direct", "direct");
+    }
+
+    private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments, String exchangeName, String exchangeType) throws Exception
     {
         ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
-        Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'");
+        Queue queue = session.createQueue("BURL:" + exchangeType + "://" + exchangeName + "/" + queueName + "/" + queueName + "?durable='true'");
         ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+        return queue;
     }
 
     private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments)  throws Exception
     {
-        Map<String, Object> environment = new HashMap<String, Object>();
-         environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"});
-         JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
-         JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
-         MBeanServerConnection mbsc =  jmxConnector.getMBeanServerConnection();
-         ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\"");
-
-         Object[] params = new Object[] {queueName, owner, true, arguments};
-         String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
-         mbsc.invoke(virtualHost, "createNewQueue", params, signature);
+        JMXConnector jmxConnector = createJMXConnector();
+        try
+        {
+            MBeanServerConnection mbsc =  jmxConnector.getMBeanServerConnection();
+            ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
+
+            Object[] params = new Object[] {queueName, owner, true, arguments};
+            String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
+            mbsc.invoke(virtualHost, "createNewQueue", params, signature);
 
-         ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct");
-         mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+            ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\",name=\"amq.direct\",ExchangeType=direct");
+            mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+        }
+        finally
+        {
+            jmxConnector.close();
+        }
     }
+
+    private void createExchange(String exchangeName, String exchangeType) throws Exception
+    {
+        JMXConnector jmxConnector = createJMXConnector();
+        try
+        {
+            MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+            ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
+
+            Object[] params = new Object[]{exchangeName, exchangeType, true};
+            String[] signature = new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()};
+            mbsc.invoke(virtualHost, "createNewExchange", params, signature);
+        }
+        finally
+        {
+            jmxConnector.close();
+        }
+    }
+
+    private JMXConnector createJMXConnector() throws Exception
+    {
+        Map<String, Object> environment = new HashMap<>();
+        environment.put(JMXConnector.CREDENTIALS, new String[] {"admin", "admin"});
+        JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+        return JMXConnectorFactory.connect(url, environment);
+    }
+
+    private void prepareSortedQueue(Session session, String queueName, String sortKey) throws Exception
+    {
+        final Map<String, Object> arguments = new HashMap<String, Object>();
+        arguments.put("qpid.queue_sort_key", sortKey);
+        Queue sortedQueue = createAndBindQueueOnBroker(session, queueName, arguments);
+
+        MessageProducer messageProducer2 = session.createProducer(sortedQueue);
+
+        String[] sortKeys = {"c", "b", "e", "a", "d"};
+        for (int i = 1; i <= sortKeys.length; i++)
+        {
+            Message message = session.createTextMessage(generateString(256*1024));
+            message.setIntProperty("ID", i);
+            message.setStringProperty(sortKey, sortKeys[i - 1]);
+            messageProducer2.send(message);
+        }
+        session.commit();
+    }
+
     /**
      * Prepare a DurableSubscription backing queue for use in testing selector
      * recovery and queue exclusivity marking during the upgrade process.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org