You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/03/22 23:15:12 UTC

[activemq-artemis] branch master updated: ARTEMIS-3197 - add selectorAware option to virtualTopicConsumerWildcards for openwire acceptor

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd1b33  ARTEMIS-3197 - add selectorAware option to virtualTopicConsumerWildcards for openwire acceptor
     new 04fcecb  This closes #3508
8fd1b33 is described below

commit 8fd1b33d1638c1232197d5399c23daadee491289
Author: gtully <ga...@gmail.com>
AuthorDate: Mon Mar 22 14:53:11 2021 +0000

    ARTEMIS-3197 - add selectorAware option to virtualTopicConsumerWildcards for openwire acceptor
---
 .../protocol/openwire/OpenWireProtocolManager.java | 40 ++++++++++---
 .../core/protocol/openwire/amq/AMQSession.java     |  9 ++-
 .../core/protocol/openwire/util/OpenWireUtil.java  | 14 +++++
 docs/migration-guide/en/VirtualTopics.md           | 10 ++--
 docs/user-manual/en/openwire.md                    | 23 +++++---
 .../openwire/OpenWireProtocolManagerTest.java      | 14 +++--
 .../openwire/VirtualTopicToFQQNOpenWireTest.java   | 66 +++++++++++++++++++++-
 7 files changed, 146 insertions(+), 30 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bfef98c..21e9748 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -80,6 +80,8 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.LongSequenceGenerator;
 
+import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
+
 public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
 
    private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
@@ -133,7 +135,29 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
-   private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
+   protected class VirtualTopicConfig {
+      public int filterPathTerminus;
+      public boolean selectorAware;
+
+      public VirtualTopicConfig(String[] configuration) {
+         filterPathTerminus = Integer.valueOf(configuration[1]);
+         // optional config
+         for (int i = 2; i < configuration.length; i++) {
+            String[] optionPair = configuration[i].split("=");
+            consumeOption(optionPair);
+         }
+      }
+
+      private void consumeOption(String[] optionPair) {
+         if (optionPair.length == 2) {
+            if (SELECTOR_AWARE_OPTION.equals(optionPair[0])) {
+               selectorAware = Boolean.valueOf(optionPair[1]);
+            }
+         }
+      }
+   }
+
+   private final Map<DestinationFilter, VirtualTopicConfig> vtConsumerDestinationMatchers = new HashMap<>();
    protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
 
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
@@ -622,8 +646,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
       for (String filter : virtualTopicConsumerWildcards.split(",")) {
-         String[] wildcardLimitPair = filter.split(";");
-         vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1]));
+         String[] configuration = filter.split(";");
+         vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(configuration[0])), new VirtualTopicConfig(configuration));
       }
    }
 
@@ -646,15 +670,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
          return mappedDestination;
       }
 
-      for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
+      for (Map.Entry<DestinationFilter, VirtualTopicConfig> candidate : vtConsumerDestinationMatchers.entrySet()) {
          if (candidate.getKey().matches(destination)) {
             // convert to matching FQQN
             String[] paths = DestinationPath.getDestinationPaths(destination);
             StringBuilder fqqn = new StringBuilder();
-            int filterPathTerminus = candidate.getValue();
+            VirtualTopicConfig virtualTopicConfig = candidate.getValue();
             // address - ie: topic
-            for (int i = filterPathTerminus; i < paths.length; i++) {
-               if (i > filterPathTerminus) {
+            for (int i = virtualTopicConfig.filterPathTerminus; i < paths.length; i++) {
+               if (i > virtualTopicConfig.filterPathTerminus) {
                   fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
                }
                fqqn.append(paths[i]);
@@ -667,7 +691,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
                }
                fqqn.append(paths[i]);
             }
-            mappedDestination = new ActiveMQQueue(fqqn.toString());
+            mappedDestination = new ActiveMQQueue(fqqn.toString() + ( virtualTopicConfig.selectorAware ? "?" + SELECTOR_AWARE_OPTION + "=true" : "" ));
             break;
          }
       }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 7aaeabb..e963990 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
@@ -181,7 +182,7 @@ public class AMQSession implements SessionCallback {
             openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
             SimpleString queueName = new SimpleString(convertWildcard(openWireDest));
 
-            if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
+            if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(), OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) {
                throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
             }
          }
@@ -223,6 +224,10 @@ public class AMQSession implements SessionCallback {
    }
 
    private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
