You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/10/30 12:56:59 UTC
svn commit: r1635468 -
/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
Author: orudyy
Date: Thu Oct 30 11:56:59 2014
New Revision: 1635468
URL: http://svn.apache.org/r1635468
Log:
QPID-6199: [Java Broker] Enhance BDBStoreUpgradeTestPreparer to create sorted queue, custom direct exchange and publish messages into priority queue
Modified:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1635468&r1=1635467&r2=1635468&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Thu Oct 30 11:56:59 2014
@@ -78,10 +78,14 @@ public class BDBStoreUpgradeTestPreparer
public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
public static final String MISUSED_OWNER = "misused-owner-as-description";
+ private static final String VIRTUAL_HOST_NAME = "test";
+ private static final String SORTED_QUEUE_NAME = "mySortedQueue";
+ private static final String SORT_KEY = "mySortKey";
+ private static final String TEST_EXCHANGE_NAME = "myCustomExchange";
+ private static final String TEST_QUEUE_NAME = "myCustomQueue";
private static AMQConnectionFactory _connFac;
- private static final String CONN_URL =
- "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+ private static final String CONN_URL = "amqp://guest:guest@clientid/" + VIRTUAL_HOST_NAME + "?brokerlist='tcp://localhost:5672'";
/**
* Create a BDBStoreUpgradeTestPreparer instance
@@ -145,7 +149,7 @@ public class BDBStoreUpgradeTestPreparer
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.close();
- // Create a Message producer
+ // Create a Message priorityQueueProducer
MessageProducer messageProducer = session.createProducer(queue);
// Publish 5 persistent messages, 256k chars to ensure they are multi-frame
@@ -164,7 +168,18 @@ public class BDBStoreUpgradeTestPreparer
// Create a priority queue on broker
final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10);
- createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+ Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+ MessageProducer priorityQueueProducer = session.createProducer(priorityQueue);
+
+ for (int msg = 0; msg < 5; msg++)
+ {
+ priorityQueueProducer.setPriority(msg % 10);
+ Message message = session.createTextMessage(generateString(256*1024));
+ message.setIntProperty("ID", msg);
+ priorityQueueProducer.send(message);
+ }
+ session.commit();
+ priorityQueueProducer.close();
// Create a queue that has a DLQ
final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>();
@@ -180,33 +195,99 @@ public class BDBStoreUpgradeTestPreparer
// Create a queue with JMX specifying an owner, so it can later be moved into description
createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments);
+
+ createExchange(TEST_EXCHANGE_NAME, "direct");
+ Queue customQueue = createAndBindQueueOnBroker(session, TEST_QUEUE_NAME, null, TEST_EXCHANGE_NAME, "direct");
+ MessageProducer customQueueMessageProducer = session.createProducer(customQueue);
+ sendMessages(session, customQueueMessageProducer, customQueue, DeliveryMode.PERSISTENT, 1*1024, 1);
+ session.commit();
+ customQueueMessageProducer.close();
+
+ prepareSortedQueue(session, SORTED_QUEUE_NAME, SORT_KEY);
+
session.close();
connection.close();
}
- private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+ private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+ {
+ return createAndBindQueueOnBroker(session, queueName, arguments, "amq.direct", "direct");
+ }
+
+ private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments, String exchangeName, String exchangeType) throws Exception
{
((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
- Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'");
+ Queue queue = session.createQueue("BURL:" + exchangeType + "://" + exchangeName + "/" + queueName + "/" + queueName + "?durable='true'");
((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ return queue;
}
private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception
{
- Map<String, Object> environment = new HashMap<String, Object>();
- environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"});
- JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
- JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
- MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
- ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\"");
-
- Object[] params = new Object[] {queueName, owner, true, arguments};
- String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
- mbsc.invoke(virtualHost, "createNewQueue", params, signature);
+ JMXConnector jmxConnector = createJMXConnector();
+ try
+ {
+ MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+ ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
+
+ Object[] params = new Object[] {queueName, owner, true, arguments};
+ String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
+ mbsc.invoke(virtualHost, "createNewQueue", params, signature);
- ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct");
- mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+ ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\",name=\"amq.direct\",ExchangeType=direct");
+ mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+ }
+ finally
+ {
+ jmxConnector.close();
+ }
}
+
+ private void createExchange(String exchangeName, String exchangeType) throws Exception
+ {
+ JMXConnector jmxConnector = createJMXConnector();
+ try
+ {
+ MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+ ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\"");
+
+ Object[] params = new Object[]{exchangeName, exchangeType, true};
+ String[] signature = new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()};
+ mbsc.invoke(virtualHost, "createNewExchange", params, signature);
+ }
+ finally
+ {
+ jmxConnector.close();
+ }
+ }
+
+ private JMXConnector createJMXConnector() throws Exception
+ {
+ Map<String, Object> environment = new HashMap<>();
+ environment.put(JMXConnector.CREDENTIALS, new String[] {"admin", "admin"});
+ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+ return JMXConnectorFactory.connect(url, environment);
+ }
+
+ private void prepareSortedQueue(Session session, String queueName, String sortKey) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.queue_sort_key", sortKey);
+ Queue sortedQueue = createAndBindQueueOnBroker(session, queueName, arguments);
+
+ MessageProducer messageProducer2 = session.createProducer(sortedQueue);
+
+ String[] sortKeys = {"c", "b", "e", "a", "d"};
+ for (int i = 1; i <= sortKeys.length; i++)
+ {
+ Message message = session.createTextMessage(generateString(256*1024));
+ message.setIntProperty("ID", i);
+ message.setStringProperty(sortKey, sortKeys[i - 1]);
+ messageProducer2.send(message);
+ }
+ session.commit();
+ }
+
/**
* Prepare a DurableSubscription backing queue for use in testing selector
* recovery and queue exclusivity marking during the upgrade process.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org