You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/06/19 11:44:31 UTC

[1/2] activemq-artemis git commit: This closes #1337

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8dd6b712f -> 7610dadaf


This closes #1337


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7610dada
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7610dada
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7610dada

Branch: refs/heads/master
Commit: 7610dadaf36d4fb05cdf35f900c840ef5496a173
Parents: 8dd6b71 44b7e45
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jun 19 12:44:13 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Jun 19 12:44:13 2017 +0100

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  6 ++
 .../artemis/jms/client/ActiveMQDestination.java |  6 +-
 .../artemis/jms/client/ActiveMQSession.java     |  6 +-
 .../amqp/broker/ProtonProtocolManager.java      |  3 +-
 .../client/AMQPClientConnectionFactory.java     |  4 +-
 .../amqp/proton/AMQPConnectionContext.java      |  8 ++
 .../amqp/proton/ProtonServerSenderContext.java  | 52 +++++++++---
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../core/protocol/openwire/amq/AMQConsumer.java |  2 +-
 .../ra/inflow/ActiveMQMessageHandler.java       |  2 +-
 .../artemis/core/config/Configuration.java      | 10 +++
 .../core/config/impl/ConfigurationImpl.java     | 14 ++++
 .../deployers/impl/FileConfigurationParser.java |  6 ++
 .../resources/schema/artemis-configuration.xsd  | 10 +++
 .../impl/DefaultsFileConfigurationTest.java     |  2 +
 .../core/config/impl/FileConfigurationTest.java |  1 +
 .../resources/ConfigurationTest-full-config.xml |  1 +
 .../amqp/ClientDefinedMultiConsumerTest.java    | 32 ++++----
 .../integration/amqp/JMSClientTestSupport.java  | 52 ++++++++++++
 .../amqp/JMSDurableConsumerTest.java            | 22 ++++++
 .../integration/amqp/JMSSharedConsumerTest.java | 79 ++++++-------------
 .../amqp/JMSSharedDurableConsumerTest.java      | 83 +++++++-------------
 22 files changed, 255 insertions(+), 148 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour

Posted by ma...@apache.org.
ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour

Use ActiveMQDestination for subscription naming, fixing and aligning queue naming in the process.

The change is behind a configuration toggle so as to avoid causing breaking changes for users who are not expecting it.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/44b7e455
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/44b7e455
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/44b7e455

Branch: refs/heads/master
Commit: 44b7e455cb93ca0151793ee24de40b0a7a58301b
Parents: 8dd6b71
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Tue Jun 13 09:23:33 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Jun 19 12:44:13 2017 +0100

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  6 ++
 .../artemis/jms/client/ActiveMQDestination.java |  6 +-
 .../artemis/jms/client/ActiveMQSession.java     |  6 +-
 .../amqp/broker/ProtonProtocolManager.java      |  3 +-
 .../client/AMQPClientConnectionFactory.java     |  4 +-
 .../amqp/proton/AMQPConnectionContext.java      |  8 ++
 .../amqp/proton/ProtonServerSenderContext.java  | 52 +++++++++---
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../core/protocol/openwire/amq/AMQConsumer.java |  2 +-
 .../ra/inflow/ActiveMQMessageHandler.java       |  2 +-
 .../artemis/core/config/Configuration.java      | 10 +++
 .../core/config/impl/ConfigurationImpl.java     | 14 ++++
 .../deployers/impl/FileConfigurationParser.java |  6 ++
 .../resources/schema/artemis-configuration.xsd  | 10 +++
 .../impl/DefaultsFileConfigurationTest.java     |  2 +
 .../core/config/impl/FileConfigurationTest.java |  1 +
 .../resources/ConfigurationTest-full-config.xml |  1 +
 .../amqp/ClientDefinedMultiConsumerTest.java    | 32 ++++----
 .../integration/amqp/JMSClientTestSupport.java  | 52 ++++++++++++
 .../amqp/JMSDurableConsumerTest.java            | 22 ++++++
 .../integration/amqp/JMSSharedConsumerTest.java | 79 ++++++-------------
 .../amqp/JMSSharedDurableConsumerTest.java      | 83 +++++++-------------
 22 files changed, 255 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 6bd0ae8..7ace35c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -442,6 +442,8 @@ public final class ActiveMQDefaultConfiguration {
    // Default period to wait between configuration file checks
    public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
 
+   public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false;
+
    public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
 
    public static final int DEFAULT_MAX_DISK_USAGE = 100;
@@ -1207,6 +1209,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD;
    }
 
