You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/12/06 13:16:23 UTC

activemq-artemis git commit: more amqp work plus test fixes and API enhancements

Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 82299f4de -> bfc1f0be6


more amqp work plus test fixes and API enhancements


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfc1f0be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfc1f0be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfc1f0be

Branch: refs/heads/ARTEMIS-780
Commit: bfc1f0be6982b8ecf2c823cb40f1d7a41cd27865
Parents: 82299f4
Author: Andy Taylor <an...@gmail.com>
Authored: Tue Dec 6 13:14:51 2016 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Dec 6 13:15:50 2016 +0000

----------------------------------------------------------------------
 .../activemq/cli/test/FileBrokerTest.java       |  24 ----
 .../core/management/ActiveMQServerControl.java  |  59 +++++++++
 .../amqp/broker/AMQPSessionCallback.java        |   4 +
 .../protocol/amqp/proton/AmqpSupport.java       |   1 +
 .../proton/ProtonServerReceiverContext.java     |  11 ++
 .../amqp/proton/ProtonServerSenderContext.java  |   6 +
 .../amqp/proton/handler/ExtCapability.java      |   2 +-
 .../artemis/rest/test/FindDestinationTest.java  |   3 +
 .../impl/ActiveMQServerControlImpl.java         | 120 +++++++++++++------
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../amqp/AmqpTempDestinationTest.java           |   2 -
 .../management/ActiveMQServerControlTest.java   |  87 +++++++-------
 .../ActiveMQServerControlUsingCoreTest.java     |  15 +++
 .../management/ManagementControlHelper.java     |   8 ++
 14 files changed, 238 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
