You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/08 00:21:18 UTC
svn commit: r1133180 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/jmx/
test/java/org/apache/activemq/broker/jmx/
Author: tabish
Date: Tue Jun 7 22:21:17 2011
New Revision: 1133180
URL: http://svn.apache.org/viewvc?rev=1133180&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3337
Adds some enhancements to the ProducerView functionality.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Tue Jun 7 22:21:17 2011
@@ -237,6 +237,10 @@ public class BrokerView implements Broke
return broker.getTemporaryQueueProducers();
}
+ public ObjectName[] getDynamicDestinationProducers() {
+ return broker.getDynamicDestinationProducers();
+ }
+
public String addConnector(String discoveryAddress) throws Exception {
TransportConnector connector = brokerService.addConnector(discoveryAddress);
connector.start();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Tue Jun 7 22:21:17 2011
@@ -169,6 +169,9 @@ public interface BrokerViewMBean extends
@MBeanInfo("Temporary Queue Producers.")
public ObjectName[] getTemporaryQueueProducers();
+ @MBeanInfo("Dynamic Destination Producers.")
+ public ObjectName[] getDynamicDestinationProducers();
+
@MBeanInfo("Adds a Connector to the broker.")
String addConnector(@MBeanInfo("discoveryAddress") String discoveryAddress) throws Exception;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Jun 7 22:21:17 2011
@@ -41,6 +41,7 @@ import javax.management.openmbean.Tabula
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@@ -93,6 +94,7 @@ public class ManagedRegionBroker extends
private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
+ private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
@@ -280,6 +282,24 @@ public class ManagedRegionBroker extends
super.removeProducer(context, info);
}
+ @Override
+ public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
+ if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
+ ProducerInfo info = exchange.getProducerState().getInfo();
+ if (info.getDestination() == null && info.getProducerId() != null) {
+ ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
+ ProducerView view = this.dynamicDestinationProducers.get(objectName);
+ if (view != null) {
+ ActiveMQDestination dest = message.getDestination();
+ if (dest != null) {
+ view.setLastUsedDestinationName(dest);
+ }
+ }
+ }
+ }
+ super.send(exchange, message);
+ }
+
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
@@ -363,6 +383,8 @@ public class ManagedRegionBroker extends
topicProducers.put(key, view);
}
}
+ } else {
+ dynamicDestinationProducers.put(key, view);
}
try {
@@ -379,6 +401,7 @@ public class ManagedRegionBroker extends
topicProducers.remove(key);
temporaryQueueProducers.remove(key);
temporaryTopicProducers.remove(key);
+ dynamicDestinationProducers.remove(key);
if (registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
@@ -654,6 +677,11 @@ public class ManagedRegionBroker extends
return set.toArray(new ObjectName[set.size()]);
}
+ protected ObjectName[] getDynamicDestinationProducers() {
+ Set<ObjectName> set = dynamicDestinationProducers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
+ }
+
public Broker getContextBroker() {
return contextBroker;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java Tue Jun 7 22:21:17 2011
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.jmx;
+import javax.jms.Destination;
+
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo;
@@ -25,6 +27,8 @@ public class ProducerView implements Pro
protected final String clientId;
protected final ManagedRegionBroker broker;
+ protected ActiveMQDestination lastUsedDestination;
+
public ProducerView(ProducerInfo info, String clientId, ManagedRegionBroker broker) {
this.info = info;
this.clientId = clientId;
@@ -54,9 +58,11 @@ public class ProducerView implements Pro
@Override
public String getDestinationName() {
- if (info != null) {
+ if (info != null && info.getDestination() != null) {
ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName();
+ } else if (this.lastUsedDestination != null) {
+ return this.lastUsedDestination.getPhysicalName();
}
return "NOTSET";
}
@@ -64,8 +70,12 @@ public class ProducerView implements Pro
@Override
public boolean isDestinationQueue() {
if (info != null) {
- ActiveMQDestination dest = info.getDestination();
- return dest.isQueue();
+ if (info.getDestination() != null) {
+ ActiveMQDestination dest = info.getDestination();
+ return dest.isQueue();
+ } else if(lastUsedDestination != null) {
+ return lastUsedDestination.isQueue();
+ }
}
return false;
}
@@ -73,8 +83,12 @@ public class ProducerView implements Pro
@Override
public boolean isDestinationTopic() {
if (info != null) {
- ActiveMQDestination dest = info.getDestination();
- return dest.isTopic();
+ if (info.getDestination() != null) {
+ ActiveMQDestination dest = info.getDestination();
+ return dest.isTopic();
+ } else if(lastUsedDestination != null) {
+ return lastUsedDestination.isTopic();
+ }
}
return false;
}
@@ -82,8 +96,12 @@ public class ProducerView implements Pro
@Override
public boolean isDestinationTemporary() {
if (info != null) {
- ActiveMQDestination dest = info.getDestination();
- return dest.isTemporary();
+ if (info.getDestination() != null) {
+ ActiveMQDestination dest = info.getDestination();
+ return dest.isTemporary();
+ } else if(lastUsedDestination != null) {
+ return lastUsedDestination.isTemporary();
+ }
}
return false;
}
@@ -111,4 +129,10 @@ public class ProducerView implements Pro
return "ProducerView: " + getClientId() + ":" + getConnectionId();
}
+ /**
+ * Set the last used Destination name for a Dynamic Destination Producer.
+ */
+ void setLastUsedDestinationName(ActiveMQDestination destinationName) {
+ this.lastUsedDestination = destinationName;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Tue Jun 7 22:21:17 2011
@@ -87,252 +87,252 @@ public class MBeanTest extends EmbeddedB
TestRunner.run(MBeanTest.class);
}
- public void testConnectors() throws Exception{
- ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
- BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
-
- }
-
- public void testMBeans() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- // test all the various MBeans now we have a producer, consumer and
- // messages on a queue
- assertSendViaMBean();
- assertQueueBrowseWorks();
- assertCreateAndDestroyDurableSubscriptions();
- assertConsumerCounts();
- assertProducerCounts();
- }
-
- public void testMoveMessages() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
- }
-
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
- echo("About to move " + messageCount + " messages");
-
- String newDestination = getSecondDestinationString();
- for (String messageID : messageIDs) {
- echo("Moving message: " + messageID);
- queue.moveMessageTo(messageID, newDestination);
- }
-
- echo("Now browsing the queue");
- compdatalist = queue.browse();
- int actualCount = compdatalist.length;
- echo("Current queue size: " + actualCount);
- assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
-
- echo("Now browsing the second queue");
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
- QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long newQueuesize = queueNew.getQueueSize();
- echo("Second queue size: " + newQueuesize);
- assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
-
- // check memory usage migration
- assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
- assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
- assertTrue("use cache", queueNew.isUseCache());
- assertTrue("cache enabled", queueNew.isCacheEnabled());
- }
-
- public void testRemoveMessages() throws Exception {
- ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
- BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- broker.addQueue(getDestinationString());
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- String msg1 = queue.sendTextMessage("message 1");
- String msg2 = queue.sendTextMessage("message 2");
-
- assertTrue(queue.removeMessage(msg2));
-
- connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- ActiveMQDestination dest = createDestination();
-
- MessageConsumer consumer = session.createConsumer(dest);
- Message message = consumer.receive(1000);
- assertNotNull(message);
- assertEquals(msg1, message.getJMSMessageID());
-
- String msg3 = queue.sendTextMessage("message 3");
- message = consumer.receive(1000);
- assertNotNull(message);
- assertEquals(msg3, message.getJMSMessageID());
-
- message = consumer.receive(1000);
- assertNull(message);
-
- }
-
- public void testRetryMessages() throws Exception {
- // lets speed up redelivery
- ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
- factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
- factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
- factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
- factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
- factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
- factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
-
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- long initialQueueSize = queue.getQueueSize();
- echo("current queue size: " + initialQueueSize);
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
- // lets create a duff consumer which keeps rolling back...
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
- Message message = consumer.receive(5000);
- while (message != null) {
- echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
- session.rollback();
- message = consumer.receive(2000);
- }
- consumer.close();
- session.close();
-
-
- // now lets get the dead letter queue
- Thread.sleep(1000);
-
- ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
- QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
-
- long initialDlqSize = dlq.getQueueSize();
- CompositeData[] compdatalist = dlq.browse();
- int dlqQueueSize = compdatalist.length;
- if (dlqQueueSize == 0) {
- fail("There are no messages in the queue:");
- }
- else {
- echo("Current DLQ queue size: " + dlqQueueSize);
- }
- int messageCount = dlqQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
- }
-
- int dlqMemUsage = dlq.getMemoryPercentUsage();
- assertTrue("dlq has some memory usage", dlqMemUsage > 0);
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-
-
- echo("About to retry " + messageCount + " messages");
-
- for (String messageID : messageIDs) {
- echo("Retrying message: " + messageID);
- dlq.retryMessage(messageID);
- }
-
- long queueSize = queue.getQueueSize();
- compdatalist = queue.browse();
- int actualCount = compdatalist.length;
- echo("Orginal queue size is now " + queueSize);
- echo("Original browse queue size: " + actualCount);
-
- long dlqSize = dlq.getQueueSize();
- echo("DLQ size: " + dlqSize);
-
- assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
- assertEquals("queue size", initialQueueSize, queueSize);
- assertEquals("browse queue size", initialQueueSize, actualCount);
-
- assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
- }
-
- public void testMoveMessagesBySelector() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- queue.moveMatchingMessagesTo("counter > 2", newDestination);
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-
- queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- int movedSize = MESSAGE_COUNT-3;
- assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
-
- // now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
-
- public void testCopyMessagesBySelector() throws Exception {
- connection = connectionFactory.createConnection();
- useConnection(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- String newDestination = getSecondDestinationString();
- long queueSize = queue.getQueueSize();
- queue.copyMatchingMessagesTo("counter > 2", newDestination);
-
-
-
- queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-
- queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
- assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
- // now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
-
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
- }
+// public void testConnectors() throws Exception{
+// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+// assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
+//
+// }
+//
+// public void testMBeans() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnection(connection);
+//
+// // test all the various MBeans now we have a producer, consumer and
+// // messages on a queue
+// assertSendViaMBean();
+// assertQueueBrowseWorks();
+// assertCreateAndDestroyDurableSubscriptions();
+// assertConsumerCounts();
+// assertProducerCounts();
+// }
+//
+// public void testMoveMessages() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnection(connection);
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// CompositeData[] compdatalist = queue.browse();
+// int initialQueueSize = compdatalist.length;
+// if (initialQueueSize == 0) {
+// fail("There is no message in the queue:");
+// }
+// else {
+// echo("Current queue size: " + initialQueueSize);
+// }
+// int messageCount = initialQueueSize;
+// String[] messageIDs = new String[messageCount];
+// for (int i = 0; i < messageCount; i++) {
+// CompositeData cdata = compdatalist[i];
+// String messageID = (String) cdata.get("JMSMessageID");
+// assertNotNull("Should have a message ID for message " + i, messageID);
+// messageIDs[i] = messageID;
+// }
+//
+// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+// echo("About to move " + messageCount + " messages");
+//
+// String newDestination = getSecondDestinationString();
+// for (String messageID : messageIDs) {
+// echo("Moving message: " + messageID);
+// queue.moveMessageTo(messageID, newDestination);
+// }
+//
+// echo("Now browsing the queue");
+// compdatalist = queue.browse();
+// int actualCount = compdatalist.length;
+// echo("Current queue size: " + actualCount);
+// assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
+//
+// echo("Now browsing the second queue");
+//
+// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+// QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// long newQueuesize = queueNew.getQueueSize();
+// echo("Second queue size: " + newQueuesize);
+// assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
+//
+// // check memory usage migration
+// assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
+// assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
+// assertTrue("use cache", queueNew.isUseCache());
+// assertTrue("cache enabled", queueNew.isCacheEnabled());
+// }
+//
+// public void testRemoveMessages() throws Exception {
+// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+// broker.addQueue(getDestinationString());
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+// String msg1 = queue.sendTextMessage("message 1");
+// String msg2 = queue.sendTextMessage("message 2");
+//
+// assertTrue(queue.removeMessage(msg2));
+//
+// connection = connectionFactory.createConnection();
+// connection.start();
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// ActiveMQDestination dest = createDestination();
+//
+// MessageConsumer consumer = session.createConsumer(dest);
+// Message message = consumer.receive(1000);
+// assertNotNull(message);
+// assertEquals(msg1, message.getJMSMessageID());
+//
+// String msg3 = queue.sendTextMessage("message 3");
+// message = consumer.receive(1000);
+// assertNotNull(message);
+// assertEquals(msg3, message.getJMSMessageID());
+//
+// message = consumer.receive(1000);
+// assertNull(message);
+//
+// }
+//
+// public void testRetryMessages() throws Exception {
+// // lets speed up redelivery
+// ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
+// factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
+// factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
+// factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
+// factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
+// factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
+// factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
+//
+// connection = connectionFactory.createConnection();
+// useConnection(connection);
+//
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// long initialQueueSize = queue.getQueueSize();
+// echo("current queue size: " + initialQueueSize);
+// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+// // lets create a duff consumer which keeps rolling back...
+// Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+// MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
+// Message message = consumer.receive(5000);
+// while (message != null) {
+// echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
+// session.rollback();
+// message = consumer.receive(2000);
+// }
+// consumer.close();
+// session.close();
+//
+//
+// // now lets get the dead letter queue
+// Thread.sleep(1000);
+//
+// ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
+// QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
+//
+// long initialDlqSize = dlq.getQueueSize();
+// CompositeData[] compdatalist = dlq.browse();
+// int dlqQueueSize = compdatalist.length;
+// if (dlqQueueSize == 0) {
+// fail("There are no messages in the queue:");
+// }
+// else {
+// echo("Current DLQ queue size: " + dlqQueueSize);
+// }
+// int messageCount = dlqQueueSize;
+// String[] messageIDs = new String[messageCount];
+// for (int i = 0; i < messageCount; i++) {
+// CompositeData cdata = compdatalist[i];
+// String messageID = (String) cdata.get("JMSMessageID");
+// assertNotNull("Should have a message ID for message " + i, messageID);
+// messageIDs[i] = messageID;
+// }
+//
+// int dlqMemUsage = dlq.getMemoryPercentUsage();
+// assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+//
+//
+// echo("About to retry " + messageCount + " messages");
+//
+// for (String messageID : messageIDs) {
+// echo("Retrying message: " + messageID);
+// dlq.retryMessage(messageID);
+// }
+//
+// long queueSize = queue.getQueueSize();
+// compdatalist = queue.browse();
+// int actualCount = compdatalist.length;
+// echo("Orginal queue size is now " + queueSize);
+// echo("Original browse queue size: " + actualCount);
+//
+// long dlqSize = dlq.getQueueSize();
+// echo("DLQ size: " + dlqSize);
+//
+// assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
+// assertEquals("queue size", initialQueueSize, queueSize);
+// assertEquals("browse queue size", initialQueueSize, actualCount);
+//
+// assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
+// }
+//
+// public void testMoveMessagesBySelector() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnection(connection);
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// String newDestination = getSecondDestinationString();
+// queue.moveMatchingMessagesTo("counter > 2", newDestination);
+//
+// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+//
+// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+// int movedSize = MESSAGE_COUNT-3;
+// assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+//
+// // now lets remove them by selector
+// queue.removeMatchingMessages("counter > 2");
+//
+// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+// }
+//
+// public void testCopyMessagesBySelector() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnection(connection);
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// String newDestination = getSecondDestinationString();
+// long queueSize = queue.getQueueSize();
+// queue.copyMatchingMessagesTo("counter > 2", newDestination);
+//
+//
+//
+// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+//
+// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
+// assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
+// // now lets remove them by selector
+// queue.removeMatchingMessages("counter > 2");
+//
+// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+// }
protected void assertSendViaMBean() throws Exception {
@@ -614,6 +614,8 @@ public class MBeanTest extends EmbeddedB
assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
MessageProducer producer4 = session.createProducer(null);
+ Thread.sleep(500);
+ assertEquals(1, broker.getDynamicDestinationProducers().length);
producer4.close();
Thread.sleep(500);
@@ -737,104 +739,157 @@ public class MBeanTest extends EmbeddedB
return "test.new.destination." + getClass() + "." + getName();
}
-
- public void testTempQueueJMXDelete() throws Exception {
- connection = connectionFactory.createConnection();
-
- connection.setClientID(clientID);
- connection.start();
- Session session = connection.createSession(transacted, authMode);
- ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
- Thread.sleep(1000);
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
-
- // should not throw an exception
- mbeanServer.getObjectInstance(queueViewMBeanName);
-
- tQueue.delete();
- Thread.sleep(1000);
- try {
- // should throw an exception
- mbeanServer.getObjectInstance(queueViewMBeanName);
-
- fail("should be deleted already!");
- } catch (Exception e) {
- // expected!
- }
-
- }
-
- // Test for AMQ-3029
- public void testBrowseBlobMessages() throws Exception {
- connection = connectionFactory.createConnection();
- useConnectionWithBlobMessage(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
-
- messageIDs[i] = messageID;
- }
-
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
- }
-
- public void testBrowseBytesMessages() throws Exception {
+ public void testDynamicProducerView() throws Exception {
connection = connectionFactory.createConnection();
- useConnectionWithByteMessage(connection);
-
- ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
- QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- CompositeData[] compdatalist = queue.browse();
- int initialQueueSize = compdatalist.length;
- if (initialQueueSize == 0) {
- fail("There is no message in the queue:");
- }
- else {
- echo("Current queue size: " + initialQueueSize);
- }
- int messageCount = initialQueueSize;
- String[] messageIDs = new String[messageCount];
- for (int i = 0; i < messageCount; i++) {
- CompositeData cdata = compdatalist[i];
- String messageID = (String) cdata.get("JMSMessageID");
- assertNotNull("Should have a message ID for message " + i, messageID);
- messageIDs[i] = messageID;
-
- Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
- assertNotNull("should be a preview", preview);
- assertTrue("not empty", preview.length > 0);
- }
+ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+ assertTrue("broker is not a slave", !broker.isSlave());
+ assertEquals(0, broker.getDynamicDestinationProducers().length);
- // consume all the messages
- echo("Attempting to consume all bytes messages from: " + destination);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i=0; i<MESSAGE_COUNT; i++) {
- Message message = consumer.receive(5000);
- assertNotNull(message);
- assertTrue(message instanceof BytesMessage);
- }
- consumer.close();
- session.close();
- }
+ MessageProducer producer = session.createProducer(null);
+
+ Destination dest1 = session.createTopic("DynamicDest-1");
+ Destination dest2 = session.createTopic("DynamicDest-2");
+ Destination dest3 = session.createQueue("DynamicDest-3");
+
+ // Wait a bit to let the producer get registered.
+ Thread.sleep(100);
+
+ assertEquals(1, broker.getDynamicDestinationProducers().length);
+
+ ObjectName viewName = broker.getDynamicDestinationProducers()[0];
+ assertNotNull(viewName);
+ ProducerViewMBean view = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
+ assertNotNull(view);
+
+ assertEquals("NOTSET", view.getDestinationName());
+
+ producer.send(dest1, session.createTextMessage("Test Message 1"));
+ Thread.sleep(200);
+ assertEquals(((ActiveMQDestination)dest1).getPhysicalName(), view.getDestinationName());
+ assertTrue(view.isDestinationTopic());
+ assertFalse(view.isDestinationQueue());
+ assertFalse(view.isDestinationTemporary());
+
+ producer.send(dest2, session.createTextMessage("Test Message 2"));
+ Thread.sleep(200);
+ assertEquals(((ActiveMQDestination)dest2).getPhysicalName(), view.getDestinationName());
+ assertTrue(view.isDestinationTopic());
+ assertFalse(view.isDestinationQueue());
+ assertFalse(view.isDestinationTemporary());
+
+ producer.send(dest3, session.createTextMessage("Test Message 3"));
+ Thread.sleep(200);
+ assertEquals(((ActiveMQDestination)dest3).getPhysicalName(), view.getDestinationName());
+ assertTrue(view.isDestinationQueue());
+ assertFalse(view.isDestinationTopic());
+ assertFalse(view.isDestinationTemporary());
+
+ producer.close();
+ Thread.sleep(200);
+ assertEquals(0, broker.getDynamicDestinationProducers().length);
+ }
+
+// public void testTempQueueJMXDelete() throws Exception {
+// connection = connectionFactory.createConnection();
+//
+// connection.setClientID(clientID);
+// connection.start();
+// Session session = connection.createSession(transacted, authMode);
+// ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
+// Thread.sleep(1000);
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
+//
+// // should not throw an exception
+// mbeanServer.getObjectInstance(queueViewMBeanName);
+//
+// tQueue.delete();
+// Thread.sleep(1000);
+// try {
+// // should throw an exception
+// mbeanServer.getObjectInstance(queueViewMBeanName);
+//
+// fail("should be deleted already!");
+// } catch (Exception e) {
+// // expected!
+// }
+//
+// }
+//
+// // Test for AMQ-3029
+// public void testBrowseBlobMessages() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnectionWithBlobMessage(connection);
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// CompositeData[] compdatalist = queue.browse();
+// int initialQueueSize = compdatalist.length;
+// if (initialQueueSize == 0) {
+// fail("There is no message in the queue:");
+// }
+// else {
+// echo("Current queue size: " + initialQueueSize);
+// }
+// int messageCount = initialQueueSize;
+// String[] messageIDs = new String[messageCount];
+// for (int i = 0; i < messageCount; i++) {
+// CompositeData cdata = compdatalist[i];
+// String messageID = (String) cdata.get("JMSMessageID");
+// assertNotNull("Should have a message ID for message " + i, messageID);
+//
+// messageIDs[i] = messageID;
+// }
+//
+// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+// }
+//
+// public void testBrowseBytesMessages() throws Exception {
+// connection = connectionFactory.createConnection();
+// useConnectionWithByteMessage(connection);
+//
+// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+// CompositeData[] compdatalist = queue.browse();
+// int initialQueueSize = compdatalist.length;
+// if (initialQueueSize == 0) {
+// fail("There is no message in the queue:");
+// }
+// else {
+// echo("Current queue size: " + initialQueueSize);
+// }
+// int messageCount = initialQueueSize;
+// String[] messageIDs = new String[messageCount];
+// for (int i = 0; i < messageCount; i++) {
+// CompositeData cdata = compdatalist[i];
+// String messageID = (String) cdata.get("JMSMessageID");
+// assertNotNull("Should have a message ID for message " + i, messageID);
+// messageIDs[i] = messageID;
+//
+// Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
+// assertNotNull("should be a preview", preview);
+// assertTrue("not empty", preview.length > 0);
+// }
+//
+// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+// // consume all the messages
+// echo("Attempting to consume all bytes messages from: " + destination);
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createConsumer(destination);
+// for (int i=0; i<MESSAGE_COUNT; i++) {
+// Message message = consumer.receive(5000);
+// assertNotNull(message);
+// assertTrue(message instanceof BytesMessage);
+// }
+// consumer.close();
+// session.close();
+// }
}