You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2021/07/06 19:07:00 UTC

[activemq-artemis] branch main updated: ARTEMIS-3374 config-managed queue can be deleted by durable subscriber

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d02d06  ARTEMIS-3374 config-managed queue can be deleted by durable subscriber
     new 478a28c  This closes #3642
1d02d06 is described below

commit 1d02d06eab7a1ea387c39ed17cda74c52f5948d9
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Jun 23 14:22:17 2021 -0500

    ARTEMIS-3374 config-managed queue can be deleted by durable subscriber
---
 .../artemis/api/core/client/ClientSession.java     |  2 +
 .../artemis/core/client/impl/QueueQueryImpl.java   | 13 ++++++-
 .../SessionQueueQueryResponseMessage_V3.java       | 32 ++++++++++++++--
 .../artemis/core/server/QueueQueryResult.java      | 11 +++++-
 .../artemis/jms/client/ActiveMQSession.java        |  2 +-
 .../amqp/proton/ProtonServerSenderContext.java     |  9 +++--
 .../core/protocol/openwire/amq/AMQConsumer.java    |  2 +-
 .../core/server/impl/ActiveMQServerImpl.java       |  6 +--
 .../integration/amqp/JMSClientTestSupport.java     |  8 ++++
 .../integration/amqp/JMSMessageConsumerTest.java   | 44 ++++++++++++++++++++++
 10 files changed, 114 insertions(+), 15 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index d2c0f43..ad7e702 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -177,6 +177,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
       Long getRingSize();
 
       Boolean isEnabled();
+
+      Boolean isConfigurationManaged();
    }
 
    // Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index d532dc0..5529448 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -78,6 +78,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
 
    private final Boolean enabled;
 
+   private final Boolean configurationManaged;
+
 
    private final Integer defaultConsumerWindowSize;
 
@@ -164,7 +166,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Long autoDeleteDelay,
                          final Long autoDeleteMessageCount,
                          final Integer defaultConsumerWindowSize) {
-      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
+      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null, null);
    }
 
    public QueueQueryImpl(final boolean durable,
@@ -195,7 +197,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Long autoDeleteMessageCount,
                          final Integer defaultConsumerWindowSize,
                          final Long ringSize,
-                         final Boolean enabled) {
+                         final Boolean enabled,
+                         final Boolean configurationManaged) {
       this.durable = durable;
       this.temporary = temporary;
       this.consumerCount = consumerCount;
@@ -225,6 +228,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;
       this.ringSize = ringSize;
       this.enabled = enabled;
+      this.configurationManaged = configurationManaged;
    }
 
    @Override
@@ -371,5 +375,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
    public Boolean isEnabled() {
       return enabled;
    }
+
+   @Override
+   public Boolean isConfigurationManaged() {
+      return configurationManaged;
+   }
 }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index bf83222..9deffe1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -66,12 +66,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    private Boolean enabled;
 
+   private Boolean configurationManaged;
+
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
-      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), res [...]
+      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), res [...]
    }
 
    public SessionQueueQueryResponseMessage_V3() {
-      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
    }
 
    private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -102,7 +104,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
                                                final Long autoDeleteMessageCount,
                                                final Integer defaultConsumerWindowSize,
                                                final Long ringSize,
-                                               final Boolean enabled) {
+                                               final Boolean enabled,
+                                               final Boolean configurationManaged) {
       super(SESS_QUEUEQUERY_RESP_V3);
 
       this.durable = durable;
@@ -162,6 +165,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.ringSize = ringSize;
 
       this.enabled = enabled;
+
+      this.configurationManaged = configurationManaged;
    }
 
    public boolean isAutoCreated() {
@@ -312,6 +317,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.enabled = enabled;
    }
 
+   public Boolean isConfigurationManaged() {
+      return configurationManaged;
+   }
+
+   public void setConfigurationManaged(Boolean configurationManaged) {
+      this.configurationManaged = configurationManaged;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -335,6 +348,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       BufferHelper.writeNullableLong(buffer, ringSize);
       BufferHelper.writeNullableBoolean(buffer, enabled);
       BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
+      BufferHelper.writeNullableBoolean(buffer, configurationManaged);
 
    }
 
@@ -375,6 +389,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       if (buffer.readableBytes() > 0) {
          groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
       }
+      if (buffer.readableBytes() > 0) {
+         configurationManaged = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -401,6 +418,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
       result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
       result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
+      result = prime * result + (configurationManaged == null ? 0 : configurationManaged ? 1231 : 1237);
       return result;
    }
 
