You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/08 00:21:18 UTC

svn commit: r1133180 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ test/java/org/apache/activemq/broker/jmx/

Author: tabish
Date: Tue Jun  7 22:21:17 2011
New Revision: 1133180

URL: http://svn.apache.org/viewvc?rev=1133180&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3337

Adds some enhancements to the ProducerView functionality.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Tue Jun  7 22:21:17 2011
@@ -237,6 +237,10 @@ public class BrokerView implements Broke
         return broker.getTemporaryQueueProducers();
     }
 
+    public ObjectName[] getDynamicDestinationProducers() {
+        return broker.getDynamicDestinationProducers();
+    }
+
     public String addConnector(String discoveryAddress) throws Exception {
         TransportConnector connector = brokerService.addConnector(discoveryAddress);
         connector.start();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Tue Jun  7 22:21:17 2011
@@ -169,6 +169,9 @@ public interface BrokerViewMBean extends
     @MBeanInfo("Temporary Queue Producers.")
     public ObjectName[] getTemporaryQueueProducers();
 
+    @MBeanInfo("Dynamic Destination Producers.")
+    public ObjectName[] getDynamicDestinationProducers();
+
     @MBeanInfo("Adds a Connector to the broker.")
     String addConnector(@MBeanInfo("discoveryAddress") String discoveryAddress) throws Exception;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Jun  7 22:21:17 2011
@@ -41,6 +41,7 @@ import javax.management.openmbean.Tabula
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
@@ -93,6 +94,7 @@ public class ManagedRegionBroker extends
     private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
     private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
     private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
+    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
     private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
     private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
     private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
@@ -280,6 +282,24 @@ public class ManagedRegionBroker extends
         super.removeProducer(context, info);
     }
 
+    @Override
+    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
+        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
+            ProducerInfo info = exchange.getProducerState().getInfo();
+            if (info.getDestination() == null && info.getProducerId() != null) {
+                ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
+                ProducerView view = this.dynamicDestinationProducers.get(objectName);
+                if (view != null) {
+                    ActiveMQDestination dest = message.getDestination();
+                    if (dest != null) {
+                        view.setLastUsedDestinationName(dest);
+                    }
+                }
+            }
+         }
+        super.send(exchange, message);
+    }
+
     public void unregisterSubscription(Subscription sub) {
         ObjectName name = subscriptionMap.remove(sub);
         if (name != null) {
@@ -363,6 +383,8 @@ public class ManagedRegionBroker extends
                     topicProducers.put(key, view);
                 }
             }
