You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/08 15:44:30 UTC
svn commit: r1133399 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Author: tabish
Date: Wed Jun 8 13:44:30 2011
New Revision: 1133399
URL: http://svn.apache.org/viewvc?rev=1133399&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3337
Updated unit tests
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1133399&r1=1133398&r2=1133399&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Jun 8 13:44:30 2011
@@ -87,253 +87,252 @@ public class MBeanTest extends EmbeddedB
TestRunner.run(MBeanTest.class);
}
-// public void testConnectors() throws Exception{
-// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
-// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-// assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
-//
-// }
-//
-// public void testMBeans() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnection(connection);
-//
-// // test all the various MBeans now we have a producer, consumer and
-// // messages on a queue
-// assertSendViaMBean();
-// assertQueueBrowseWorks();
-// assertCreateAndDestroyDurableSubscriptions();
-// assertConsumerCounts();
-// assertProducerCounts();
-// }
-//
-// public void testMoveMessages() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnection(connection);
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// CompositeData[] compdatalist = queue.browse();
-// int initialQueueSize = compdatalist.length;
-// if (initialQueueSize == 0) {
-// fail("There is no message in the queue:");
-// }
-// else {
-// echo("Current queue size: " + initialQueueSize);
-// }
-// int messageCount = initialQueueSize;
-// String[] messageIDs = new String[messageCount];
-// for (int i = 0; i < messageCount; i++) {
-// CompositeData cdata = compdatalist[i];
-// String messageID = (String) cdata.get("JMSMessageID");
-// assertNotNull("Should have a message ID for message " + i, messageID);
-// messageIDs[i] = messageID;
-// }
-//
-// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-//
-// echo("About to move " + messageCount + " messages");
-//
-// String newDestination = getSecondDestinationString();
-// for (String messageID : messageIDs) {
-// echo("Moving message: " + messageID);
-// queue.moveMessageTo(messageID, newDestination);
-// }
-//
-// echo("Now browsing the queue");
-// compdatalist = queue.browse();
-// int actualCount = compdatalist.length;
-// echo("Current queue size: " + actualCount);
-// assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
-//
-// echo("Now browsing the second queue");
-//
-// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-// QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// long newQueuesize = queueNew.getQueueSize();
-// echo("Second queue size: " + newQueuesize);
-// assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
-//
-// // check memory usage migration
-// assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
-// assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
-// assertTrue("use cache", queueNew.isUseCache());
-// assertTrue("cache enabled", queueNew.isCacheEnabled());
-// }
-//
-// public void testRemoveMessages() throws Exception {
-// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
-// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-// broker.addQueue(getDestinationString());
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-// String msg1 = queue.sendTextMessage("message 1");
-// String msg2 = queue.sendTextMessage("message 2");
-//
-// assertTrue(queue.removeMessage(msg2));
-//
-// connection = connectionFactory.createConnection();
-// connection.start();
-// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-// ActiveMQDestination dest = createDestination();
-//
-// MessageConsumer consumer = session.createConsumer(dest);
-// Message message = consumer.receive(1000);
-// assertNotNull(message);
-// assertEquals(msg1, message.getJMSMessageID());
-//
-// String msg3 = queue.sendTextMessage("message 3");
-// message = consumer.receive(1000);
-// assertNotNull(message);
-// assertEquals(msg3, message.getJMSMessageID());
-//
-// message = consumer.receive(1000);
-// assertNull(message);
-//
-// }
-//
-// public void testRetryMessages() throws Exception {
-// // lets speed up redelivery
-// ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
-// factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
-// factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
-// factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
-// factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
-// factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
-// factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
-//
-// connection = connectionFactory.createConnection();
-// useConnection(connection);
-//
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// long initialQueueSize = queue.getQueueSize();
-// echo("current queue size: " + initialQueueSize);
-// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-//
-// // lets create a duff consumer which keeps rolling back...
-// Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-// MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
-// Message message = consumer.receive(5000);
-// while (message != null) {
-// echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
-// session.rollback();
-// message = consumer.receive(2000);
-// }
-// consumer.close();
-// session.close();
-//
-//
-// // now lets get the dead letter queue
-// Thread.sleep(1000);
-//
-// ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
-// QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
-//
-// long initialDlqSize = dlq.getQueueSize();
-// CompositeData[] compdatalist = dlq.browse();
-// int dlqQueueSize = compdatalist.length;
-// if (dlqQueueSize == 0) {
-// fail("There are no messages in the queue:");
-// }
-// else {
-// echo("Current DLQ queue size: " + dlqQueueSize);
-// }
-// int messageCount = dlqQueueSize;
-// String[] messageIDs = new String[messageCount];
-// for (int i = 0; i < messageCount; i++) {
-// CompositeData cdata = compdatalist[i];
-// String messageID = (String) cdata.get("JMSMessageID");
-// assertNotNull("Should have a message ID for message " + i, messageID);
-// messageIDs[i] = messageID;
-// }
-//
-// int dlqMemUsage = dlq.getMemoryPercentUsage();
-// assertTrue("dlq has some memory usage", dlqMemUsage > 0);
-// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-//
-//
-// echo("About to retry " + messageCount + " messages");
-//
-// for (String messageID : messageIDs) {
-// echo("Retrying message: " + messageID);
-// dlq.retryMessage(messageID);
-// }
-//
-// long queueSize = queue.getQueueSize();
-// compdatalist = queue.browse();
-// int actualCount = compdatalist.length;
-// echo("Orginal queue size is now " + queueSize);
-// echo("Original browse queue size: " + actualCount);
-//
-// long dlqSize = dlq.getQueueSize();
-// echo("DLQ size: " + dlqSize);
-//
-// assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
-// assertEquals("queue size", initialQueueSize, queueSize);
-// assertEquals("browse queue size", initialQueueSize, actualCount);
-//
-// assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
-// }
-//
-// public void testMoveMessagesBySelector() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnection(connection);
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// String newDestination = getSecondDestinationString();
-// queue.moveMatchingMessagesTo("counter > 2", newDestination);
-//
-// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-//
-// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-// int movedSize = MESSAGE_COUNT-3;
-// assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
-//
-// // now lets remove them by selector
-// queue.removeMatchingMessages("counter > 2");
-//
-// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-// }
-//
-// public void testCopyMessagesBySelector() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnection(connection);
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// String newDestination = getSecondDestinationString();
-// long queueSize = queue.getQueueSize();
-// queue.copyMatchingMessagesTo("counter > 2", newDestination);
-//
-//
-//
-// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-//
-// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
-// assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
-// // now lets remove them by selector
-// queue.removeMatchingMessages("counter > 2");
-//
-// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-// }
+ public void testConnectors() throws Exception{
+ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
+
+ }
+
+ public void testMBeans() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ // test all the various MBeans now we have a producer, consumer and
+ // messages on a queue
+ assertSendViaMBean();
+ assertQueueBrowseWorks();
+ assertCreateAndDestroyDurableSubscriptions();
+ assertConsumerCounts();
+ assertProducerCounts();
+ }
+
+ public void testMoveMessages() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ CompositeData[] compdatalist = queue.browse();
+ int initialQueueSize = compdatalist.length;
+ if (initialQueueSize == 0) {
+ fail("There is no message in the queue:");
+ }
+ else {
+ echo("Current queue size: " + initialQueueSize);
+ }
+ int messageCount = initialQueueSize;
+ String[] messageIDs = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ CompositeData cdata = compdatalist[i];
+ String messageID = (String) cdata.get("JMSMessageID");
+ assertNotNull("Should have a message ID for message " + i, messageID);
+ messageIDs[i] = messageID;
+ }
+
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+
+ echo("About to move " + messageCount + " messages");
+
+ String newDestination = getSecondDestinationString();
+ for (String messageID : messageIDs) {
+ echo("Moving message: " + messageID);
+ queue.moveMessageTo(messageID, newDestination);
+ }
+
+ echo("Now browsing the queue");
+ compdatalist = queue.browse();
+ int actualCount = compdatalist.length;
+ echo("Current queue size: " + actualCount);
+ assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
+
+ echo("Now browsing the second queue");
+
+ queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+ QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ long newQueuesize = queueNew.getQueueSize();
+ echo("Second queue size: " + newQueuesize);
+ assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
+
+ // check memory usage migration
+ assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
+ assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ assertTrue("use cache", queueNew.isUseCache());
+ assertTrue("cache enabled", queueNew.isCacheEnabled());
+ }
+
+ public void testRemoveMessages() throws Exception {
+ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+ broker.addQueue(getDestinationString());
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ String msg1 = queue.sendTextMessage("message 1");
+ String msg2 = queue.sendTextMessage("message 2");
+
+ assertTrue(queue.removeMessage(msg2));
+
+ connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination dest = createDestination();
+
+ MessageConsumer consumer = session.createConsumer(dest);
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(msg1, message.getJMSMessageID());
+
+ String msg3 = queue.sendTextMessage("message 3");
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(msg3, message.getJMSMessageID());
+
+ message = consumer.receive(1000);
+ assertNull(message);
+
+ }
+
+ public void testRetryMessages() throws Exception {
+ // lets speed up redelivery
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
+ factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
+ factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
+ factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
+ factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
+ factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
+ factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
+
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ long initialQueueSize = queue.getQueueSize();
+ echo("current queue size: " + initialQueueSize);
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+
+ // lets create a duff consumer which keeps rolling back...
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
+ Message message = consumer.receive(5000);
+ while (message != null) {
+ echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
+ session.rollback();
+ message = consumer.receive(2000);
+ }
+ consumer.close();
+ session.close();
+
+
+ // now lets get the dead letter queue
+ Thread.sleep(1000);
+
+ ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
+ QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
+
+ long initialDlqSize = dlq.getQueueSize();
+ CompositeData[] compdatalist = dlq.browse();
+ int dlqQueueSize = compdatalist.length;
+ if (dlqQueueSize == 0) {
+ fail("There are no messages in the queue:");
+ }
+ else {
+ echo("Current DLQ queue size: " + dlqQueueSize);
+ }
+ int messageCount = dlqQueueSize;
+ String[] messageIDs = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ CompositeData cdata = compdatalist[i];
+ String messageID = (String) cdata.get("JMSMessageID");
+ assertNotNull("Should have a message ID for message " + i, messageID);
+ messageIDs[i] = messageID;
+ }
+
+ int dlqMemUsage = dlq.getMemoryPercentUsage();
+ assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+
+
+ echo("About to retry " + messageCount + " messages");
+
+ for (String messageID : messageIDs) {
+ echo("Retrying message: " + messageID);
+ dlq.retryMessage(messageID);
+ }
+
+ long queueSize = queue.getQueueSize();
+ compdatalist = queue.browse();
+ int actualCount = compdatalist.length;
+ echo("Orginal queue size is now " + queueSize);
+ echo("Original browse queue size: " + actualCount);
+
+ long dlqSize = dlq.getQueueSize();
+ echo("DLQ size: " + dlqSize);
+
+ assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
+ assertEquals("queue size", initialQueueSize, queueSize);
+ assertEquals("browse queue size", initialQueueSize, actualCount);
+
+ assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
+ }
+
+ public void testMoveMessagesBySelector() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ String newDestination = getSecondDestinationString();
+ queue.moveMatchingMessagesTo("counter > 2", newDestination);
+
+ queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ int movedSize = MESSAGE_COUNT-3;
+ assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+
+ // now lets remove them by selector
+ queue.removeMatchingMessages("counter > 2");
+
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ }
+
+ public void testCopyMessagesBySelector() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ String newDestination = getSecondDestinationString();
+ long queueSize = queue.getQueueSize();
+ queue.copyMatchingMessagesTo("counter > 2", newDestination);
+
+
+
+ queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
+ assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
+ // now lets remove them by selector
+ queue.removeMatchingMessages("counter > 2");
+
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ }
protected void assertSendViaMBean() throws Exception {
String queueName = getDestinationString() + ".SendMBBean";
@@ -793,103 +792,103 @@ public class MBeanTest extends EmbeddedB
assertEquals(0, broker.getDynamicDestinationProducers().length);
}
-// public void testTempQueueJMXDelete() throws Exception {
-// connection = connectionFactory.createConnection();
-//
-// connection.setClientID(clientID);
-// connection.start();
-// Session session = connection.createSession(transacted, authMode);
-// ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
-// Thread.sleep(1000);
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
-//
-// // should not throw an exception
-// mbeanServer.getObjectInstance(queueViewMBeanName);
-//
-// tQueue.delete();
-// Thread.sleep(1000);
-// try {
-// // should throw an exception
-// mbeanServer.getObjectInstance(queueViewMBeanName);
-//
-// fail("should be deleted already!");
-// } catch (Exception e) {
-// // expected!
-// }
-//
-// }
-//
-// // Test for AMQ-3029
-// public void testBrowseBlobMessages() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnectionWithBlobMessage(connection);
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// CompositeData[] compdatalist = queue.browse();
-// int initialQueueSize = compdatalist.length;
-// if (initialQueueSize == 0) {
-// fail("There is no message in the queue:");
-// }
-// else {
-// echo("Current queue size: " + initialQueueSize);
-// }
-// int messageCount = initialQueueSize;
-// String[] messageIDs = new String[messageCount];
-// for (int i = 0; i < messageCount; i++) {
-// CompositeData cdata = compdatalist[i];
-// String messageID = (String) cdata.get("JMSMessageID");
-// assertNotNull("Should have a message ID for message " + i, messageID);
-//
-// messageIDs[i] = messageID;
-// }
-//
-// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-// }
-//
-// public void testBrowseBytesMessages() throws Exception {
-// connection = connectionFactory.createConnection();
-// useConnectionWithByteMessage(connection);
-//
-// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-//
-// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-//
-// CompositeData[] compdatalist = queue.browse();
-// int initialQueueSize = compdatalist.length;
-// if (initialQueueSize == 0) {
-// fail("There is no message in the queue:");
-// }
-// else {
-// echo("Current queue size: " + initialQueueSize);
-// }
-// int messageCount = initialQueueSize;
-// String[] messageIDs = new String[messageCount];
-// for (int i = 0; i < messageCount; i++) {
-// CompositeData cdata = compdatalist[i];
-// String messageID = (String) cdata.get("JMSMessageID");
-// assertNotNull("Should have a message ID for message " + i, messageID);
-// messageIDs[i] = messageID;
-//
-// Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
-// assertNotNull("should be a preview", preview);
-// assertTrue("not empty", preview.length > 0);
-// }
-//
-// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-//
-// // consume all the messages
-// echo("Attempting to consume all bytes messages from: " + destination);
-// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-// MessageConsumer consumer = session.createConsumer(destination);
-// for (int i=0; i<MESSAGE_COUNT; i++) {
-// Message message = consumer.receive(5000);
-// assertNotNull(message);
-// assertTrue(message instanceof BytesMessage);
-// }
-// consumer.close();
-// session.close();
-// }
+ public void testTempQueueJMXDelete() throws Exception {
+ connection = connectionFactory.createConnection();
+
+ connection.setClientID(clientID);
+ connection.start();
+ Session session = connection.createSession(transacted, authMode);
+ ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
+ Thread.sleep(1000);
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
+
+ // should not throw an exception
+ mbeanServer.getObjectInstance(queueViewMBeanName);
+
+ tQueue.delete();
+ Thread.sleep(1000);
+ try {
+ // should throw an exception
+ mbeanServer.getObjectInstance(queueViewMBeanName);
+
+ fail("should be deleted already!");
+ } catch (Exception e) {
+ // expected!
+ }
+
+ }
+
+ // Test for AMQ-3029
+ public void testBrowseBlobMessages() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnectionWithBlobMessage(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ CompositeData[] compdatalist = queue.browse();
+ int initialQueueSize = compdatalist.length;
+ if (initialQueueSize == 0) {
+ fail("There is no message in the queue:");
+ }
+ else {
+ echo("Current queue size: " + initialQueueSize);
+ }
+ int messageCount = initialQueueSize;
+ String[] messageIDs = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ CompositeData cdata = compdatalist[i];
+ String messageID = (String) cdata.get("JMSMessageID");
+ assertNotNull("Should have a message ID for message " + i, messageID);
+
+ messageIDs[i] = messageID;
+ }
+
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+ }
+
+ public void testBrowseBytesMessages() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnectionWithByteMessage(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ CompositeData[] compdatalist = queue.browse();
+ int initialQueueSize = compdatalist.length;
+ if (initialQueueSize == 0) {
+ fail("There is no message in the queue:");
+ }
+ else {
+ echo("Current queue size: " + initialQueueSize);
+ }
+ int messageCount = initialQueueSize;
+ String[] messageIDs = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ CompositeData cdata = compdatalist[i];
+ String messageID = (String) cdata.get("JMSMessageID");
+ assertNotNull("Should have a message ID for message " + i, messageID);
+ messageIDs[i] = messageID;
+
+ Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
+ assertNotNull("should be a preview", preview);
+ assertTrue("not empty", preview.length > 0);
+ }
+
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+
+ // consume all the messages
+ echo("Attempting to consume all bytes messages from: " + destination);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i=0; i<MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof BytesMessage);
+ }
+ consumer.close();
+ session.close();
+ }
}