index a50a49f..b04b540 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
@@ -45,30 +45,6 @@ import static org.junit.Assert.fail;
 public class FileBrokerTest {
 
    @Test
-   public void startWithJMS() throws Exception {
-      ServerDTO serverDTO = new ServerDTO();
-      serverDTO.configuration = "broker.xml";
-      FileBroker broker = null;
-      try {
-         broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager());
-         broker.start();
-         JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms");
-         Assert.assertNotNull(jmsServerManager);
-         Assert.assertTrue(jmsServerManager.isStarted());
-         //this tells us the jms server is activated
-         Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted());
-         ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core");
-         Assert.assertNotNull(activeMQServer);
-         Assert.assertTrue(activeMQServer.isStarted());
-         Assert.assertTrue(broker.isStarted());
-      } finally {
-         if (broker != null) {
-            broker.stop();
-         }
-      }
-   }
-
-   @Test
    public void startWithoutJMS() throws Exception {
       ServerDTO serverDTO = new ServerDTO();
       serverDTO.configuration = "broker-nojms.xml";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 1797c9a..abd8e9e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -451,10 +451,29 @@ public interface ActiveMQServerControl {
     * @param address address to bind the queue to
     * @param name    name of the queue
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
 
+
+   /**
+    * Create a durable queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
    /**
     * Create a queue.
     * <br>
@@ -466,6 +485,7 @@ public interface ActiveMQServerControl {
     * @param name    name of the queue
     * @param durable whether the queue is durable
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name,
@@ -480,6 +500,24 @@ public interface ActiveMQServerControl {
     *
     * @param address address to bind the queue to
     * @param name    name of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
     * @param filter  of the queue
     * @param durable whether the queue is durable
     */
@@ -496,6 +534,27 @@ public interface ActiveMQServerControl {
     * <br>
     * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
     *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param filter  of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "filter", desc = "Filter of the queue") String filter,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
     * @param address             address to bind the queue to
     * @param routingType         the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
     * @param name                name of the queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a1928be..c870eec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -563,4 +563,8 @@ public class AMQPSessionCallback implements SessionCallback {
    public AddressInfo getAddress(SimpleString address) {
       return  serverSession.getAddress(address);
    }
+
+   public void removeTemporaryQueue(String address) throws Exception {
+      serverSession.deleteQueue(SimpleString.toSimpleString(address));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 7bdbd2e..ff398dc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -55,6 +55,7 @@ public class AmqpSupport {
    public static final Symbol PLATFORM = Symbol.valueOf("platform");
    public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
    public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
+   public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
 
 
    // Symbols used in configuration of newly opened links.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 515acc3..446edc0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
@@ -56,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
    private static int minCreditRefresh = 30;
+   private TerminusExpiryPolicy expiryPolicy;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AMQPConnectionContext connection,
@@ -88,6 +90,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
+            expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
             target.setAddress(address);
          } else {
             //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
@@ -165,6 +168,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+      if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+         try {
+            sessionSPI.removeTemporaryQueue(target.getAddress());
+         } catch (Exception e) {
+            //ignore on close, it's temporary anyway and will be removed later
+         }
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index c0b9643..7e9a243 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -446,6 +446,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                      sessionSPI.deleteQueue(queue);
                   }
                }
+            } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+               try {
+                  sessionSPI.removeTemporaryQueue(source.getAddress());
+               } catch (Exception e) {
+                  //ignore on close, it's temporary anyway and will be removed later
+               }
             }
          }
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
index 6325ff6..931efa7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
@@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection;
 
 public class ExtCapability {
 
-   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY};
+   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS};
 
    public static Symbol[] getCapabilities() {
       return capabilities;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
index db23f56..be14056 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.jboss.resteasy.client.ClientRequest;
 import org.jboss.resteasy.client.ClientResponse;
 import org.jboss.resteasy.spi.Link;
@@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
    @Test
    public void testFindQueue() throws Exception {
       String testName = "testFindQueue";
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
 
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
 
    @Test
    public void testFindTopic() throws Exception {
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4464062..841aa84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       }
    }
 
-   @Deprecated
    @Override
    public void createQueue(final String address, final String name) throws Exception {
       checkStarted();
@@ -633,6 +632,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
+   public void createQueue(final String address, final String name, final String routingType) throws Exception {
+      checkStarted();
+
+      clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public void createQueue(final String address, final String name, final boolean durable) throws Exception {
       checkStarted();
 
@@ -645,35 +656,44 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public void createQueue(String address,
-                           String routingType,
-                           String name,
-                           String filterStr,
-                           boolean durable,
-                           int maxConsumers,
-                           boolean deleteOnNoConsumers,
-                           boolean autoCreateAddress) throws Exception {
+   public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
 
-      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+   @Override
+   public void createQueue(final String address,
+                           final String name,
+                           final String filterStr,
+                           final boolean durable) throws Exception {
+      checkStarted();
+
+      clearIO();
       try {
+         SimpleString filter = null;
          if (filterStr != null && !filterStr.trim().equals("")) {
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public void createQueue(final String address,
                            final String name,
                            final String filterStr,
-                           final boolean durable) throws Exception {
+                           final boolean durable,
+                           final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
@@ -683,12 +703,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void createQueue(String address,
+                           String routingType,
+                           String name,
+                           String filterStr,
+                           boolean durable,
+                           int maxConsumers,
+                           boolean deleteOnNoConsumers,
+                           boolean autoCreateAddress) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+      try {
+         if (filterStr != null && !filterStr.trim().equals("")) {
+            filter = new SimpleString(filterStr);
+         }
+
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public String[] getQueueNames() {
       checkStarted();
@@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
          settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
       }
       return settings.add("expiryDelay", addressSettings.getExpiryDelay())
-                     .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
-                     .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
-                     .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
-                     .add("pageSizeBytes", addressSettings.getPageSizeBytes())
-                     .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
-                     .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
-                     .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
-                     .add("redistributionDelay", addressSettings.getRedistributionDelay())
-                     .add("lastValueQueue", addressSettings.isLastValueQueue())
-                     .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
-                     .add("addressFullMessagePolicy", policy)
-                     .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
-                     .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
-                     .add("slowConsumerPolicy", consumerPolicy)
-                     .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
-                     .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
-                     .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
-                     .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
-                     .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
-                     .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
-                     .build()
-                     .toString();
+            .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
+            .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
+            .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
+            .add("pageSizeBytes", addressSettings.getPageSizeBytes())
+            .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
+            .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
+            .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
+            .add("redistributionDelay", addressSettings.getRedistributionDelay())
+            .add("lastValueQueue", addressSettings.isLastValueQueue())
+            .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
+            .add("addressFullMessagePolicy", policy)
+            .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
+            .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
+            .add("slowConsumerPolicy", consumerPolicy)
+            .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
+            .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
+            .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
+            .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
+            .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
+            .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
+            .build()
+            .toString();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 2ae2329..c997dab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
                } else {
                   // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                   // actually routed to at that address though
-                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
+                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true);
                }
 
                // There are a few things that will behave differently when it's an internal queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
index d7874e3..4dbe21e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
@@ -111,8 +111,6 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
       sender.close();
 
-      Thread.sleep(10000);
-
       queueView = getProxyToQueue(remoteTarget.getAddress());
       assertNull(queueView);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 8de7678..bd2392e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.jlibaio.LibaioContext;
@@ -185,12 +186,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
-      serverControl.createQueue(address.toString(), name.toString());
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
 
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertNull(queueControl.getFilter());
@@ -211,12 +212,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
-      serverControl.createQueue(address.toString(), name.toString(), filter, durable);
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(),  "ANYCAST", name.toString(), filter, durable, -1, false, false);
 
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertEquals(filter, queueControl.getFilter());
@@ -236,12 +237,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
-      serverControl.createQueue(address.toString(), name.toString(), durable);
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
 
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertNull(queueControl.getFilter());
@@ -264,12 +265,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
       serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
 
       checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertNull(queueControl.getFilter());
@@ -297,8 +298,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       ActiveMQServerControl serverControl = createManagementControl();
 
       checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
-      serverControl.createQueue(address.toString(), name.toString(), durable);
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
 
       ServerLocator receiveLocator = createInVMNonHALocator();
       ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
@@ -307,7 +308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       Assert.assertFalse(consumer.isClosed());
 
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
       serverControl.destroyQueue(name.toString(), true);
       Wait.waitFor(new Wait.Condition() {
          @Override
@@ -329,12 +330,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
-      serverControl.createQueue(address.toString(), name.toString(), filter, durable);
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
 
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertNull(queueControl.getFilter());
@@ -355,12 +356,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
 
       ActiveMQServerControl serverControl = createManagementControl();
 
-      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
 
-      serverControl.createQueue(address.toString(), name.toString(), filter, durable);
-
-      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
       Assert.assertEquals(address.toString(), queueControl.getAddress());
       Assert.assertEquals(name.toString(), queueControl.getName());
       Assert.assertNull(queueControl.getFilter());
@@ -383,8 +384,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       // management operations
 
       Assert.assertFalse(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
-
-      serverControl.createQueue(address.toString(), name.toString());
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
       Assert.assertTrue(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
 
       serverControl.destroyQueue(name.toString());
@@ -402,8 +403,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       // management operations
 
       Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
-
-      serverControl.createQueue(address.toString(), name.toString());
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
       Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
 
       serverControl.destroyQueue(name.toString());
@@ -1212,7 +1213,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       ServerLocator locator = createInVMNonHALocator();
       ClientSessionFactory factory = createSessionFactory(locator);
       ClientSession session = addClientSession(factory.createSession());
-      server.createQueue(queueName, queueName, null, false, false);
+      server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
       addClientConsumer(session.createConsumer(queueName));
       addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true));
 
@@ -1268,8 +1270,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       ServerLocator locator2 = createInVMNonHALocator();
       ClientSessionFactory factory2 = createSessionFactory(locator2);
       ClientSession session2 = addClientSession(factory2.createSession());
-
-      server.createQueue(queueName, queueName, null, false, false);
+      serverControl.createAddress(queueName.toString(), "ANYCAST");
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
 
       addClientConsumer(session.createConsumer(queueName));
       Thread.sleep(200);
@@ -1334,7 +1336,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
    @Test
    public void testListSessionsAsJSON() throws Exception {
       SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
-      server.createQueue(queueName, queueName, null, false, false);
+      server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
       ActiveMQServerControl serverControl = createManagementControl();
 
       ServerLocator locator = createInVMNonHALocator();
@@ -1399,8 +1402,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
 
       server2.start();
-      server.createQueue(address, address, null, true, false);
-      server2.createQueue(address, address, null, true, false);
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
+      server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
       ServerLocator locator = createInVMNonHALocator();
       ClientSessionFactory csf = createSessionFactory(locator);
       ClientSession session = csf.createSession();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 280fdc4..2831f79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -127,6 +127,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public void createQueue(String address, String name, String routingType) throws Exception {
+            proxy.invokeOperation("createQueue", address, name, routingType);
+         }
+
+         @Override
+         public void createQueue(String address, String name, boolean durable, String routingType) throws Exception {
+            proxy.invokeOperation("createQueue", address, name, durable, routingType);
+         }
+
+         @Override
+         public void createQueue(String address,String name, String filter, boolean durable, String routingType) throws Exception {
+            proxy.invokeOperation("createQueue", address, name, filter, durable, routingType);
+         }
+
+         @Override
          public void createQueue(final String address, final String name, final boolean durable) throws Exception {
             proxy.invokeOperation("createQueue", address, name, durable);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
index 6bc8f3d..11785e4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
 import org.apache.activemq.artemis.api.core.management.DivertControl;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.server.RoutingType;
 
 public class ManagementControlHelper {
 
@@ -73,6 +74,13 @@ public class ManagementControlHelper {
       return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType()), QueueControl.class, mbeanServer);
    }
 
+   public static QueueControl createQueueControl(final SimpleString address,
+                                                 final SimpleString name,
+                                                 final RoutingType routingType,
+                                                 final MBeanServer mbeanServer) throws Exception {
+      return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, routingType), QueueControl.class, mbeanServer);
+   }
+
    public static AddressControl createAddressControl(final SimpleString address,
                                                      final MBeanServer mbeanServer) throws Exception {
       return (AddressControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getAddressObjectName(address), AddressControl.class, mbeanServer);