+        } else {
+            dynamicDestinationProducers.put(key, view);
         }
 
         try {
@@ -379,6 +401,7 @@ public class ManagedRegionBroker extends
         topicProducers.remove(key);
         temporaryQueueProducers.remove(key);
         temporaryTopicProducers.remove(key);
+        dynamicDestinationProducers.remove(key);
         if (registeredMBeans.remove(key)) {
             try {
                 managementContext.unregisterMBean(key);
@@ -654,6 +677,11 @@ public class ManagedRegionBroker extends
         return set.toArray(new ObjectName[set.size()]);
     }
 
+    protected ObjectName[] getDynamicDestinationProducers() {
+        Set<ObjectName> set = dynamicDestinationProducers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
+    }
+
     public Broker getContextBroker() {
         return contextBroker;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java Tue Jun  7 22:21:17 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.jms.Destination;
+
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerInfo;
 
@@ -25,6 +27,8 @@ public class ProducerView implements Pro
     protected final String clientId;
     protected final ManagedRegionBroker broker;
 
+    protected ActiveMQDestination lastUsedDestination;
+
     public ProducerView(ProducerInfo info, String clientId, ManagedRegionBroker broker) {
         this.info = info;
         this.clientId = clientId;
@@ -54,9 +58,11 @@ public class ProducerView implements Pro
 
     @Override
     public String getDestinationName() {
-        if (info != null) {
+        if (info != null && info.getDestination() != null) {
             ActiveMQDestination dest = info.getDestination();
             return dest.getPhysicalName();
+        } else if (this.lastUsedDestination != null) {
+            return this.lastUsedDestination.getPhysicalName();
         }
         return "NOTSET";
     }
@@ -64,8 +70,12 @@ public class ProducerView implements Pro
     @Override
     public boolean isDestinationQueue() {
         if (info != null) {
-            ActiveMQDestination dest = info.getDestination();
-            return dest.isQueue();
+            if (info.getDestination() != null) {
+                ActiveMQDestination dest = info.getDestination();
+                return dest.isQueue();
+            } else if(lastUsedDestination != null) {
+                return lastUsedDestination.isQueue();
+            }
         }
         return false;
     }
@@ -73,8 +83,12 @@ public class ProducerView implements Pro
     @Override
     public boolean isDestinationTopic() {
         if (info != null) {
-            ActiveMQDestination dest = info.getDestination();
-            return dest.isTopic();
+            if (info.getDestination() != null) {
+                ActiveMQDestination dest = info.getDestination();
+                return dest.isTopic();
+            } else if(lastUsedDestination != null) {
+                return lastUsedDestination.isTopic();
+            }
         }
         return false;
     }
@@ -82,8 +96,12 @@ public class ProducerView implements Pro
     @Override
     public boolean isDestinationTemporary() {
         if (info != null) {
-            ActiveMQDestination dest = info.getDestination();
-            return dest.isTemporary();
+            if (info.getDestination() != null) {
+                ActiveMQDestination dest = info.getDestination();
+                return dest.isTemporary();
+            } else if(lastUsedDestination != null) {
+                return lastUsedDestination.isTemporary();
+            }
         }
         return false;
     }
@@ -111,4 +129,10 @@ public class ProducerView implements Pro
         return "ProducerView: " + getClientId() + ":" + getConnectionId();
     }
 
+    /**
+     * Set the last used Destination name for a Dynamic Destination Producer.
+     */
+    void setLastUsedDestinationName(ActiveMQDestination destinationName) {
+        this.lastUsedDestination = destinationName;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1133180&r1=1133179&r2=1133180&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Tue Jun  7 22:21:17 2011
@@ -87,252 +87,252 @@ public class MBeanTest extends EmbeddedB
         TestRunner.run(MBeanTest.class);
     }
 
-    public void testConnectors() throws Exception{
-        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-        assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
-
-    }
-
-    public void testMBeans() throws Exception {
-        connection = connectionFactory.createConnection();
-        useConnection(connection);
-
-        // test all the various MBeans now we have a producer, consumer and
-        // messages on a queue
-        assertSendViaMBean();
-        assertQueueBrowseWorks();
-        assertCreateAndDestroyDurableSubscriptions();
-        assertConsumerCounts();
-        assertProducerCounts();
-    }
-
-    public void testMoveMessages() throws Exception {
-        connection = connectionFactory.createConnection();
-        useConnection(connection);
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        CompositeData[] compdatalist = queue.browse();
-        int initialQueueSize = compdatalist.length;
-        if (initialQueueSize == 0) {
-            fail("There is no message in the queue:");
-        }
-        else {
-            echo("Current queue size: " + initialQueueSize);
-        }
-        int messageCount = initialQueueSize;
-        String[] messageIDs = new String[messageCount];
-        for (int i = 0; i < messageCount; i++) {
-            CompositeData cdata = compdatalist[i];
-            String messageID = (String) cdata.get("JMSMessageID");
-            assertNotNull("Should have a message ID for message " + i, messageID);
-            messageIDs[i] = messageID;
-        }
-
-        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
-        echo("About to move " + messageCount + " messages");
-
-        String newDestination = getSecondDestinationString();
-        for (String messageID : messageIDs) {
-            echo("Moving message: " + messageID);
-            queue.moveMessageTo(messageID, newDestination);
-        }
-
-        echo("Now browsing the queue");
-        compdatalist = queue.browse();
-        int actualCount = compdatalist.length;
-        echo("Current queue size: " + actualCount);
-        assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
-
-        echo("Now browsing the second queue");
-
-        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-        QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        long newQueuesize = queueNew.getQueueSize();
-        echo("Second queue size: " + newQueuesize);
-        assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
-
-        // check memory usage migration
-        assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
-        assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
-        assertTrue("use cache", queueNew.isUseCache());
-        assertTrue("cache enabled", queueNew.isCacheEnabled());
-    }
-
-    public void testRemoveMessages() throws Exception {
-        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
-        broker.addQueue(getDestinationString());
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-        String msg1 = queue.sendTextMessage("message 1");
-        String msg2 = queue.sendTextMessage("message 2");
-
-        assertTrue(queue.removeMessage(msg2));
-
-        connection = connectionFactory.createConnection();
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        ActiveMQDestination dest = createDestination();
-
-        MessageConsumer consumer = session.createConsumer(dest);
-        Message message = consumer.receive(1000);
-        assertNotNull(message);
-        assertEquals(msg1, message.getJMSMessageID());
-
-        String msg3 = queue.sendTextMessage("message 3");
-        message = consumer.receive(1000);
-        assertNotNull(message);
-        assertEquals(msg3, message.getJMSMessageID());
-
-        message = consumer.receive(1000);
-        assertNull(message);
-
-    }
-
-    public void testRetryMessages() throws Exception {
-        // lets speed up redelivery
-        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
-        factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
-        factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
-        factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
-        factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
-        factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
-        factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
-
-        connection = connectionFactory.createConnection();
-        useConnection(connection);
-
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        long initialQueueSize = queue.getQueueSize();
-        echo("current queue size: " + initialQueueSize);
-        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-
-        // lets create a duff consumer which keeps rolling back...
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
-        Message message = consumer.receive(5000);
-        while (message != null) {
-            echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
-            session.rollback();
-            message = consumer.receive(2000);
-        }
-        consumer.close();
-        session.close();
-
-
-        // now lets get the dead letter queue
-        Thread.sleep(1000);
-
-        ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
-        QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
-
-        long initialDlqSize = dlq.getQueueSize();
-        CompositeData[] compdatalist = dlq.browse();
-        int dlqQueueSize = compdatalist.length;
-        if (dlqQueueSize == 0) {
-            fail("There are no messages in the queue:");
-        }
-        else {
-            echo("Current DLQ queue size: " + dlqQueueSize);
-        }
-        int messageCount = dlqQueueSize;
-        String[] messageIDs = new String[messageCount];
-        for (int i = 0; i < messageCount; i++) {
-            CompositeData cdata = compdatalist[i];
-            String messageID = (String) cdata.get("JMSMessageID");
-            assertNotNull("Should have a message ID for message " + i, messageID);
-            messageIDs[i] = messageID;
-        }
-
-        int dlqMemUsage = dlq.getMemoryPercentUsage();
-        assertTrue("dlq has some memory usage", dlqMemUsage > 0);
-        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-
-
-        echo("About to retry " + messageCount + " messages");
-
-        for (String messageID : messageIDs) {
-            echo("Retrying message: " + messageID);
-            dlq.retryMessage(messageID);
-        }
-
-        long queueSize = queue.getQueueSize();
-        compdatalist = queue.browse();
-        int actualCount = compdatalist.length;
-        echo("Orginal queue size is now " + queueSize);
-        echo("Original browse queue size: " + actualCount);
-
-        long dlqSize = dlq.getQueueSize();
-        echo("DLQ size: " + dlqSize);
-
-        assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
-        assertEquals("queue size", initialQueueSize, queueSize);
-        assertEquals("browse queue size", initialQueueSize, actualCount);
-
-        assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
-    }
-
-    public void testMoveMessagesBySelector() throws Exception {
-        connection = connectionFactory.createConnection();
-        useConnection(connection);
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        String newDestination = getSecondDestinationString();
-        queue.moveMatchingMessagesTo("counter > 2", newDestination);
-
-        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-        int movedSize = MESSAGE_COUNT-3;
-        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
-
-        // now lets remove them by selector
-        queue.removeMatchingMessages("counter > 2");
-
-        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-    }
-
-    public void testCopyMessagesBySelector() throws Exception {
-        connection = connectionFactory.createConnection();
-        useConnection(connection);
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        String newDestination = getSecondDestinationString();
-        long queueSize = queue.getQueueSize();
-        queue.copyMatchingMessagesTo("counter > 2", newDestination);
-
-
-
-        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
-        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
-        // now lets remove them by selector
-        queue.removeMatchingMessages("counter > 2");
-
-        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
-    }
+//    public void testConnectors() throws Exception{
+//        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+//        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+//        assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
+//
+//    }
+//
+//    public void testMBeans() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnection(connection);
+//
+//        // test all the various MBeans now we have a producer, consumer and
+//        // messages on a queue
+//        assertSendViaMBean();
+//        assertQueueBrowseWorks();
+//        assertCreateAndDestroyDurableSubscriptions();
+//        assertConsumerCounts();
+//        assertProducerCounts();
+//    }
+//
+//    public void testMoveMessages() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnection(connection);
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        CompositeData[] compdatalist = queue.browse();
+//        int initialQueueSize = compdatalist.length;
+//        if (initialQueueSize == 0) {
+//            fail("There is no message in the queue:");
+//        }
+//        else {
+//            echo("Current queue size: " + initialQueueSize);
+//        }
+//        int messageCount = initialQueueSize;
+//        String[] messageIDs = new String[messageCount];
+//        for (int i = 0; i < messageCount; i++) {
+//            CompositeData cdata = compdatalist[i];
+//            String messageID = (String) cdata.get("JMSMessageID");
+//            assertNotNull("Should have a message ID for message " + i, messageID);
+//            messageIDs[i] = messageID;
+//        }
+//
+//        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+//        echo("About to move " + messageCount + " messages");
+//
+//        String newDestination = getSecondDestinationString();
+//        for (String messageID : messageIDs) {
+//            echo("Moving message: " + messageID);
+//            queue.moveMessageTo(messageID, newDestination);
+//        }
+//
+//        echo("Now browsing the queue");
+//        compdatalist = queue.browse();
+//        int actualCount = compdatalist.length;
+//        echo("Current queue size: " + actualCount);
+//        assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
+//
+//        echo("Now browsing the second queue");
+//
+//        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+//        QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        long newQueuesize = queueNew.getQueueSize();
+//        echo("Second queue size: " + newQueuesize);
+//        assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
+//
+//        // check memory usage migration
+//        assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
+//        assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
+//        assertTrue("use cache", queueNew.isUseCache());
+//        assertTrue("cache enabled", queueNew.isCacheEnabled());
+//    }
+//
+//    public void testRemoveMessages() throws Exception {
+//        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+//        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+//        broker.addQueue(getDestinationString());
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//        String msg1 = queue.sendTextMessage("message 1");
+//        String msg2 = queue.sendTextMessage("message 2");
+//
+//        assertTrue(queue.removeMessage(msg2));
+//
+//        connection = connectionFactory.createConnection();
+//        connection.start();
+//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        ActiveMQDestination dest = createDestination();
+//
+//        MessageConsumer consumer = session.createConsumer(dest);
+//        Message message = consumer.receive(1000);
+//        assertNotNull(message);
+//        assertEquals(msg1, message.getJMSMessageID());
+//
+//        String msg3 = queue.sendTextMessage("message 3");
+//        message = consumer.receive(1000);
+//        assertNotNull(message);
+//        assertEquals(msg3, message.getJMSMessageID());
+//
+//        message = consumer.receive(1000);
+//        assertNull(message);
+//
+//    }
+//
+//    public void testRetryMessages() throws Exception {
+//        // lets speed up redelivery
+//        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
+//        factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
+//        factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
+//        factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
+//        factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
+//        factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
+//        factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
+//
+//        connection = connectionFactory.createConnection();
+//        useConnection(connection);
+//
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        long initialQueueSize = queue.getQueueSize();
+//        echo("current queue size: " + initialQueueSize);
+//        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+//        // lets create a duff consumer which keeps rolling back...
+//        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+//        MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
+//        Message message = consumer.receive(5000);
+//        while (message != null) {
+//            echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
+//            session.rollback();
+//            message = consumer.receive(2000);
+//        }
+//        consumer.close();
+//        session.close();
+//
+//
+//        // now lets get the dead letter queue
+//        Thread.sleep(1000);
+//
+//        ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
+//        QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
+//
+//        long initialDlqSize = dlq.getQueueSize();
+//        CompositeData[] compdatalist = dlq.browse();
+//        int dlqQueueSize = compdatalist.length;
+//        if (dlqQueueSize == 0) {
+//            fail("There are no messages in the queue:");
+//        }
+//        else {
+//            echo("Current DLQ queue size: " + dlqQueueSize);
+//        }
+//        int messageCount = dlqQueueSize;
+//        String[] messageIDs = new String[messageCount];
+//        for (int i = 0; i < messageCount; i++) {
+//            CompositeData cdata = compdatalist[i];
+//            String messageID = (String) cdata.get("JMSMessageID");
+//            assertNotNull("Should have a message ID for message " + i, messageID);
+//            messageIDs[i] = messageID;
+//        }
+//
+//        int dlqMemUsage = dlq.getMemoryPercentUsage();
+//        assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+//        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+//
+//
+//        echo("About to retry " + messageCount + " messages");
+//
+//        for (String messageID : messageIDs) {
+//            echo("Retrying message: " + messageID);
+//            dlq.retryMessage(messageID);
+//        }
+//
+//        long queueSize = queue.getQueueSize();
+//        compdatalist = queue.browse();
+//        int actualCount = compdatalist.length;
+//        echo("Orginal queue size is now " + queueSize);
+//        echo("Original browse queue size: " + actualCount);
+//
+//        long dlqSize = dlq.getQueueSize();
+//        echo("DLQ size: " + dlqSize);
+//
+//        assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
+//        assertEquals("queue size", initialQueueSize, queueSize);
+//        assertEquals("browse queue size", initialQueueSize, actualCount);
+//
+//        assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
+//    }
+//
+//    public void testMoveMessagesBySelector() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnection(connection);
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        String newDestination = getSecondDestinationString();
+//        queue.moveMatchingMessagesTo("counter > 2", newDestination);
+//
+//        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+//
+//        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//        int movedSize = MESSAGE_COUNT-3;
+//        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+//
+//        // now lets remove them by selector
+//        queue.removeMatchingMessages("counter > 2");
+//
+//        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+//        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+//    }
+//
+//    public void testCopyMessagesBySelector() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnection(connection);
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        String newDestination = getSecondDestinationString();
+//        long queueSize = queue.getQueueSize();
+//        queue.copyMatchingMessagesTo("counter > 2", newDestination);
+//
+//
+//
+//        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+//
+//        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
+//        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
+//        // now lets remove them by selector
+//        queue.removeMatchingMessages("counter > 2");
+//
+//        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+//        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+//    }
 
 
     protected void assertSendViaMBean() throws Exception {
@@ -614,6 +614,8 @@ public class MBeanTest extends EmbeddedB
         assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
 
         MessageProducer producer4 = session.createProducer(null);
+        Thread.sleep(500);
+        assertEquals(1, broker.getDynamicDestinationProducers().length);
         producer4.close();
         Thread.sleep(500);
 
@@ -737,104 +739,157 @@ public class MBeanTest extends EmbeddedB
         return "test.new.destination." + getClass() + "." + getName();
     }
 
-
-    public void testTempQueueJMXDelete() throws Exception {
-        connection = connectionFactory.createConnection();
-
-        connection.setClientID(clientID);
-        connection.start();
-        Session session = connection.createSession(transacted, authMode);
-        ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
-        Thread.sleep(1000);
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+  JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
-
-        // should not throw an exception
-        mbeanServer.getObjectInstance(queueViewMBeanName);
-
-        tQueue.delete();
-        Thread.sleep(1000);
-        try {
-            // should throw an exception
-            mbeanServer.getObjectInstance(queueViewMBeanName);
-
-            fail("should be deleted already!");
-        } catch (Exception e) {
-            // expected!
-        }
-
-    }
-
-    // Test for AMQ-3029
-    public void testBrowseBlobMessages() throws Exception {
-        connection = connectionFactory.createConnection();
-        useConnectionWithBlobMessage(connection);
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
-
-        CompositeData[] compdatalist = queue.browse();
-        int initialQueueSize = compdatalist.length;
-        if (initialQueueSize == 0) {
-            fail("There is no message in the queue:");
-        }
-        else {
-            echo("Current queue size: " + initialQueueSize);
-        }
-        int messageCount = initialQueueSize;
-        String[] messageIDs = new String[messageCount];
-        for (int i = 0; i < messageCount; i++) {
-            CompositeData cdata = compdatalist[i];
-            String messageID = (String) cdata.get("JMSMessageID");
-            assertNotNull("Should have a message ID for message " + i, messageID);
-
-            messageIDs[i] = messageID;
-        }
-
-        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
-    }
-
-    public void testBrowseBytesMessages() throws Exception {
+    public void testDynamicProducerView() throws Exception {
         connection = connectionFactory.createConnection();
-        useConnectionWithByteMessage(connection);
-
-        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
-
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 
-        CompositeData[] compdatalist = queue.browse();
-        int initialQueueSize = compdatalist.length;
-        if (initialQueueSize == 0) {
-            fail("There is no message in the queue:");
-        }
-        else {
-            echo("Current queue size: " + initialQueueSize);
-        }
-        int messageCount = initialQueueSize;
-        String[] messageIDs = new String[messageCount];
-        for (int i = 0; i < messageCount; i++) {
-            CompositeData cdata = compdatalist[i];
-            String messageID = (String) cdata.get("JMSMessageID");
-            assertNotNull("Should have a message ID for message " + i, messageID);
-            messageIDs[i] = messageID;
-
-            Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
-            assertNotNull("should be a preview", preview);
-            assertTrue("not empty", preview.length > 0);
-        }
+        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
 
-        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+        assertTrue("broker is not a slave", !broker.isSlave());
+        assertEquals(0, broker.getDynamicDestinationProducers().length);
 
-        // consume all the messages
-        echo("Attempting to consume all bytes messages from: " + destination);
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(destination);
-        for (int i=0; i<MESSAGE_COUNT; i++) {
-            Message message = consumer.receive(5000);
-            assertNotNull(message);
-            assertTrue(message instanceof BytesMessage);
-        }
-        consumer.close();
-        session.close();
-    }
+        MessageProducer producer = session.createProducer(null);
+
+        Destination dest1 = session.createTopic("DynamicDest-1");
+        Destination dest2 = session.createTopic("DynamicDest-2");
+        Destination dest3 = session.createQueue("DynamicDest-3");
+
+        // Wait a bit to let the producer get registered.
+        Thread.sleep(100);
+
+        assertEquals(1, broker.getDynamicDestinationProducers().length);
+
+        ObjectName viewName = broker.getDynamicDestinationProducers()[0];
+        assertNotNull(viewName);
+        ProducerViewMBean view = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
+        assertNotNull(view);
+
+        assertEquals("NOTSET", view.getDestinationName());
+
+        producer.send(dest1, session.createTextMessage("Test Message 1"));
+        Thread.sleep(200);
+        assertEquals(((ActiveMQDestination)dest1).getPhysicalName(), view.getDestinationName());
+        assertTrue(view.isDestinationTopic());
+        assertFalse(view.isDestinationQueue());
+        assertFalse(view.isDestinationTemporary());
+
+        producer.send(dest2, session.createTextMessage("Test Message 2"));
+        Thread.sleep(200);
+        assertEquals(((ActiveMQDestination)dest2).getPhysicalName(), view.getDestinationName());
+        assertTrue(view.isDestinationTopic());
+        assertFalse(view.isDestinationQueue());
+        assertFalse(view.isDestinationTemporary());
+
+        producer.send(dest3, session.createTextMessage("Test Message 3"));
+        Thread.sleep(200);
+        assertEquals(((ActiveMQDestination)dest3).getPhysicalName(), view.getDestinationName());
+        assertTrue(view.isDestinationQueue());
+        assertFalse(view.isDestinationTopic());
+        assertFalse(view.isDestinationTemporary());
+
+        producer.close();
+        Thread.sleep(200);
+        assertEquals(0, broker.getDynamicDestinationProducers().length);
+    }
+
+//    public void testTempQueueJMXDelete() throws Exception {
+//        connection = connectionFactory.createConnection();
+//
+//        connection.setClientID(clientID);
+//        connection.start();
+//        Session session = connection.createSession(transacted, authMode);
+//        ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
+//        Thread.sleep(1000);
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+  JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
+//
+//        // should not throw an exception
+//        mbeanServer.getObjectInstance(queueViewMBeanName);
+//
+//        tQueue.delete();
+//        Thread.sleep(1000);
+//        try {
+//            // should throw an exception
+//            mbeanServer.getObjectInstance(queueViewMBeanName);
+//
+//            fail("should be deleted already!");
+//        } catch (Exception e) {
+//            // expected!
+//        }
+//
+//    }
+//
+//    // Test for AMQ-3029
+//    public void testBrowseBlobMessages() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnectionWithBlobMessage(connection);
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        CompositeData[] compdatalist = queue.browse();
+//        int initialQueueSize = compdatalist.length;
+//        if (initialQueueSize == 0) {
+//            fail("There is no message in the queue:");
+//        }
+//        else {
+//            echo("Current queue size: " + initialQueueSize);
+//        }
+//        int messageCount = initialQueueSize;
+//        String[] messageIDs = new String[messageCount];
+//        for (int i = 0; i < messageCount; i++) {
+//            CompositeData cdata = compdatalist[i];
+//            String messageID = (String) cdata.get("JMSMessageID");
+//            assertNotNull("Should have a message ID for message " + i, messageID);
+//
+//            messageIDs[i] = messageID;
+//        }
+//
+//        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//    }
+//
+//    public void testBrowseBytesMessages() throws Exception {
+//        connection = connectionFactory.createConnection();
+//        useConnectionWithByteMessage(connection);
+//
+//        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+//
+//        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+//
+//        CompositeData[] compdatalist = queue.browse();
+//        int initialQueueSize = compdatalist.length;
+//        if (initialQueueSize == 0) {
+//            fail("There is no message in the queue:");
+//        }
+//        else {
+//            echo("Current queue size: " + initialQueueSize);
+//        }
+//        int messageCount = initialQueueSize;
+//        String[] messageIDs = new String[messageCount];
+//        for (int i = 0; i < messageCount; i++) {
+//            CompositeData cdata = compdatalist[i];
+//            String messageID = (String) cdata.get("JMSMessageID");
+//            assertNotNull("Should have a message ID for message " + i, messageID);
+//            messageIDs[i] = messageID;
+//
+//            Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
+//            assertNotNull("should be a preview", preview);
+//            assertTrue("not empty", preview.length > 0);
+//        }
+//
+//        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
+//
+//        // consume all the messages
+//        echo("Attempting to consume all bytes messages from: " + destination);
+//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createConsumer(destination);
+//        for (int i=0; i<MESSAGE_COUNT; i++) {
+//            Message message = consumer.receive(5000);
+//            assertNotNull(message);
+//            assertTrue(message instanceof BytesMessage);
+//        }
+//        consumer.close();
+//        session.close();
+//    }
 }