@@ -434,12 +452,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
       buff.append(", ringSize=" + ringSize);
       buff.append(", enabled=" + enabled);
+      buff.append(", configurationManaged=" + configurationManaged);
       return buff.toString();
    }
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getA [...]
+      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getA [...]
    }
 
    @Override
@@ -542,6 +561,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
          return false;
       if (maxConsumers != other.maxConsumers)
          return false;
+      if (configurationManaged == null) {
+         if (other.configurationManaged != null)
+            return false;
+      } else if (!configurationManaged.equals(other.configurationManaged))
+         return false;
       return true;
    }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 798f417..9fb07ca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -79,6 +79,8 @@ public class QueueQueryResult {
 
    private Boolean enabled;
 
+   private Boolean configurationManaged;
+
    public QueueQueryResult(final SimpleString name,
                            final SimpleString address,
                            final boolean durable,
@@ -107,7 +109,8 @@ public class QueueQueryResult {
                            final Long autoDeleteMessageCount,
                            final Integer defaultConsumerWindowSize,
                            final Long ringSize,
-                           final Boolean enabled) {
+                           final Boolean enabled,
+                           final Boolean configurationManaged) {
       this.durable = durable;
 
       this.temporary = temporary;
@@ -165,6 +168,8 @@ public class QueueQueryResult {
       this.ringSize = ringSize;
 
       this.enabled = enabled;
+
+      this.configurationManaged = configurationManaged;
    }
 
    public boolean isExists() {
@@ -286,4 +291,8 @@ public class QueueQueryResult {
    public Boolean isEnabled() {
       return enabled;
    }
+
+   public Boolean isConfigurationManaged() {
+      return configurationManaged;
+   }
 }
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 3fd2bb4..2076478 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -912,7 +912,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
                   boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
 
-                  if (selectorChanged || topicChanged) {
+                  if ((selectorChanged || topicChanged) && !subResponse.isConfigurationManaged()) {
                      // Delete the old durable sub
                      session.deleteQueue(queueName);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0e138d8..c65d86d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -1098,9 +1098,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                   QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                   if (result.isExists()) {
-                     // If a client reattaches to a durable subscription with a different no-local
-                     // filter value, selector or address then we must recreate the queue (JMS semantics).
-                     if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+                     /*
+                      * If a client reattaches to a durable subscription with a different filter or address then we must
+                      * recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
+                      * configuration then we don't want to change it
+                      */
+                     if (!result.isConfigurationManaged() && (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))) {
 
                         if (result.getConsumerCount() == 0) {
                            sessionSPI.deleteQueue(queue);
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c06227e..d609051 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -199,7 +199,7 @@ public class AMQConsumer {
 
             boolean topicChanged = !oldTopicName.equals(address);
 
-            if (selectorChanged || topicChanged) {
+            if ((selectorChanged || topicChanged) && !result.isConfigurationManaged()) {
                // Delete the old durable sub
                session.getCoreSession().deleteQueue(queueName);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c73e63e..ba7fe19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1060,12 +1060,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue [...]
+         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue [...]
       } else if (realName.equals(managementAddress)) {
          // make an exception for the management address (see HORNETQ-29)
-         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
+         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null);
       } else {
-         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDelete [...]
+         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDelete [...]
       }
 
       return response;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index fbed4ec..b3d1393 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -205,6 +205,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
       return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
    }
 
+   protected Connection createCoreConnection(boolean start) throws JMSException {
+      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start);
+   }
+
    private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
       ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
 
@@ -257,6 +261,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
       return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
    }
 
+   protected Connection createOpenWireConnection(boolean start) throws JMSException { // 'start' decides whether the returned connection is started
+      return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, start);
+   }
+
    private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index b5b170d..fab4231 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -40,9 +40,12 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.DestinationUtil;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.junit.Assert;
@@ -217,6 +220,47 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
+      testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
+
+   }
+
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
+      testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
+
+   }
+
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
+      testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
+   }
+
+   private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception {
+      final String clientId = "bar";
+      final String subName = "foo";
+      final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString();
+      server.stop();
+      server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
+      server.start();
+
+      try (Connection connection = connectionSupplier.createConnection()) {
+         connection.setClientID(clientId);
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic destination = session.createTopic("myTopic");
+
+         MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName);
+         messageConsumer.close();
+
+         Queue queue = server.locateQueue(queueName);
+         assertNotNull(queue);
+         assertNotNull(queue.getFilter());
+         assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
+      }
+   }
+
    @Test(timeout = 30000)
    public void testSelectorsWithJMSTypeOnTopic() throws Exception {
       doTestSelectorsWithJMSType(true);