+      return checkAutoCreateQueue(queueName, isTemporary, null);
+   }
+
+   private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary, String filter) throws Exception {
       boolean hasQueue = true;
       if (!connection.containsKnownDestination(queueName)) {
 
@@ -245,7 +250,7 @@ public class AMQSession implements SessionCallback {
                         routingTypeToUse = as.getDefaultAddressRoutingType();
                      }
                   }
-                  coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true));
+                  coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter));
                   connection.addKnownDestination(queueName);
                } else {
                   hasQueue = false;
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 3d22647..197e130 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 
@@ -37,6 +38,19 @@ public class OpenWireUtil {
 
    public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration();
 
+   public static final String SELECTOR_AWARE_OPTION = "selectorAware";
+
+   public static String extractFilterStringOrNull(final ConsumerInfo info, final ActiveMQDestination openWireDest) {
+      if (info.getSelector() != null) {
+         if (openWireDest.getOptions()  != null) {
+            if (Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) {
+               return info.getSelector();
+            }
+         }
+      }
+      return null;
+   }
+
    /**
     * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
     * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md
index b98ac80..0e97276 100644
--- a/docs/migration-guide/en/VirtualTopics.md
+++ b/docs/migration-guide/en/VirtualTopics.md
@@ -2,7 +2,7 @@ Virtual Topics
 ==============
 
 Virtual Topics (a specialisation of virtual destinations) in ActiveMQ 5.x typically address two different but related
-problems. Lets take each in turn:
+problems. Let's take each in turn:
  
 Shared access to a JMS durable topic subscription
 -------------------------------------------------
@@ -20,7 +20,7 @@ JMS 2.0 adds the possibility of shared subscriptions with new API's that are ful
 
 Fully Qualified Queue name (FQQN)
 ---------------------------------
-Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
+Secondly, Artemis uses a queue per topic subscriber model internally, and it is possibly to directly address the
 subscription queue using its Fully Qualified Queue name (FQQN).
 
 For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`:
@@ -42,7 +42,7 @@ If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcar
 mechanism on the OpenWire protocol handler that will automatically convert the consumer destination into the
 corresponding FQQN.
 The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match
-the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer
+the virtual topic consumer destination, and an int that specifies the number of path matches that terminate the consumer
 queue identity.
 
 E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```.
@@ -55,8 +55,8 @@ In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will
 Durable topic subscribers in a network of brokers
 -------------------------------------------------
 The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a
-network, duplicate durable subs get created on each node in the network but they do not migrate. The end result can
+network, duplicate durable subs get created on each node in the network, but they do not migrate. The end result can
 result in duplicate message storage and ultimately duplicate delivery, which is not good.
-When durable subscribers map to virtual topic subscriber queues, the queues can migrate and the problem can be avoided.
+When durable subscribers map to virtual topic subscriber queues, the queues can migrate, and the problem can be avoided.
 
 In Artemis, because a durable sub is modeled as a queue, this problem does not arise.
\ No newline at end of file
diff --git a/docs/user-manual/en/openwire.md b/docs/user-manual/en/openwire.md
index 9cecfe1..b522284 100644
--- a/docs/user-manual/en/openwire.md
+++ b/docs/user-manual/en/openwire.md
@@ -35,7 +35,7 @@ are:
 
 - `useKeepAlive`
 
-  Whether or not to send a KeepAliveInfo on an idle connection to prevent it
+  Indicates whether to send a KeepAliveInfo on an idle connection to prevent it
   from timing out. Enabled by default.  Disabling the keep alive will still make
   connections time out if no data was received on the connection for the
   specified amount of time.
@@ -64,13 +64,13 @@ broker side.
 
 - `supportAdvisory`
 
-  Whether or not the broker supports advisory messages. If the value is true,
+  Indicates whether the broker supports advisory messages. If the value is true,
   advisory addresses/queues will be created.  If the value is false, no advisory
   addresses/queues are created. Default value is `true`. 
 
 - `suppressInternalManagementObjects`
 
-  Whether or not the advisory addresses/queues, if any, will be registered to
+  Indicates whether advisory addresses/queues, if any, will be registered to
   management service (e.g. JMX registry). If set to true, no advisory
   addresses/queues will be registered. If set to false, those are registered and
   will be displayed on the management console. Default value is `true`.
@@ -88,12 +88,14 @@ configure a mapping function that will translate the virtual topic consumer
 destination into a FQQN address. This address will then represents the consumer as a
 multicast binding to an address representing the virtual topic. 
 
-The configuration string property `virtualTopicConsumerWildcards` has two parts
-separated by a `;`. The first is the 5.x style destination filter that
+The configuration string list property `virtualTopicConsumerWildcards` has parts
+separated by a `;`. The first is the classic style destination filter that
 identifies the destination as belonging to a virtual topic. The second
 identifies the number of `paths` that identify the consumer queue such that it
-can be parsed from the destination. For example, the default 5.x virtual topic
-with consumer prefix of `Consumer.*.`, would require a
+can be parsed from the destination. Any subsequent parts are additional configuration
+parameters for that mapping.
+
+For example, the default virtual topic with consumer prefix of `Consumer.*.`, would require a
 `virtualTopicConsumerWildcards` filter of `Consumer.*.>;2`. As a url parameter
 this transforms to `Consumer.*.%3E%3B2` when the url significant characters
 `>;` are escaped with their hex code points. In an `acceptor` url it would be:
@@ -105,8 +107,13 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters
 This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of
 `VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the
 configuration to identify the consumer queue as the first two paths of the
-destination.  `virtualTopicConsumerWildcards` is multi valued using a `,`
+destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
 separator.
 
+### selectorAware
+The mappings support an optional parameter, `selectorAware` which when true, transfers any selector information from the
+OpenWire consumer into a queue filter of any auto-created subscription queue. Note: the selector/filter is persisted with
+the queue binding in the normal way, such that it works independent of connected consumers.
+
 Please see Virtual Topic Mapping example contained in the OpenWire
 [examples](examples.md).
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
index 2774cd6..e060cb3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.openwire;
 
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -27,8 +28,6 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
 
@@ -36,7 +35,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
    LRUCache lruCacheRef;
 
    @Test
-   public void testVtAutoConversion() throws Exception {
+   public void testVtAutoConversion() {
       underTest = new OpenWireProtocolManager(null, new DummyServer()) {
          @Override
          public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
@@ -49,17 +48,24 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
 
       final int maxCacheSize = 10;
       underTest.setVirtualTopicConsumerLruCacheMax(10);
-      underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3");
+      underTest.setVirtualTopicConsumerWildcards("A.>;1;selectorAware=true,B.*.>;2,C.*.*.*.EE;3;selectorAware=false");
 
       ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic");
       assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A.SomeTopic"), underTest.virtualTopicConsumerToFQQN(A));
 
+      ActiveMQDestination checkOption = underTest.virtualTopicConsumerToFQQN(A);
+      assertNotNull(checkOption.getOptions());
+      assertTrue(Boolean.parseBoolean(checkOption.getOptions().get(OpenWireUtil.SELECTOR_AWARE_OPTION)));
+
       ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B");
       assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b.SomeTopic.B"), underTest.virtualTopicConsumerToFQQN(B));
 
       ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE");
       assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c.SomeTopic.EE"), underTest.virtualTopicConsumerToFQQN(C));
 
+      checkOption = underTest.virtualTopicConsumerToFQQN(C);
+      assertNull(checkOption.getOptions());
+
       for (int i = 0; i < maxCacheSize; i++) {
          ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i);
          assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
index 228c904..26d5940 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
@@ -25,23 +25,27 @@ import javax.jms.TextMessage;
 import java.util.Set;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.Test;
 
 public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
 
    @Override
    protected void extraServerConfig(Configuration serverConfig) {
-      Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
+      Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
       for (TransportConfiguration tc : acceptors) {
          if (tc.getName().equals("netty")) {
-            tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2");
+            tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2,C.*.>;2;selectorAware=true");
             tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000");
-
          }
       }
+      serverConfig.setJMXManagementEnabled(true);
    }
 
    @Test
@@ -51,6 +55,7 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
       SimpleString topic = new SimpleString("VirtualTopic.Orders");
       this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
       this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoDeleteQueues(false);
 
       try {
          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
@@ -222,4 +227,59 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
          }
       }
    }
+
+
+   @Test
+   public void testSelectorAwareVT() throws Exception {
+      Connection connection = null;
+
+      SimpleString topic = new SimpleString("SVT.Orders.A");
+
+      this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateQueues(true);
+      this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateAddresses(true);
+
+      try {
+         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
+         activeMQConnectionFactory.setWatchTopicAdvisories(false);
+         connection = activeMQConnectionFactory.createConnection();
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createTopic(topic.toString());
+
+         MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("C.A." + topic.toString()), "stuff = 'A'");
+         MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("C.B." + topic.toString()), "stuff = 'B'");
+
+         MessageProducer producer = session.createProducer(destination);
+         TextMessage message = session.createTextMessage("This is a text message");
+         for (String stuffValue : new String[] {"A", "B", "C"}) {
+            message.setStringProperty("stuff", stuffValue);
+            producer.send(message);
+         }
+
+         TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
+         TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
+
+         assertTrue((messageReceivedA != null && messageReceivedB != null));
+         String text = messageReceivedA.getText();
+         assertEquals("This is a text message", text);
+
+         assertEquals("A", messageReceivedA.getStringProperty("stuff"));
+         assertEquals("B", messageReceivedB.getStringProperty("stuff"));
+
+         // verify C message got dropped
+
+         final QueueControl queueControlA = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.A." + topic.toString()), RoutingType.MULTICAST, mbeanServer);
+         Wait.assertEquals(0, () -> queueControlA.countMessages());
+
+         final QueueControl queueControlB = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.B." + topic.toString()), RoutingType.MULTICAST, mbeanServer);
+         Wait.assertEquals(0, () -> queueControlB.countMessages());
+
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
 }