You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2021/07/06 19:07:00 UTC
[activemq-artemis] branch main updated: ARTEMIS-3374 config-managed
queue can be deleted by durable subscriber
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1d02d06 ARTEMIS-3374 config-managed queue can be deleted by durable subscriber
new 478a28c This closes #3642
1d02d06 is described below
commit 1d02d06eab7a1ea387c39ed17cda74c52f5948d9
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Jun 23 14:22:17 2021 -0500
ARTEMIS-3374 config-managed queue can be deleted by durable subscriber
---
.../artemis/api/core/client/ClientSession.java | 2 +
.../artemis/core/client/impl/QueueQueryImpl.java | 13 ++++++-
.../SessionQueueQueryResponseMessage_V3.java | 32 ++++++++++++++--
.../artemis/core/server/QueueQueryResult.java | 11 +++++-
.../artemis/jms/client/ActiveMQSession.java | 2 +-
.../amqp/proton/ProtonServerSenderContext.java | 9 +++--
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 6 +--
.../integration/amqp/JMSClientTestSupport.java | 8 ++++
.../integration/amqp/JMSMessageConsumerTest.java | 44 ++++++++++++++++++++++
10 files changed, 114 insertions(+), 15 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index d2c0f43..ad7e702 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -177,6 +177,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Long getRingSize();
Boolean isEnabled();
+
+ Boolean isConfigurationManaged();
}
// Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index d532dc0..5529448 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -78,6 +78,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean enabled;
+ private final Boolean configurationManaged;
+
private final Integer defaultConsumerWindowSize;
@@ -164,7 +166,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
- this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
+ this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null, null);
}
public QueueQueryImpl(final boolean durable,
@@ -195,7 +197,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
- final Boolean enabled) {
+ final Boolean enabled,
+ final Boolean configurationManaged) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -225,6 +228,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
+ this.configurationManaged = configurationManaged;
}
@Override
@@ -371,5 +375,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Boolean isEnabled() {
return enabled;
}
+
+ @Override
+ public Boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index bf83222..9deffe1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -66,12 +66,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Boolean enabled;
+ private Boolean configurationManaged;
+
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
- this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), res [...]
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), res [...]
}
public SessionQueueQueryResponseMessage_V3() {
- this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+ this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -102,7 +104,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
- final Boolean enabled) {
+ final Boolean enabled,
+ final Boolean configurationManaged) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@@ -162,6 +165,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.ringSize = ringSize;
this.enabled = enabled;
+
+ this.configurationManaged = configurationManaged;
}
public boolean isAutoCreated() {
@@ -312,6 +317,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.enabled = enabled;
}
+ public Boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
+ public void setConfigurationManaged(Boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@@ -335,6 +348,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
+ BufferHelper.writeNullableBoolean(buffer, configurationManaged);
}
@@ -375,6 +389,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
}
+ if (buffer.readableBytes() > 0) {
+ configurationManaged = BufferHelper.readNullableBoolean(buffer);
+ }
}
@Override
@@ -401,6 +418,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
+ result = prime * result + (configurationManaged == null ? 0 : configurationManaged ? 1231 : 1237);
return result;
}
@@ -434,12 +452,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled);
+ buff.append(", configurationManaged=" + configurationManaged);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
- return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getA [...]
+ return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getA [...]
}
@Override
@@ -542,6 +561,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
if (maxConsumers != other.maxConsumers)
return false;
+ if (configurationManaged == null) {
+ if (other.configurationManaged != null)
+ return false;
+ } else if (!configurationManaged.equals(other.configurationManaged))
+ return false;
return true;
}
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 798f417..9fb07ca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -79,6 +79,8 @@ public class QueueQueryResult {
private Boolean enabled;
+ private Boolean configurationManaged;
+
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@@ -107,7 +109,8 @@ public class QueueQueryResult {
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
- final Boolean enabled) {
+ final Boolean enabled,
+ final Boolean configurationManaged) {
this.durable = durable;
this.temporary = temporary;
@@ -165,6 +168,8 @@ public class QueueQueryResult {
this.ringSize = ringSize;
this.enabled = enabled;
+
+ this.configurationManaged = configurationManaged;
}
public boolean isExists() {
@@ -286,4 +291,8 @@ public class QueueQueryResult {
public Boolean isEnabled() {
return enabled;
}
+
+ public Boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
}
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 3fd2bb4..2076478 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -912,7 +912,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
- if (selectorChanged || topicChanged) {
+ if ((selectorChanged || topicChanged) && !Boolean.TRUE.equals(subResponse.isConfigurationManaged())) { // isConfigurationManaged() is a nullable Boolean; null-safe test avoids an unboxing NPE and treats null as "not configuration-managed"
// Delete the old durable sub
session.deleteQueue(queueName);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0e138d8..c65d86d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -1098,9 +1098,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) {
- // If a client reattaches to a durable subscription with a different no-local
- // filter value, selector or address then we must recreate the queue (JMS semantics).
- if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+ /*
+ * If a client reattaches to a durable subscription with a different filter or address then we must
+ * recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
+ * configuration then we don't want to change it
+ */
+ if (!result.isConfigurationManaged() && (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c06227e..d609051 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -199,7 +199,7 @@ public class AMQConsumer {
boolean topicChanged = !oldTopicName.equals(address);
- if (selectorChanged || topicChanged) {
+ if ((selectorChanged || topicChanged) && !result.isConfigurationManaged()) {
// Delete the old durable sub
session.getCoreSession().deleteQueue(queueName);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c73e63e..ba7fe19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1060,12 +1060,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue [...]
+ response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue [...]
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
- response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
+ response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null);
} else {
- response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDelete [...]
+ response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDelete [...]
}
return response;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index fbed4ec..b3d1393 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -205,6 +205,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
+ protected Connection createCoreConnection(boolean start) throws JMSException {
+ return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start);
+ }
+
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
@@ -257,6 +261,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
}
+ protected Connection createOpenWireConnection(boolean start) throws JMSException { // OpenWire counterpart of createCoreConnection(boolean): default connection string, no credentials, no client ID
+ return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, start); // forward the caller's flag; it was previously hard-coded to false, silently ignoring the parameter
+ }
+
private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index b5b170d..fab4231 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -40,9 +40,12 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.Assert;
@@ -217,6 +220,47 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
+ @Test(timeout = 60000)
+ public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
+ testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
+
+ }
+
+ @Test(timeout = 60000)
+ public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
+ testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
+
+ }
+
+ @Test(timeout = 60000)
+ public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
+ testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
+ }
+
+ private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception { // shared body of the Core, OpenWire and AMQP test variants; the supplier provides the protocol-specific connection
+ final String clientId = "bar";
+ final String subName = "foo";
+ final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString(); // queue name derived from this clientId/subName pair
+ server.stop(); // restart the broker so the queue below comes from its configuration
+ server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST)); // pre-define the subscription queue with a filter
+ server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
+ server.start();
+
+ try (Connection connection = connectionSupplier.createConnection()) {
+ connection.setClientID(clientId);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic destination = session.createTopic("myTopic");
+
+ MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName); // no selector is passed, so the request differs from the configured filter
+ messageConsumer.close();
+
+ Queue queue = server.locateQueue(queueName); // the configured queue must still be present ...
+ assertNotNull(queue);
+ assertNotNull(queue.getFilter()); // ... and must still carry a filter ...
+ assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString()); // ... identical to the one from the configuration
+ }
+ }
+
@Test(timeout = 30000)
public void testSelectorsWithJMSTypeOnTopic() throws Exception {
doTestSelectorsWithJMSType(true);