+   public static boolean getDefaultAmqpUseCoreSubscriptionNaming() {
+      return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING;
+   }
+
    /**
     * The default global max size. -1 = no global max size.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 297efe8..0bf4dd6 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -102,9 +102,9 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
       }
    }
 
-   public static String createQueueNameForDurableSubscription(final boolean isDurable,
-                                                              final String clientID,
-                                                              final String subscriptionName) {
+   public static String createQueueNameForSubscription(final boolean isDurable,
+                                                       final String clientID,
+                                                       final String subscriptionName) {
       if (clientID != null) {
          if (isDurable) {
             return ActiveMQDestination.escape(clientID) + SEPARATOR +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index a8aceec..374a985 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
             throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
          }
 
-         queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
+         queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
 
          if (durability == ConsumerDurability.DURABLE) {
             try {
@@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
                }
 
-               queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), subscriptionName));
+               queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
 
                QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
          throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
       }
 
-      SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), name));
+      SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
 
       try {
          QueueQuery response = session.queueQuery(queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index b5325fb..d36f18e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -116,7 +116,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
       }
 
       String id = server.getConfiguration().getName();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
+      boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
 
       Executor executor = server.getExecutorFactory().getExecutor();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 6aa8fda..4e532bb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -39,12 +39,14 @@ public class AMQPClientConnectionFactory {
    private final String containerId;
    private final Map<Symbol, Object> connectionProperties;
    private final int ttl;
+   private final boolean useCoreSubscriptionNaming;
 
    public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map<Symbol, Object> connectionProperties, int ttl) {
       this.server = server;
       this.containerId = containerId;
       this.connectionProperties = connectionProperties;
       this.ttl = ttl;
+      this.useCoreSubscriptionNaming = false;
    }
 
    public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
@@ -52,7 +54,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 4a46a8a..0ab4171 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -74,16 +74,20 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    private final ProtonProtocolManager protocolManager;
 
+   private final boolean useCoreSubscriptionNaming;
+
    public AMQPConnectionContext(ProtonProtocolManager protocolManager,
                                 AMQPConnectionCallback connectionSP,
                                 String containerId,
                                 int idleTimeout,
                                 int maxFrameSize,
                                 int channelMax,
+                                boolean useCoreSubscriptionNaming,
                                 ScheduledExecutorService scheduledPool) {
 
       this.protocolManager = protocolManager;
       this.connectionCallback = connectionSP;
+      this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
       this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
 
       connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
@@ -260,6 +264,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       }
    }
 
+   public boolean isUseCoreSubscriptionNaming() {
+      return useCoreSubscriptionNaming;
+   }
+
    @Override
    public void onInit(Connection connection) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ad8bee7..8f8222b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
@@ -68,6 +69,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
@@ -188,7 +190,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // subscription queue
          String clientId = getClientId();
          String pubId = sender.getName();
-         queue = createQueueName(clientId, pubId, true, global, false);
+         global = hasRemoteDesiredCapability(sender, GLOBAL);
+         queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
          QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
          multicast = true;
          routingTypeToUse = RoutingType.MULTICAST;
@@ -343,7 +346,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                // id and link name
                String clientId = getClientId();
                String pubId = sender.getName();
-               queue = createQueueName(clientId, pubId, shared, global, false);
+               queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
 
                if (result.isExists()) {
@@ -369,7 +372,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                // otherwise we are a volatile subscription
                isVolatile = true;
                if (shared && sender.getName() != null) {
-                  queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
+                  queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
                   try {
                      sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                   } catch (ActiveMQQueueExistsException e) {
@@ -493,7 +496,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                      if (pubId.contains("|")) {
                         pubId = pubId.split("\\|")[0];
                      }
-                     String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+                     String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
                      result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
                      //only delete if it isn't volatile and has no consumers
                      if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@@ -733,20 +736,43 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(String clientId,
+   private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) {
+      Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities();
+      if (remoteDesiredCapabilities != null) {
+         for (Symbol cap : remoteDesiredCapabilities) {
+            if (capability.equals(cap)) {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+
+   private static String createQueueName(boolean useCoreSubscriptionNaming,
+                                         String clientId,
                                          String pubId,
                                          boolean shared,
                                          boolean global,
                                          boolean isVolatile) {
-      String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
-      if (shared) {
-         if (queue.contains("|")) {
-            queue = queue.split("\\|")[0];
-         }
-         if (isVolatile) {
-            queue = "nonDurable" + "." + queue;
+      if (useCoreSubscriptionNaming) {
+         final boolean durable = !isVolatile;
+         final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
+         final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
+         return ActiveMQDestination.createQueueNameForSubscription(durable, clientID, subscriptionName);
+      } else {
+         String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
+         if (shared) {
+            if (queue.contains("|")) {
+               queue = queue.split("\\|")[0];
+            }
+            if (isVolatile) {
+               queue += ":shared-volatile";
+            }
+            if (global) {
+               queue += ":global";
+            }
          }
+         return queue;
       }
-      return queue;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index c63d266..a56901e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1093,7 +1093,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-         SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+         SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
          server.destroyQueue(subQueueName);
 
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3bdee8b..6bba4e1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -150,7 +150,7 @@ public class AMQConsumer {
          addressInfo.addRoutingType(RoutingType.MULTICAST);
       }
       if (isDurable) {
-         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
          QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
          if (result.isExists()) {
             // Already exists

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 4ff6357..bb2f870 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -112,7 +112,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
       // Create the message consumer
       SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable()) {
-         SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+         SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
 
          QueueQuery subResponse = session.queueQuery(queueName);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index aad3968..9d5d4a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -290,6 +290,16 @@ public interface Configuration {
    Configuration setConnectionTTLOverride(long ttl);
 
    /**
+    * Returns if to use Core subscription naming for AMQP.
+    */
+   boolean isAmqpUseCoreSubscriptionNaming();
+
+   /**
+    * Sets if to use Core subscription naming for AMQP.
+    */
+   Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming);
+
+   /**
     * Returns whether code coming from connection is executed asynchronously or not. <br>
     * Default value is
     * {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 8edeb5b..fb678b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -262,6 +262,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private Long globalMaxSize;
 
+   private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming();
+
    private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
 
    private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
@@ -454,6 +456,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public boolean isAmqpUseCoreSubscriptionNaming() {
+      return amqpUseCoreSubscriptionNaming;
+   }
+
+   @Override
+   public Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming) {
+      this.amqpUseCoreSubscriptionNaming = amqpUseCoreSubscriptionNaming;
+      return this;
+   }
+
+
+   @Override
    public boolean isAsyncConnectionExecutionEnabled() {
       return asyncConnectionExecutionEnabled;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 5243325..a3d59f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -219,6 +219,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix";
 
+   private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
+
+
    // Attributes ----------------------------------------------------
 
    private boolean validateAIO = false;
@@ -342,6 +345,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK));
 
+      config.setAmqpUseCoreSubscriptionNaming(getBoolean(e, AMQP_USE_CORE_SUBSCRIPTION_NAMING, config.isAmqpUseCoreSubscriptionNaming()));
+
+
       // parsing cluster password
       String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index b0e8502..6af18bd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -52,6 +52,16 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="amqp-use-core-subscription-naming" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
+            <xsd:annotation>
+               <xsd:documentation>
+                  When enabled, AMQP subscription queue names match Core subscription queue names, for better interoperability between protocols.
+                  Note: Enabling this on an existing broker that already has AMQP durable subscriptions will require
+                  clients to re-subscribe and to clean up old subscription names.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="resolve-protocols" type="xsd:boolean" default="true" maxOccurs="1"
                       minOccurs="0">
             <xsd:annotation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
index 07d5f58..a07c797 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
@@ -133,6 +133,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
       Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled());
 
       Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
+
+      Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming());
    }
 
    // Protected ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 2f18365..28ad83e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -92,6 +92,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       Assert.assertEquals("pagingdir", conf.getPagingDirectory());
       Assert.assertEquals("somedir", conf.getBindingsDirectory());
       Assert.assertEquals(false, conf.isCreateBindingsDir());
+      Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
 
       Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
       Assert.assertEquals("somedir2", conf.getJournalDirectory());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 0691e95..ca10eb9 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -122,6 +122,7 @@
             <exclusive>false</exclusive>
          </divert>
       </diverts>
+      <amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
       <queues>
          <queue name="queue1">
             <address>address1</address>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
index 3f8da1a..51c70ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       receiver2.close();
       //check its been deleted
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
          }
       }, 1000);
       connection.close();
@@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AddressInfo addressInfo = new AddressInfo(address);
       addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
       server.addAddressInfo(addressInfo);
-      server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false);
+      server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
       AmqpClient client = createAmqpClient();
 
       AmqpConnection connection = addConnection(client.connect("myClientId"));
@@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       receiver2.close();
       //check its **Hasn't** been deleted
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       connection.close();
    }
 
@@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       //check its been deleted
       connection.close();
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
          }
       }, 1000);
    }
@@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
       receiver2.close();
       //check its been deleted
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
          }
       }, 1000);
       connection.close();
@@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
       receiver2.close();
       //check its been deleted
-      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
       connection.close();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index edd4968..3f96711 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -23,6 +23,7 @@ import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.jboss.logging.Logger;
 import org.junit.After;
@@ -154,4 +155,55 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
 
       return connection;
    }
+
+   /**
+    * Builds the URI used by Core JMS clients to reach the test broker.
+    *
+    * @return {@code tcp://127.0.0.1:<AMQP_PORT>}, followed by {@code ?} and
+    *         {@link #getJmsConnectionURIOptions()} when those options are non-empty
+    * @throws RuntimeException wrapping any exception raised while building the URI
+    */
+   protected String getBrokerCoreJMSConnectionString() {
+      try {
+         // The isUseSSL() and plain branches produced the same URI, so they are merged.
+         // NOTE(review): an SSL broker presumably needs extra transport options - confirm.
+         String uri = "tcp://127.0.0.1:" + AMQP_PORT;
+         String options = getJmsConnectionURIOptions();
+         if (!options.isEmpty()) {
+            uri = uri + "?" + options;
+         }
+         return uri;
+      } catch (Exception e) {
+         // Keep the cause so a failing test reports why the URI could not be built.
+         throw new RuntimeException(e);
+      }
+   }
+
+   /**
+    * Opens and starts a Core JMS connection to the test broker, with no credentials and no client ID.
+    */
+   protected Connection createCoreConnection() throws JMSException {
+      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
+   }
+
+   /** Opens a Core JMS connection registered via trackJMSConnection; optionally sets its client ID and starts it. */
+   private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
+      ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
+      Connection connection = trackJMSConnection(factory.createConnection(username, password));
+      // Failures reported to the listener are only printed; they do not fail the test.
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+      // A null or empty clientId leaves the connection's client ID unset.
+      if (clientId != null && !clientId.isEmpty()) {
+         connection.setClientID(clientId);
+      }
+      if (start) {
+         connection.start();
+      }
+      return connection;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
index 26097f6..31de59d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -31,11 +33,31 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JMSDurableConsumerTest extends JMSClientTestSupport {
 
+   // Every test runs twice: once with amqpUseCoreSubscriptionNaming true, once with it false.
+   @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[] {true}, new Object[] {false});
+   }
+
+   /* Injected by the Parameterized runner, so it must NOT be private; see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+   @Parameterized.Parameter(0)
+   public boolean amqpUseCoreSubscriptionNaming;
+
+   // Copies this run's parameter into the server configuration.
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+   }
+
    @Test(timeout = 30000)
    public void testDurableConsumerAsync() throws Exception {
       final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
index c49fcff..4113e4e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -27,11 +26,33 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JMSSharedConsumerTest extends JMSClientTestSupport {
 
+   // Every test runs twice: once with amqpUseCoreSubscriptionNaming true, once with it false.
+   @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[] {true}, new Object[] {false});
+   }
+
+   /* Injected by the Parameterized runner, so it must NOT be private; see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+   @Parameterized.Parameter(0)
+   public boolean amqpUseCoreSubscriptionNaming;
+
+   // Copies this run's parameter into the server configuration.
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+   }
+
    @Override
    protected String getConfiguredProtocols() {
       return "AMQP,OPENWIRE,CORE";
@@ -94,6 +115,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
 
    @Test(timeout = 30000)
    public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
+      org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
 
       Connection connection = createConnection(); //AMQP
       Connection connection2 = createCoreConnection(); //CORE
@@ -104,6 +126,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
 
    @Test(timeout = 30000)
    public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
+      org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
 
       Connection connection = createCoreConnection(); //CORE
       Connection connection2 = createConnection(); //AMQP
@@ -111,56 +134,4 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
       testSharedConsumer(connection, connection2);
 
    }
-
-
-   protected String getBrokerCoreJMSConnectionString() {
-
-      try {
-         int port = AMQP_PORT;
-
-         String uri = null;
-
-         if (isUseSSL()) {
-            uri = "tcp://127.0.0.1:" + port;
-         } else {
-            uri = "tcp://127.0.0.1:" + port;
-         }
-
-         if (!getJmsConnectionURIOptions().isEmpty()) {
-            uri = uri + "?" + getJmsConnectionURIOptions();
-         }
-
-         return uri;
-      } catch (Exception e) {
-         throw new RuntimeException();
-      }
-   }
-
-   protected Connection createCoreConnection() throws JMSException {
-      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
-   }
-
-   private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
-      ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
-
-      Connection connection = trackJMSConnection(factory.createConnection(username, password));
-
-      connection.setExceptionListener(new ExceptionListener() {
-         @Override
-         public void onException(JMSException exception) {
-            exception.printStackTrace();
-         }
-      });
-
-      if (clientId != null && !clientId.isEmpty()) {
-         connection.setClientID(clientId);
-      }
-
-      if (start) {
-         connection.start();
-      }
-
-      return connection;
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
index 040506b..ad0d9dd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -27,11 +26,33 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
 
+   // Every test runs twice: once with amqpUseCoreSubscriptionNaming true, once with it false.
+   @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[] {true}, new Object[] {false});
+   }
+
+   /* Injected by the Parameterized runner, so it must NOT be private; see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+   @Parameterized.Parameter(0)
+   public boolean amqpUseCoreSubscriptionNaming;
+
+   // Copies this run's parameter into the server configuration.
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+   }
+
    @Override
    protected String getConfiguredProtocols() {
       return "AMQP,OPENWIRE,CORE";
@@ -68,6 +89,10 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
          }
          assertNotNull("Should have received a message by now.", received);
          assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+
+         consumer1.close();
+         consumer2.close();
+         session1.unsubscribe("SharedConsumer");
       } finally {
          connection1.close();
          connection2.close();
@@ -94,6 +119,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
 
    @Test(timeout = 30000)
    public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception {
+      org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
 
       Connection connection = createConnection(); //AMQP
       Connection connection2 = createCoreConnection(); //CORE
@@ -104,6 +130,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
 
    @Test(timeout = 30000)
    public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception {
+      org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
 
       Connection connection = createCoreConnection(); //CORE
       Connection connection2 = createConnection(); //AMQP
@@ -111,56 +138,4 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
       testSharedDurableConsumer(connection, connection2);
 
    }
-
-
-   protected String getBrokerCoreJMSConnectionString() {
-
-      try {
-         int port = AMQP_PORT;
-
-         String uri = null;
-
-         if (isUseSSL()) {
-            uri = "tcp://127.0.0.1:" + port;
-         } else {
-            uri = "tcp://127.0.0.1:" + port;
-         }
-
-         if (!getJmsConnectionURIOptions().isEmpty()) {
-            uri = uri + "?" + getJmsConnectionURIOptions();
-         }
-
-         return uri;
-      } catch (Exception e) {
-         throw new RuntimeException();
-      }
-   }
-
-   protected Connection createCoreConnection() throws JMSException {
-      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
-   }
-
-   private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
-      ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
-
-      Connection connection = trackJMSConnection(factory.createConnection(username, password));
-
-      connection.setExceptionListener(new ExceptionListener() {
-         @Override
-         public void onException(JMSException exception) {
-            exception.printStackTrace();
-         }
-      });
-
-      if (clientId != null && !clientId.isEmpty()) {
-         connection.setClientID(clientId);
-      }
-
-      if (start) {
-         connection.start();
-      }
-
-      return connection;
-   }
-
 }