You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:10 UTC
[28/69] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
deleted file mode 100644
index 98afe30..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ /dev/null
@@ -1,1505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.jmx;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import junit.textui.TestRunner;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.BlobMessage;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.BaseDestination;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.util.JMXSupport;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A test case of the various MBeans in ActiveMQ. If you want to look at the
- * various MBeans after the test has been run then run this test case as a
- * command line application.
- */
-public class MBeanTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class);
-
- private static boolean waitForKeyPress;
-
- protected MBeanServer mbeanServer;
- protected String domain = "org.apache.activemq";
- protected String clientID = "foo";
-
- protected Connection connection;
- protected boolean transacted;
- protected int authMode = Session.AUTO_ACKNOWLEDGE;
- protected static final int MESSAGE_COUNT = 2 * BaseDestination.MAX_PAGE_SIZE;
- final static String QUEUE_WITH_OPTIONS = "QueueWithOptions";
-
- /**
- * When you run this test case from the command line it will pause before
- * terminating so that you can look at the MBeans state for debugging
- * purposes.
- */
- public static void main(String[] args) {
- waitForKeyPress = true;
- TestRunner.run(MBeanTest.class);
- }
-
- public void testConnectors() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getTransportConnectorByType("tcp")).getPort(), new URI(this.broker.getTransportConnectors().get(0).getPublishableConnectString()).getPort());
- }
-
- public void testMBeans() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- // test all the various MBeans now we have a producer, consumer and
- // messages on a queue
- assertSendViaMBean();
- assertSendCsnvViaMBean();
- assertQueueBrowseWorks();
- assertCreateAndDestroyDurableSubscriptions();
- assertConsumerCounts();
- assertProducerCounts();
- }
-
- public void testMoveMessages() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
- }
-
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
- echo("About to move " + messageCount + " messages");
-
- String newDestination = getSecondDestinationString();
- for (String messageID : messageIDs) {
- //echo("Moving message: " + messageID);
- queue.moveMessageTo(messageID, newDestination);
- }
-
- echo("Now browsing the queue");
- compdatalist = queue.browse();
- int actualCount = compdatalist.length;
- echo("Current queue size: " + actualCount);
- assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
-
- echo("Now browsing the second queue");
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
- QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long newQueuesize = queueNew.getQueueSize();
- echo("Second queue size: " + newQueuesize);
- assertEquals("Unexpected number of messages ", messageCount, newQueuesize);
-
- // check memory usage migration
- assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
- assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
- assertTrue("use cache", queueNew.isUseCache());
- assertTrue("cache enabled", queueNew.isCacheEnabled());
- assertEquals("no forwards", 0, queueNew.getForwardCount());
- }
-
- public void testRemoveMessages() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- broker.addQueue(getDestinationString());
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- String msg1 = queue.sendTextMessage("message 1");
- String msg2 = queue.sendTextMessage("message 2");
-
- assertTrue(queue.removeMessage(msg2));
-
- connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- ActiveMQDestination dest = createDestination();
-
- MessageConsumer consumer = session.createConsumer(dest);
- Message message = consumer.receive(1000);
- assertNotNull(message);
- assertEquals(msg1, message.getJMSMessageID());
-
- String msg3 = queue.sendTextMessage("message 3");
- message = consumer.receive(1000);
- assertNotNull(message);
- assertEquals(msg3, message.getJMSMessageID());
-
- message = consumer.receive(1000);
- assertNull(message);
-
- }
-
- public void testRemoveQueue() throws Exception {
- String queueName = "TEST";
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- broker.addQueue(queueName);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName);
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- queue.sendTextMessage("message 1");
- queue.sendTextMessage("message 2");
-
- assertEquals(2, broker.getTotalMessageCount());
-
- broker.removeQueue(queueName);
-
- assertEquals(0, broker.getTotalMessageCount());
-
- }
-
- public void testRetryMessages() throws Exception {
- // lets speed up redelivery
- ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
- factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
- factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
- factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
- factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
- factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
- factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
-
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long initialQueueSize = queue.getQueueSize();
- echo("current queue size: " + initialQueueSize);
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
- // lets create a duff consumer which keeps rolling back...
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
- Message message = consumer.receive(5000);
- while (message != null) {
- echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
- session.rollback();
- message = consumer.receive(2000);
- }
- consumer.close();
- session.close();
-
- // now lets get the dead letter queue
- Thread.sleep(1000);
-
- ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME);
- QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
-
- long initialDlqSize = dlq.getQueueSize();
- CompositeData[] compdatalist = dlq.browse();
- int dlqQueueSize = compdatalist.length;
- if (dlqQueueSize == 0) {
- fail("There are no messages in the queue:");
- }
- else {
- echo("Current DLQ queue size: " + dlqQueueSize);
- }
- int messageCount = dlqQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
- }
-
- int dlqMemUsage = dlq.getMemoryPercentUsage();
- assertTrue("dlq has some memory usage", dlqMemUsage > 0);
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-
- echo("About to retry " + messageCount + " messages");
-
- for (String messageID : messageIDs) {
- echo("Retrying message: " + messageID);
- dlq.retryMessage(messageID);
- }
-
- long queueSize = queue.getQueueSize();
- compdatalist = queue.browse();
- int actualCount = compdatalist.length;
- echo("Original queue size is now " + queueSize);
- echo("Original browse queue size: " + actualCount);
-
- long dlqSize = dlq.getQueueSize();
- echo("DLQ size: " + dlqSize);
-
- assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
- assertEquals("queue size", initialQueueSize, queueSize);
- assertEquals("browse queue size", initialQueueSize, actualCount);
-
- assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
- }
-
- public void testMoveMessagesBySelector() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- queue.moveMatchingMessagesTo("counter > 2", newDestination);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
-
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- int movedSize = MESSAGE_COUNT - 3;
- assertEquals("Unexpected number of messages ", movedSize, queue.getQueueSize());
-
- // now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testCopyMessagesBySelector() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- long queueSize = queue.getQueueSize();
- assertTrue(queueSize > 0);
- queue.copyMatchingMessagesTo("counter > 2", newDestination);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
-
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
- assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT - 3, queue.getQueueSize());
- // now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testCreateDestinationWithSpacesAtEnds() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertTrue("broker is not a slave", !broker.isSlave());
- // create 2 topics
- broker.addTopic(getDestinationString() + "1 ");
- broker.addTopic(" " + getDestinationString() + "2");
- broker.addTopic(" " + getDestinationString() + "3 ");
-
- assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1 ");
- assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName= " + getDestinationString() + "2");
- assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName= " + getDestinationString() + "3 ");
-
- ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1");
- ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2");
- ObjectName topicObjName3 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "3");
-
- TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
- TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
- TopicViewMBean topic3 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName3, TopicViewMBean.class, true);
-
- assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
- assertEquals("topic3 Durable subscriber count", 0, topic3.getConsumerCount());
-
- String topicName = getDestinationString();
- String selector = null;
-
- // create 1 subscriber for each topic
- broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
- broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
- broker.createDurableSubscriber(clientID, "topic3.subscriber1", topicName + "3", selector);
-
- assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
- assertEquals("topic3 Durable subscriber count", 1, topic3.getConsumerCount());
- }
-
- protected void assertSendViaMBean() throws Exception {
- String queueName = getDestinationString() + ".SendMBBean";
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- echo("Create QueueView MBean...");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- broker.addQueue(queueName);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName);
-
- echo("Create QueueView MBean...");
- QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- proxy.purge();
-
- int count = 5;
- for (int i = 0; i < count; i++) {
- String body = "message:" + i;
-
- Map<String, Object> headers = new HashMap<>();
- headers.put("JMSCorrelationID", "MyCorrId");
- headers.put("JMSDeliveryMode", Boolean.FALSE);
- headers.put("JMSXGroupID", "MyGroupID");
- headers.put("JMSXGroupSeq", 1234);
- headers.put("JMSPriority", i + 1);
- headers.put("JMSType", "MyType");
- headers.put("MyHeader", i);
- headers.put("MyStringHeader", "StringHeader" + i);
-
- proxy.sendTextMessage(headers, body);
- }
-
- browseAndVerify(proxy);
- }
-
- private void browseAndVerify(QueueViewMBean proxy) throws Exception {
- browseAndVerifyTypes(proxy, false);
- }
-
- @SuppressWarnings("rawtypes")
- private void browseAndVerifyTypes(QueueViewMBean proxy, boolean allStrings) throws Exception {
- CompositeData[] compdatalist = proxy.browse();
- if (compdatalist.length == 0) {
- fail("There is no message in the queue:");
- }
-
- for (int i = 0; i < compdatalist.length; i++) {
- CompositeData cdata = compdatalist[i];
-
- if (i == 0) {
- echo("Columns: " + cdata.getCompositeType().keySet());
- }
-
- assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId");
- assertComplexData(i, cdata, "JMSPriority", i + 1);
- assertComplexData(i, cdata, "JMSType", "MyType");
- assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId");
- assertComplexData(i, cdata, "JMSDeliveryMode", "NON-PERSISTENT");
- String expected = "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}";
- // The order of the properties is different when using the ibm jdk.
- if (System.getProperty("java.vendor").equals("IBM Corporation")) {
- expected = "{MyHeader=" + i + ", MyStringHeader=StringHeader" + i + "}";
- }
- assertComplexData(i, cdata, "PropertiesText", expected);
-
- if (allStrings) {
- Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES);
- assertEquals("stringProperties size()", 2, stringProperties.size());
- assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader"));
- assertEquals("stringProperties.MyHeader", "" + i, stringProperties.get("MyHeader"));
-
- }
- else {
- Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
- assertEquals("intProperties size()", 1, intProperties.size());
- assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader"));
-
- Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES);
- assertEquals("stringProperties size()", 1, stringProperties.size());
- assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader"));
- }
-
- Map properties = CompositeDataHelper.getMessageUserProperties(cdata);
- assertEquals("properties size()", 2, properties.size());
- assertEquals("properties.MyHeader", allStrings ? "" + i : i, properties.get("MyHeader"));
- assertEquals("properties.MyHeader", "StringHeader" + i, properties.get("MyStringHeader"));
-
- assertComplexData(i, cdata, "JMSXGroupSeq", 1234);
- assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID");
- assertComplexData(i, cdata, "Text", "message:" + i);
- }
- }
-
- protected void assertSendCsnvViaMBean() throws Exception {
- String queueName = getDestinationString() + ".SendMBBean";
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- echo("Create QueueView MBean...");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- broker.addQueue(queueName);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName);
-
- echo("Create QueueView MBean...");
- QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- proxy.purge();
-
- int count = 5;
- for (int i = 0; i < count; i++) {
- String props = "body=message:" + i;
-
- props += ",JMSCorrelationID=MyCorrId";
- props += ",JMSDeliveryMode=1";
- props += ",JMSXGroupID=MyGroupID";
- props += ",JMSXGroupSeq=1234";
- props += ",JMSPriority=" + (i + 1);
- props += ",JMSType=MyType";
- props += ",MyHeader=" + i;
- props += ",MyStringHeader=StringHeader" + i;
-
- proxy.sendTextMessageWithProperties(props);
- }
-
- browseAndVerifyTypes(proxy, true);
- }
-
- protected void assertComplexData(int messageIndex, CompositeData cdata, String name, Object expected) {
- Object value = cdata.get(name);
- assertEquals("Message " + messageIndex + " CData field: " + name, expected, value);
- }
-
- protected void assertQueueBrowseWorks() throws Exception {
- Integer mbeancnt = mbeanServer.getMBeanCount();
- echo("Mbean count :" + mbeancnt);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- echo("Create QueueView MBean...");
- QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long concount = proxy.getConsumerCount();
- echo("Consumer Count :" + concount);
- long messcount = proxy.getQueueSize();
- echo("current number of messages in the queue :" + messcount);
-
- // lets browse
- CompositeData[] compdatalist = proxy.browse();
- if (compdatalist.length == 0) {
- fail("There is no message in the queue:");
- }
- String[] messageIDs = new String[compdatalist.length];
-
- for (int i = 0; i < compdatalist.length; i++) {
- CompositeData cdata = compdatalist[i];
-
- if (i == 0) {
- echo("Columns: " + cdata.getCompositeType().keySet());
- }
- messageIDs[i] = (String) cdata.get("JMSMessageID");
- echo("message " + i + " : " + cdata.values());
- }
-
- TabularData table = proxy.browseAsTable();
- echo("Found tabular data: " + table);
- assertTrue("Table should not be empty!", table.size() > 0);
-
- assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
-
- String messageID = messageIDs[0];
- String newDestinationName = "queue://dummy.test.cheese";
- echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName);
- proxy.copyMessageTo(messageID, newDestinationName);
-
- assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
-
- messageID = messageIDs[1];
- echo("Attempting to remove: " + messageID);
- proxy.removeMessage(messageID);
-
- assertEquals("Queue size", MESSAGE_COUNT - 1, proxy.getQueueSize());
-
- echo("Worked!");
- }
-
- protected void assertCreateAndDestroyDurableSubscriptions() throws Exception {
- // lets create a new topic
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- echo("Create QueueView MBean...");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- broker.addTopic(getDestinationString());
-
- assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
-
- String topicName = getDestinationString();
- String selector = null;
- ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector);
- broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
- assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length);
-
- assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
-
- LOG.info("Created durable subscriber with name: " + name1);
-
- // now lets try destroy it
- broker.destroyDurableSubscriber(clientID, "subscriber1");
- assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length);
- }
-
- protected void assertConsumerCounts() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertTrue("broker is not a slave", !broker.isSlave());
- // create 2 topics
- broker.addTopic(getDestinationString() + "1");
- broker.addTopic(getDestinationString() + "2");
-
- ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1");
- ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2");
- TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
- TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
-
- assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
-
- String topicName = getDestinationString();
- String selector = null;
-
- // create 1 subscriber for each topic
- broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
- broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
-
- assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
-
- // create 1 more subscriber for topic1
- broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector);
-
- assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
-
- // destroy topic1 subscriber
- broker.destroyDurableSubscriber(clientID, "topic1.subscriber1");
-
- assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
-
- // destroy topic2 subscriber
- broker.destroyDurableSubscriber(clientID, "topic2.subscriber1");
-
- assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
-
- // destroy remaining topic1 subscriber
- broker.destroyDurableSubscriber(clientID, "topic1.subscriber2");
-
- assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
- assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
- }
-
- protected void assertProducerCounts() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertTrue("broker is not a slave", !broker.isSlave());
- // create 2 topics
- broker.addTopic(getDestinationString() + "1");
- broker.addTopic(getDestinationString() + "2");
-
- ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1");
- ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2");
- TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
- TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
-
- assertEquals("topic1 Producer count", 0, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
- assertEquals("broker Topic Producer count", 0, broker.getTopicProducers().length);
-
- // create 1 producer for each topic
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest1 = session.createTopic(getDestinationString() + "1");
- Destination dest2 = session.createTopic(getDestinationString() + "2");
- MessageProducer producer1 = session.createProducer(dest1);
- MessageProducer producer2 = session.createProducer(dest2);
- Thread.sleep(500);
-
- assertEquals("topic1 Producer count", 1, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 1, topic2.getProducerCount());
-
- assertEquals("broker Topic Producer count", 2, broker.getTopicProducers().length);
-
- // create 1 more producer for topic1
- MessageProducer producer3 = session.createProducer(dest1);
- Thread.sleep(500);
-
- assertEquals("topic1 Producer count", 2, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 1, topic2.getProducerCount());
-
- assertEquals("broker Topic Producer count", 3, broker.getTopicProducers().length);
-
- // destroy topic1 producer
- producer1.close();
- Thread.sleep(500);
-
- assertEquals("topic1 Producer count", 1, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 1, topic2.getProducerCount());
-
- assertEquals("broker Topic Producer count", 2, broker.getTopicProducers().length);
-
- // destroy topic2 producer
- producer2.close();
- Thread.sleep(500);
-
- assertEquals("topic1 Producer count", 1, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
-
- assertEquals("broker Topic Producer count", 1, broker.getTopicProducers().length);
-
- // destroy remaining topic1 producer
- producer3.close();
- Thread.sleep(500);
-
- assertEquals("topic1 Producer count", 0, topic1.getProducerCount());
- assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
-
- MessageProducer producer4 = session.createProducer(null);
- Thread.sleep(500);
- assertEquals(1, broker.getDynamicDestinationProducers().length);
- producer4.close();
- Thread.sleep(500);
-
- assertEquals("broker Topic Producer count", 0, broker.getTopicProducers().length);
- }
-
- protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, Exception {
- final ObjectName objectName = new ObjectName(name);
- final AtomicBoolean result = new AtomicBoolean(false);
- assertTrue("Bean registered: " + objectName, Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- result.set(mbeanServer.isRegistered(objectName));
- }
- catch (Exception ignored) {
- LOG.debug(ignored.toString());
- }
- return result.get();
- }
- }));
- return objectName;
- }
-
- protected ObjectName assertNotRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
- ObjectName objectName = new ObjectName(name);
- if (mbeanServer.isRegistered(objectName)) {
- fail("Found the MBean!: " + objectName);
- }
- else {
- echo("Bean not registered Registered: " + objectName);
- }
- return objectName;
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:0";
- useTopic = false;
- super.setUp();
- ManagementContext managementContext = broker.getManagementContext();
- mbeanServer = managementContext.getMBeanServer();
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (waitForKeyPress) {
- // We are running from the command line so let folks browse the
- // mbeans...
- System.out.println();
- System.out.println("Press enter to terminate the program.");
- System.out.println("In the meantime you can use your JMX console to view the current MBeans");
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
- reader.readLine();
- }
-
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setPersistent(false);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.setUseJmx(true);
-
- // apply memory limit so that %usage is visible
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setMemoryLimit(1024 * 1024 * 4);
- policyMap.setDefaultEntry(defaultEntry);
- answer.setDestinationPolicy(policyMap);
-
- // allow options to be visible via jmx
- answer.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(QUEUE_WITH_OPTIONS + "?topQueue=true&hasOptions=2")});
-
- answer.addConnector(bindAddress);
- return answer;
- }
-
- protected void useConnection(Connection connection) throws Exception {
- connection.setClientID(clientID);
- connection.start();
- Session session = connection.createSession(transacted, authMode);
- destination = createDestination();
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message message = session.createTextMessage("Message: " + i);
- message.setIntProperty("counter", i);
- message.setJMSCorrelationID("MyCorrelationID");
- message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
- message.setJMSType("MyType");
- message.setJMSPriority(5);
- producer.send(message);
- }
- Thread.sleep(1000);
- }
-
- protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
- connection.setClientID(clientID);
- connection.start();
- ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode);
- destination = createDestination();
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- BlobMessage message = session.createBlobMessage(new URL("http://foo.bar/test"));
- message.setIntProperty("counter", i);
- message.setJMSCorrelationID("MyCorrelationID");
- message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
- message.setJMSType("MyType");
- message.setJMSPriority(5);
- producer.send(message);
- }
- Thread.sleep(1000);
- }
-
- protected void useConnectionWithByteMessage(Connection connection) throws Exception {
- connection.setClientID(clientID);
- connection.start();
- ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode);
- destination = createDestination();
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(("Message: " + i).getBytes());
- message.setIntProperty("counter", i);
- message.setJMSCorrelationID("MyCorrelationID");
- message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
- message.setJMSType("MyType");
- message.setJMSPriority(5);
- producer.send(message);
- }
- Thread.sleep(1000);
- }
-
- protected void echo(String text) {
- //LOG.info(text);
- }
-
- protected String getSecondDestinationString() {
- return "test.new.destination." + getClass() + "." + getName();
- }
-
- public void testDynamicProducerView() throws Exception {
- connection = connectionFactory.createConnection();
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertEquals(0, broker.getDynamicDestinationProducers().length);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- Destination dest1 = session.createTopic("DynamicDest-1");
- Destination dest2 = session.createTopic("DynamicDest-2");
- Destination dest3 = session.createQueue("DynamicDest-3");
-
- // Wait a bit to let the producer get registered.
- Thread.sleep(100);
-
- assertEquals(1, broker.getDynamicDestinationProducers().length);
-
- ObjectName viewName = broker.getDynamicDestinationProducers()[0];
- assertNotNull(viewName);
- ProducerViewMBean view = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
- assertNotNull(view);
-
- assertEquals("NOTSET", view.getDestinationName());
-
- producer.send(dest1, session.createTextMessage("Test Message 1"));
- Thread.sleep(200);
- assertEquals(((ActiveMQDestination) dest1).getPhysicalName(), view.getDestinationName());
- assertTrue(view.isDestinationTopic());
- assertFalse(view.isDestinationQueue());
- assertFalse(view.isDestinationTemporary());
-
- producer.send(dest2, session.createTextMessage("Test Message 2"));
- Thread.sleep(200);
- assertEquals(((ActiveMQDestination) dest2).getPhysicalName(), view.getDestinationName());
- assertTrue(view.isDestinationTopic());
- assertFalse(view.isDestinationQueue());
- assertFalse(view.isDestinationTemporary());
-
- producer.send(dest3, session.createTextMessage("Test Message 3"));
- Thread.sleep(200);
- assertEquals(((ActiveMQDestination) dest3).getPhysicalName(), view.getDestinationName());
- assertTrue(view.isDestinationQueue());
- assertFalse(view.isDestinationTopic());
- assertFalse(view.isDestinationTemporary());
-
- producer.close();
- Thread.sleep(200);
- assertEquals(0, broker.getDynamicDestinationProducers().length);
- }
-
- public void testTempQueueJMXDelete() throws Exception {
- connection = connectionFactory.createConnection();
-
- connection.setClientID(clientID);
- connection.start();
- Session session = connection.createSession(transacted, authMode);
- ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
- Thread.sleep(1000);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=" + JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString()) + ",destinationName=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()));
-
- // should not throw an exception
-
- mbeanServer.getObjectInstance(queueViewMBeanName);
-
- tQueue.delete();
- Thread.sleep(1000);
- try {
- // should throw an exception
- mbeanServer.getObjectInstance(queueViewMBeanName);
-
- fail("should be deleted already!");
- }
- catch (Exception e) {
- // expected!
- }
- }
-
- // Test for AMQ-3029
- public void testBrowseBlobMessages() throws Exception {
- connection = connectionFactory.createConnection();
- useConnectionWithBlobMessage(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
-
- messageIDs[i] = messageID;
- }
-
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
- }
-
- public void testDestinationOptionsAreVisible() throws Exception {
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + QUEUE_WITH_OPTIONS);
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- assertEquals("name match", QUEUE_WITH_OPTIONS, queue.getName());
-
- String options = queue.getOptions();
- LOG.info("Got options: " + options);
-
- Map<String, String> optionsMap = URISupport.parseQuery(options);
- assertEquals("got a map", 2, optionsMap.size());
- assertTrue("matches our options", optionsMap.containsKey("hasOptions"));
- assertTrue("matches our options", optionsMap.containsKey("topQueue"));
-
- assertTrue("matches our options", optionsMap.containsValue("true"));
- assertTrue("matches our options", optionsMap.containsValue("2"));
- }
-
- public void testSubscriptionViewToConnectionMBean() throws Exception {
-
- connection = connectionFactory.createConnection("admin", "admin");
- connection.setClientID("MBeanTest");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination queue = session.createQueue(getDestinationString() + ".Queue");
- MessageConsumer queueConsumer = session.createConsumer(queue);
- MessageProducer producer = session.createProducer(queue);
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- Thread.sleep(100);
-
- assertTrue(broker.getQueueSubscribers().length == 1);
-
- ObjectName subscriptionName = broker.getQueueSubscribers()[0];
- LOG.info("Looking for Subscription: " + subscriptionName);
-
- SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, subscriptionName, SubscriptionViewMBean.class, true);
- assertNotNull(subscriberView);
-
- ObjectName connectionName = subscriberView.getConnection();
- LOG.info("Looking for Connection: " + connectionName);
- assertNotNull(connectionName);
- ConnectionViewMBean connectionView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, connectionName, ConnectionViewMBean.class, true);
- assertNotNull(connectionView);
-
- // Our consumer plus one advisory consumer.
- assertEquals(2, connectionView.getConsumers().length);
-
- assertEquals("client id match", "MBeanTest", connectionView.getClientId());
-
- // Check that the subscription view we found earlier is in this list.
- boolean found = false;
- for (ObjectName name : connectionView.getConsumers()) {
- if (name.equals(subscriptionName)) {
- found = true;
- }
- }
- assertTrue("We should have found: " + subscriptionName, found);
-
- // Our producer and no others.
- assertEquals(1, connectionView.getProducers().length);
-
- // Bean should detect the updates.
- queueConsumer.close();
- producer.close();
-
- Thread.sleep(200);
-
- // Only an advisory consumers now.
- assertEquals(1, connectionView.getConsumers().length);
- assertEquals(0, connectionView.getProducers().length);
- }
-
- public void testCreateAndUnsubscribeDurableSubscriptions() throws Exception {
-
- connection = connectionFactory.createConnection("admin", "admin");
- connection.setClientID("MBeanTest");
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String topicName = getDestinationString() + ".DurableTopic";
- Topic topic = session.createTopic(topicName);
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- echo("Create QueueView MBean...");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
- assertEquals("Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length);
-
- MessageConsumer durableConsumer1 = session.createDurableSubscriber(topic, "subscription1");
- MessageConsumer durableConsumer2 = session.createDurableSubscriber(topic, "subscription2");
-
- Thread.sleep(100);
-
- assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
- assertEquals("Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length);
-
- durableConsumer1.close();
- durableConsumer2.close();
-
- Thread.sleep(100);
-
- assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
- assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length);
-
- session.unsubscribe("subscription1");
-
- Thread.sleep(100);
-
- assertEquals("Inactive Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length);
-
- session.unsubscribe("subscription2");
-
- assertEquals("Inactive Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length);
- }
-
- public void testUserNamePopulated() throws Exception {
- doTestUserNameInMBeans(true);
- }
-
- public void testUserNameNotPopulated() throws Exception {
- doTestUserNameInMBeans(false);
- }
-
- @SuppressWarnings("unused")
- private void doTestUserNameInMBeans(boolean expect) throws Exception {
- broker.setPopulateUserNameInMBeans(expect);
-
- connection = connectionFactory.createConnection("admin", "admin");
- connection.setClientID("MBeanTest");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination queue = session.createQueue(getDestinationString() + ".Queue");
- Topic topic = session.createTopic(getDestinationString() + ".Topic");
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer queueConsumer = session.createConsumer(queue);
- MessageConsumer topicConsumer = session.createConsumer(topic);
- MessageConsumer durable = session.createDurableSubscriber(topic, "Durable");
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- Thread.sleep(100);
-
- assertTrue(broker.getQueueProducers().length == 1);
- assertTrue(broker.getTopicSubscribers().length == 2);
- assertTrue(broker.getQueueSubscribers().length == 1);
-
- ObjectName producerName = broker.getQueueProducers()[0];
- ProducerViewMBean producerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, producerName, ProducerViewMBean.class, true);
- assertNotNull(producerView);
-
- if (expect) {
- assertEquals("admin", producerView.getUserName());
- }
- else {
- assertNull(producerView.getUserName());
- }
-
- for (ObjectName name : broker.getTopicSubscribers()) {
- SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
- if (expect) {
- assertEquals("admin", subscriberView.getUserName());
- }
- else {
- assertNull(subscriberView.getUserName());
- }
- }
-
- for (ObjectName name : broker.getQueueSubscribers()) {
- SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
- if (expect) {
- assertEquals("admin", subscriberView.getUserName());
- }
- else {
- assertNull(subscriberView.getUserName());
- }
- }
- ObjectName query = //new ObjectName(domain + ":type=Broker,brokerName=localhost,connector=*," + "connectorName=*,connectionName=MBeanTest");
- BrokerMBeanSupport.createConnectionQuery(domain, "localhost", connection.getClientID());
-
- Set<ObjectName> names = mbeanServer.queryNames(query, null);
- boolean found = false;
- for (ObjectName name : names) {
- if (name.toString().endsWith("connectionName=MBeanTest")) {
-
- ConnectionViewMBean connectionView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, ConnectionViewMBean.class, true);
- assertNotNull(connectionView);
-
- if (expect) {
- assertEquals("admin", connectionView.getUserName());
- }
- else {
- assertNull(connectionView.getUserName());
- }
-
- found = true;
- break;
- }
- }
-
- assertTrue("Should find the connection's ManagedTransportConnection", found);
- }
-
- public void testMoveMessagesToRetainOrder() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- queue.moveMatchingMessagesTo("", newDestination);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
-
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- int movedSize = MESSAGE_COUNT;
- assertEquals("Unexpected number of messages ", movedSize, queue.getQueueSize());
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(newDestination);
- MessageConsumer consumer = session.createConsumer(destination);
-
- int last = -1;
- int current = -1;
- Message message = null;
- while ((message = consumer.receive(2000)) != null) {
- if (message.propertyExists("counter")) {
- current = message.getIntProperty("counter");
- assertEquals(last, current - 1);
- last = current;
- }
- }
-
- // now lets remove them by selector
- queue.removeMatchingMessages("");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testConnectionCounts() throws Exception {
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- assertEquals(0, broker.getCurrentConnectionsCount());
-
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- assertEquals(1, broker.getCurrentConnectionsCount());
- connection.close();
- assertEquals(0, broker.getCurrentConnectionsCount());
- assertEquals(1, broker.getTotalConnectionsCount());
- }
-
- public void testCopyMessagesToRetainOrder() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- queue.copyMatchingMessagesTo("", newDestination);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
-
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- int movedSize = MESSAGE_COUNT;
- assertEquals("Unexpected number of messages ", movedSize, queue.getQueueSize());
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(newDestination);
- MessageConsumer consumer = session.createConsumer(destination);
-
- int last = -1;
- int current = -1;
- Message message = null;
- while ((message = consumer.receive(2000)) != null) {
- if (message.propertyExists("counter")) {
- current = message.getIntProperty("counter");
- assertEquals(last, current - 1);
- last = current;
- }
- }
-
- // now lets remove them by selector
- queue.removeMatchingMessages("");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testRemoveMatchingMessageRetainOrder() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String queueName = getDestinationString();
- queue.removeMatchingMessages("counter < 10");
-
- int newSize = MESSAGE_COUNT - 10;
- assertEquals("Unexpected number of messages ", newSize, queue.getQueueSize());
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(queueName);
- MessageConsumer consumer = session.createConsumer(destination);
-
- int last = 9;
- int current = 0;
- Message message = null;
- while ((message = consumer.receive(2000)) != null) {
- if (message.propertyExists("counter")) {
- current = message.getIntProperty("counter");
- assertEquals(last, current - 1);
- last = current;
- }
- }
-
- // now lets remove them by selector
- queue.removeMatchingMessages("");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testBrowseBytesMessages() throws Exception {
- connection = connectionFactory.createConnection();
- useConnectionWithByteMessage(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
-
- Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
- assertNotNull("should be a preview", preview);
- assertTrue("not empty", preview.length > 0);
- }
-
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
- // consume all the messages
- echo("Attempting to consume all bytes messages from: " + destination);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message message = consumer.receive(5000);
- assertNotNull(message);
- assertTrue(message instanceof BytesMessage);
- }
- consumer.close();
- session.close();
- }
-
- public void testBrowseOrder() throws Exception {
- connection = connectionFactory.createConnection();
- ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
- prefetchPolicy.setAll(20);
- ((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy);
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
-
- QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- assertEquals("expected", MESSAGE_COUNT, initialQueueSize);
-
- int messageCount = initialQueueSize;
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
-
- Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
- assertTrue("not empty", intProperties.size() > 0);
- assertEquals("counter in order", i, intProperties.get("counter"));
- }
-
- echo("Attempting to consume 5 bytes messages from: " + destination);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < 5; i++) {
- Message message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals("ordered", i, message.getIntProperty("counter"));
- echo("Consumed: " + message.getIntProperty("counter"));
- }
- consumer.close();
- session.close();
- connection.close();
-
- // browse again and verify order
- compdatalist = queue.browse();
- initialQueueSize = compdatalist.length;
- assertEquals("5 gone", MESSAGE_COUNT - 5, initialQueueSize);
-
- messageCount = initialQueueSize;
- for (int i = 0; i < messageCount - 4; i++) {
- CompositeData cdata = compdatalist[i];
-
- Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
- assertTrue("not empty", intProperties.size() > 0);
- assertEquals("counter in order", i + 5, intProperties.get("counter"));
- echo("Got: " + intProperties.get("counter"));
- }
- }
-
- public void testAddRemoveConnectorBrokerView() throws Exception {
-
- ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean brokerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-
- Map<String, String> connectors = brokerView.getTransportConnectors();
- LOG.info("Connectors: " + connectors);
- assertEquals("one connector", 1, connectors.size());
-
- ConnectorViewMBean connector = getProxyToConnectionView("tcp");
- assertNotNull(connector);
-
- String name = connectors.keySet().iterator().next().toString();
-
- brokerView.removeConnector(name);
-
- connectors = brokerView.getTransportConnectors();
- assertEquals("empty", 0, connectors.size());
-
- name = brokerView.addConnector("tcp://0.0.0.0:0");
-
- connector = getProxyToConnectionView("tcp");
- assertNotNull(connector);
-
- connectors = brokerView.getTransportConnectors();
- LOG.info("Connectors: " + connectors);
- assertEquals("one connector", 1, connectors.size());
- assertTrue("name is in map: " + connectors.keySet(), connectors.keySet().contains(name));
- }
-
- public void testConnectorView() throws Exception {
- ConnectorViewMBean connector = getProxyToConnectionView("tcp");
- assertNotNull(connector);
-
- assertFalse(connector.isRebalanceClusterClients());
- assertFalse(connector.isUpdateClusterClientsOnRemove());
- assertFalse(connector.isUpdateClusterClients());
- assertFalse(connector.isAllowLinkStealingEnabled());
- }
-
- protected ConnectorViewMBean getProxyToConnectionView(String connectionType) throws Exception {
- ObjectName connectorQuery = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=" + connectionType + "_//*");
-
- Set<ObjectName> results = broker.getManagementContext().queryNames(connectorQuery, null);
-
- if (results == null || results.isEmpty() || results.size() > 1) {
- throw new Exception("Unable to find the exact Connector instance.");
- }
-
- ConnectorViewMBean proxy = (ConnectorViewMBean) broker.getManagementContext().newProxyInstance(results.iterator().next(), ConnectorViewMBean.class, true);
- return proxy;
- }
-
- public void testDynamicProducers() throws Exception {
- connection = connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,endpoint=dynamicProducer,*");
- Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
- assertEquals(mbeans.size(), 1);
- producer.close();
- }
-
- public void testDurableSubQuery() throws Exception {
- connection = connectionFactory.createConnection();
- connection.setClientID("test");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber sub = session.createDurableSubscriber(session.createTopic("test.topic"), "test.consumer");
-
- ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic,endpoint=Consumer,consumerId=Durable(*),*");
- Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
- assertEquals(mbeans.size(), 1);
- sub.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
deleted file mode 100644
index 54e5793..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.jmx;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-
-import junit.textui.TestRunner;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A specific test of Queue.purge() functionality
- */
-public class PurgeTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(PurgeTest.class);
-
- protected MBeanServer mbeanServer;
- protected String domain = "org.apache.activemq";
- protected String clientID = "foo";
-
- protected Connection connection;
- protected boolean transacted;
- protected int authMode = Session.AUTO_ACKNOWLEDGE;
- protected int messageCount = 10;
- public PersistenceAdapter persistenceAdapter;
-
- public static void main(String[] args) {
- TestRunner.run(PurgeTest.class);
- }
-
- public static Test suite() {
- return suite(PurgeTest.class);
- }
-
- public void testPurge() throws Exception {
- // Send some messages
- connection = connectionFactory.createConnection();
- connection.setClientID(clientID);
- connection.start();
- Session session = connection.createSession(transacted, authMode);
- destination = createDestination();
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < messageCount; i++) {
- Message message = session.createTextMessage("Message: " + i);
- producer.send(message);
- }
-
- // Now get the QueueViewMBean and purge
- String objectNameStr = broker.getBrokerObjectName().toString();
- objectNameStr += ",destinationType=Queue,destinationName=" + getDestinationString();
- ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
- QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long count = proxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
-
- proxy.purge();
- count = proxy.getQueueSize();
- assertEquals("Queue size", count, 0);
- assertEquals("Browse size", proxy.browseMessages().size(), 0);
-
- // Queues have a special case once there are more than a thousand
- // dead messages, make sure we hit that.
- messageCount += 1000;
- for (int i = 0; i < messageCount; i++) {
- Message message = session.createTextMessage("Message: " + i);
- producer.send(message);
- }
-
- count = proxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
-
- proxy.purge();
- count = proxy.getQueueSize();
- assertEquals("Queue size", count, 0);
- assertEquals("Browse size", proxy.browseMessages().size(), 0);
-
- producer.close();
- }
-
- public void initCombosForTestDelete() {
- addCombinationValues("persistenceAdapter", new Object[]{new MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
- }
-
- public void testDeleteSameProducer() throws Exception {
- connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination();
-
- MessageProducer producer = session.createProducer(destination);
- Message message = session.createTextMessage("Test Message");
- producer.send(message);
-
- MessageConsumer consumer = session.createConsumer(destination);
-
- Message received = consumer.receive(1000);
- assertEquals(message, received);
-
- ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean brokerProxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
-
- brokerProxy.removeQueue(getDestinationString());
- producer.send(message);
-
- received = consumer.receive(1000);
-
- assertNotNull("Message not received", received);
- assertEquals(message, received);
- }
-
- public void testDelete() throws Exception {
- // Send some messages
- connection = connectionFactory.createConnection();
- connection.setClientID(clientID);
- connection.start();
- Session session = connection.createSession(transacted, authMode);
- destination = createDestination();
- sendMessages(session, messageCount);
-
- // Now get the QueueViewMBean and purge
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
- QueueViewMBean queueProxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
- BrokerViewMBean brokerProxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
-
- long count = queueProxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
-
- brokerProxy.removeQueue(getDestinationString());
-
- sendMessages(session, messageCount);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
- queueProxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- count = queueProxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
-
- queueProxy.purge();
-
- // Queue have a special case once there are more than a thousand
- // dead messages, make sure we hit that.
- messageCount += 1000;
- sendMessages(session, messageCount);
-
- count = queueProxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
-
- brokerProxy.removeQueue(getDestinationString());
-
- sendMessages(session, messageCount);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
- queueProxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- count = queueProxy.getQueueSize();
- assertEquals("Queue size", count, messageCount);
- }
-
- private void sendMessages(Session session, int count) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < messageCount; i++) {
- Message message = session.createTextMessage("Message: " + i);
- producer.send(message);
- }
- }
-
- protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
- ObjectName objectName = new ObjectName(name);
- if (mbeanServer.isRegistered(objectName)) {
- echo("Bean Registered: " + objectName);
- }
- else {
- fail("Could not find MBean!: " + objectName);
- }
- return objectName;
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:0";
- useTopic = false;
- super.setUp();
- mbeanServer = broker.getManagementContext().getMBeanServer();
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(true);
- answer.setEnableStatistics(true);
- answer.addConnector(bindAddress);
- answer.setPersistenceAdapter(persistenceAdapter);
- answer.deleteAllMessages();
- return answer;
- }
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- }
-
- protected void echo(String text) {
- LOG.info(text);
- }
-
- /**
- * Returns the name of the destination used in this test case
- */
- @Override
- protected String getDestinationString() {
- return getClass().getName() + "." + getName(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
deleted file mode 100644
index 3653bf7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.jmx;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-
-import java.net.Socket;
-import java.util.Set;
-
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.JMXSupport;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransportConnectorMBeanTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransportConnectorMBeanTest.class);
-
- BrokerService broker;
-
- @Test
- public void verifyRemoteAddressInMbeanName() throws Exception {
- doVerifyRemoteAddressInMbeanName(true);
- }
-
- @Test
- public void verifyRemoteAddressNotInMbeanName() throws Exception {
- doVerifyRemoteAddressInMbeanName(false);
- }
-
- @Test
- public void verifyClientIdNetwork() throws Exception {
- doVerifyClientIdNetwork(false);
- }
-
- @Test
- public void verifyClientIdDuplexNetwork() throws Exception {
- doVerifyClientIdNetwork(true);
- }
-
- private void doVerifyClientIdNetwork(boolean duplex) throws Exception {
- createBroker(true);
-
- BrokerService networked = new BrokerService();
- networked.setBrokerName("networked");
- networked.setPersistent(false);
- NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString());
- nc.setDuplex(duplex);
- networked.start();
-
- try {
- assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Set<ObjectName> registeredMbeans = getRegisteredMbeans();
- return match("_outbound", registeredMbeans);
- }
- }));
-
- }
- finally {
- networked.stop();
- }
- }
-
- private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception {
- createBroker(allowRemoteAddress);
- ActiveMQConnection connection = createConnection();
- Set<ObjectName> registeredMbeans = getRegisteredMbeans();
- assertEquals("presence of mbean with clientId", true, match(connection.getClientID(), registeredMbeans));
- assertEquals("presence of mbean with local port", allowRemoteAddress, match(extractLocalPort(connection), registeredMbeans));
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
- private boolean match(String s, Set<ObjectName> registeredMbeans) {
- String encodedName = JMXSupport.encodeObjectNamePart(s);
- for (ObjectName name : registeredMbeans) {
- LOG.info("checking for match:" + encodedName + ", with: " + name.toString());
- if (name.toString().contains(encodedName)) {
- return true;
- }
- }
- return false;
- }
-
- private String extractLocalPort(ActiveMQConnection connection) throws Exception {
- Socket socket = connection.getTransport().narrow(Socket.class);
- return String.valueOf(socket.getLocalPort());
- }
-
- private Set<ObjectName> getRegisteredMbeans() throws Exception {
- // need a little sleep to ensure JMX is up to date
- Thread.sleep(200);
- return broker.getManagementContext().queryNames(null, null);
- }
-
- private ActiveMQConnection createConnection() throws Exception {
- final String opts = "?jms.watchTopicAdvisories=false";
- ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + opts).createConnection();
- connection.start();
- return connection;
- }
-
- private void createBroker(boolean allowRemoteAddressInMbeanNames) throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:0");
- broker.getManagementContext().setAllowRemoteAddressInMBeanNames(allowRemoteAddressInMbeanNames);
- broker.start();
- }
-
-}