You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/06/18 14:47:23 UTC

[activemq-cli-tools] branch master updated: AMQCLI-13 AMQCLI-14 - support --virtualTopicConsumerWildcards to export consumer queues to match openwire FQQN access

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-cli-tools.git


The following commit(s) were added to refs/heads/master by this push:
     new 198cf7b  AMQCLI-13 AMQCLI-14 - support --virtualTopicConsumerWildcards to export consumer queues to match openwire FQQN access
198cf7b is described below

commit 198cf7b6b9d73363e023d7864367b1ea943c7c42
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 18 15:47:03 2020 +0100

    AMQCLI-13 AMQCLI-14 - support --virtualTopicConsumerWildcards to export consumer queues to match openwire FQQN access
---
 .../artemis/schema/ArtemisJournalMarshaller.java   |   2 +-
 .../cli/kahadb/exporter/ExportConfiguration.java   |  57 +++++++++
 .../activemq/cli/kahadb/exporter/Exporter.java     |  29 ++---
 .../artemis/ArtemisXmlMetadataExporter.java        |  26 ++--
 .../artemis/OpenWireCoreMessageTypeConverter.java  |  16 ++-
 .../activemq/cli/kahadb/exporter/ExporterTest.java | 134 ++++++++++++++++++++-
 .../OpenWireCoreMessageTypeConverterTest.java      |   4 +-
 pom.xml                                            |   2 +-
 8 files changed, 237 insertions(+), 33 deletions(-)

diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
index 2fad01d..84b5364 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
@@ -29,7 +29,7 @@ import javax.xml.namespace.QName;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
 import org.apache.activemq.cli.schema.ActivemqJournalType;
 import org.apache.activemq.cli.schema.AddressBindingType;
 import org.apache.activemq.cli.schema.MessageType;
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
index 3f7a7bd..004f073 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
@@ -17,6 +17,14 @@
 package org.apache.activemq.cli.kahadb.exporter;
 
 import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.filter.DestinationPath;
 
 public class ExportConfiguration {
 
@@ -34,6 +42,8 @@ public class ExportConfiguration {
 
     private boolean overwrite;
 
+    private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
+
     public File getSource() {
         return source;
     }
@@ -90,6 +100,49 @@ public class ExportConfiguration {
         this.overwrite = overwrite;
     }
 
+    /**
+     * Registers the virtual topic consumer queue wildcards that
+     * {@link #mapToDurableSubFQQN(ActiveMQDestination)} uses to map consumer
+     * queues to FQQN destinations.
+     * <p>
+     * The value is a comma separated list of {@code <wildcard>;<path index>}
+     * pairs, for example {@code Consumer.*.>;2}. The wildcard is parsed as a
+     * queue destination filter; the path index is the position of the first
+     * path element of a matching queue name that belongs to the address.
+     * Blank entries (empty input, stray commas) are ignored and surrounding
+     * whitespace is trimmed. Matchers accumulate across calls.
+     *
+     * @param virtualTopicConsumerWildcards the wildcard list, may be {@code null}
+     *        in which case nothing is registered
+     * @throws IllegalArgumentException if an entry has no {@code ;} separated
+     *         path index, or the index is negative or not an integer (the
+     *         latter surfaces as a {@link NumberFormatException})
+     */
+    public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
+        if (virtualTopicConsumerWildcards == null) {
+            return;
+        }
+        for (String filter : virtualTopicConsumerWildcards.split(",")) {
+            String entry = filter.trim();
+            if (entry.isEmpty()) {
+                // "" and "a.>;1," would otherwise fail on the missing index below
+                continue;
+            }
+            String[] wildcardLimitPair = entry.split(";");
+            if (wildcardLimitPair.length < 2 || wildcardLimitPair[0].trim().isEmpty()) {
+                throw new IllegalArgumentException("Invalid virtualTopicConsumerWildcards entry '" + entry
+                        + "', expected <wildcard>;<path index>");
+            }
+            // NumberFormatException is an IllegalArgumentException, let it propagate
+            int pathIndex = Integer.parseInt(wildcardLimitPair[1].trim());
+            if (pathIndex < 0) {
+                throw new IllegalArgumentException("Invalid virtualTopicConsumerWildcards entry '" + entry
+                        + "', path index must not be negative");
+            }
+            vtConsumerDestinationMatchers.put(
+                    DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0].trim())), pathIndex);
+        }
+    }
+
+    /**
+     * Maps a virtual topic consumer queue to its fully qualified queue name
+     * (FQQN) form, {@code <address>::<queue>}.
+     * <p>
+     * The first registered wildcard that matches the destination decides the
+     * mapping: the path elements from the configured index onwards form the
+     * address, and the complete original queue name forms the queue part. A
+     * wildcard whose index leaves no path elements for the address (index
+     * negative or beyond the last element) is skipped rather than producing an
+     * FQQN with an empty address.
+     *
+     * @param destination the destination to map
+     * @return a new queue named with the FQQN when a wildcard applies, otherwise
+     *         the very same {@code destination} instance; callers may rely on
+     *         reference identity to detect that no mapping took place
+     */
+    public ActiveMQDestination mapToDurableSubFQQN(ActiveMQDestination destination) {
+
+        if (vtConsumerDestinationMatchers.isEmpty()) {
+            return destination;
+        }
+
+        for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
+            if (!candidate.getKey().matches(destination)) {
+                continue;
+            }
+            String[] paths = DestinationPath.getDestinationPaths(destination);
+            int filterPathTerminus = candidate.getValue();
+            if (filterPathTerminus < 0 || filterPathTerminus >= paths.length) {
+                // nothing would be left for the address part, try the next wildcard
+                continue;
+            }
+            StringBuilder fqqn = new StringBuilder();
+            // address - i.e. the topic
+            appendPaths(fqqn, paths, filterPathTerminus);
+            fqqn.append(CompositeAddress.SEPARATOR);
+            // consumer queue - the full vt queue
+            appendPaths(fqqn, paths, 0);
+            // no need for a cache as this is called once per destination on metadata export
+            return new ActiveMQQueue(fqqn.toString());
+        }
+        return destination;
+    }
+
+    /**
+     * Appends {@code paths[from..]} to {@code target}, joined by the destination path separator.
+     */
+    private static void appendPaths(StringBuilder target, String[] paths, int from) {
+        for (int i = from; i < paths.length; i++) {
+            if (i > from) {
+                target.append(ActiveMQDestination.PATH_SEPERATOR);
+            }
+            target.append(paths[i]);
+        }
+    }
+
     public static class ExportConfigurationBuilder {
 
         private final ExportConfiguration config = new ExportConfiguration();
@@ -137,5 +190,9 @@ public class ExportConfiguration {
             return config;
         }
 
+        /**
+         * Passes the virtual topic consumer wildcard list through to the
+         * configuration being built.
+         *
+         * @param virtualTopicConsumerWildcards value handed unchanged to
+         *        {@link ExportConfiguration#setVirtualTopicConsumerWildcards(String)}
+         * @return this builder, to allow call chaining
+         */
+        public ExportConfigurationBuilder setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
+            config.setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards);
+            return this;
+        }
     }
 }
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index 9d7ba93..8ea7409 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -88,6 +88,9 @@ public class Exporter {
         @Option(name = "-f", type = OptionType.COMMAND, description = "Force XML output and overwrite existing file")
         public boolean overwrite;
 
+        @Option(name = {"--vt", "--virtualTopicConsumerWildcards"}, type = OptionType.COMMAND, description = "Virtual Topic Consumer Pattern list")
+        public String virtualTopicConsumerWildcards;
+
         /* (non-Javadoc)
          * @see java.lang.Runnable#run()
          */
@@ -100,6 +103,7 @@ public class Exporter {
                         .setTarget(new File(target))
                         .setQueuePattern(queuePattern)
                         .setTopicPattern(topicPattern)
+                        .setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards)
                         .setCompress(compress)
                         .setOverwrite(overwrite)
                         .build());
@@ -128,6 +132,7 @@ public class Exporter {
                         .setTarget(new File(target))
                         .setQueuePattern(queuePattern)
                         .setTopicPattern(topicPattern)
+                        .setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards)
                         .setCompress(compress)
                         .setOverwrite(overwrite)
                         .build());
@@ -155,11 +160,9 @@ public class Exporter {
             xmlMarshaller.appendJournalOpen();
 
             if (config.isMultiKaha()) {
-                appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(config.getSource()),
-                        config.getQueuePattern(), config.getTopicPattern());
+                appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(config.getSource()), config);
             } else {
-                appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(config.getSource()),
-                        config.getQueuePattern(), config.getTopicPattern());
+                appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(config.getSource()), config);
             }
 
             xmlMarshaller.appendJournalClose(true);
@@ -172,9 +175,7 @@ public class Exporter {
 
 
     private static void appendMultiKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
-            final MultiKahaDBPersistenceAdapter multiAdapter, final String queuePattern,
-            final String topicPattern) throws Exception {
-
+            final MultiKahaDBPersistenceAdapter multiAdapter, final ExportConfiguration config) throws Exception {
         try {
             multiAdapter.start();
 
@@ -183,7 +184,7 @@ public class Exporter {
                     .map(adapter -> {
                         KahaDBPersistenceAdapter kahaAdapter = (KahaDBPersistenceAdapter) adapter;
                         return new KahaDBExporter(kahaAdapter,
-                              new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller),
+                              new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller, config),
                               new ArtemisXmlMessageRecoveryListener(kahaAdapter.getStore(), xmlMarshaller));
             }).collect(Collectors.toList());
 
@@ -195,8 +196,8 @@ public class Exporter {
 
             xmlMarshaller.appendMessagesElement();
             for (KahaDBExporter dbExporter : dbExporters) {
-                dbExporter.exportQueues(queuePattern);
-                dbExporter.exportTopics(topicPattern);
+                dbExporter.exportQueues(config.getQueuePattern());
+                dbExporter.exportTopics(config.getTopicPattern());
             }
             xmlMarshaller.appendEndElement();
         } finally {
@@ -205,21 +206,21 @@ public class Exporter {
     }
 
     private static void appendKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
-            final KahaDBPersistenceAdapter adapter, final String queuePattern, final String topicPattern) throws Exception {
+            final KahaDBPersistenceAdapter adapter, final ExportConfiguration config) throws Exception {
 
         try {
             adapter.start();
 
             final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller),
+                    new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller, config),
                     new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
 
             xmlMarshaller.appendBindingsElement();
             dbExporter.exportMetadata();
             xmlMarshaller.appendEndElement();
             xmlMarshaller.appendMessagesElement();
-            dbExporter.exportQueues(queuePattern);
-            dbExporter.exportTopics(topicPattern);
+            dbExporter.exportQueues(config.getQueuePattern());
+            dbExporter.exportTopics(config.getTopicPattern());
             xmlMarshaller.appendEndElement();
         } finally {
             adapter.stop();
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
index d915ec9..f95603b 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
@@ -20,7 +20,9 @@ import java.io.IOException;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration;
 import org.apache.activemq.cli.kahadb.exporter.MessageStoreMetadataExporter;
 import org.apache.activemq.cli.schema.QueueBindingType;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -31,16 +33,18 @@ public class ArtemisXmlMetadataExporter implements MessageStoreMetadataExporter
 
     private final KahaDBStore store;
     private final ArtemisJournalMarshaller xmlMarshaller;
+    private final ExportConfiguration config;
 
 
     /**
      * @param xmlMarshaller
      */
     public ArtemisXmlMetadataExporter(final KahaDBStore store,
-            final ArtemisJournalMarshaller xmlMarshaller) {
+            final ArtemisJournalMarshaller xmlMarshaller, final ExportConfiguration config) {
         super();
         this.store = store;
         this.xmlMarshaller = xmlMarshaller;
+        this.config = config;
     }
 
     @Override
@@ -49,16 +53,24 @@ public class ArtemisXmlMetadataExporter implements MessageStoreMetadataExporter
         .forEach(dest -> {
             try {
                 if (dest.isQueue()) {
-                    xmlMarshaller.appendBinding(QueueBindingType.builder()
-                            .withName(dest.getPhysicalName())
-                            .withRoutingType(RoutingType.ANYCAST.toString())
-                            .withAddress(dest.getPhysicalName()).build());
+                    org.apache.activemq.command.ActiveMQDestination mappedToFQQN = config.mapToDurableSubFQQN(dest);
+                    if (dest != mappedToFQQN) {
+                        xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                                       .withName(CompositeAddress.extractQueueName(mappedToFQQN.getPhysicalName()))
+                                                       .withRoutingType(RoutingType.MULTICAST.toString())
+                                                       .withAddress(CompositeAddress.extractAddressName(mappedToFQQN.getPhysicalName())).build());
+                    } else {
+                        xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                                       .withName(dest.getPhysicalName())
+                                                       .withRoutingType(RoutingType.ANYCAST.toString())
+                                                       .withAddress(dest.getPhysicalName()).build());
+                    }
                 } else if (dest.isTopic()) {
                         for (SubscriptionInfo info :
                             store.createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
                             xmlMarshaller.appendBinding(QueueBindingType.builder()
-                                    .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
-                                            true, info.getClientId(), info.getSubcriptionName()))
+                                    .withName(ActiveMQDestination.createQueueNameForSubscription(
+                                            true, info.getClientId(), info.getSubcriptionName()).toString())
                                     .withRoutingType(RoutingType.MULTICAST.toString())
                                     .withAddress(dest.getPhysicalName()).build());
                         }
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
index af4d363..bfd6233 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
@@ -17,9 +17,11 @@
 package org.apache.activemq.cli.kahadb.exporter.artemis;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporterUtil;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration;
 import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
 import org.apache.activemq.cli.schema.BodyType;
 import org.apache.activemq.cli.schema.MessageType;
@@ -38,7 +40,9 @@ import org.apache.activemq.store.kahadb.KahaDBUtil;
  */
 public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter<MessageType> {
 
-    private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+    private final OpenWireMessageConverter converter = new OpenWireMessageConverter();
+    private final OpenWireFormat openWireFormat = new OpenWireFormat();
+    private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
     private final KahaDBStore store;
 
     /**
@@ -58,7 +62,7 @@ public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter
      */
     @Override
     public MessageType convert(final Message message) throws Exception {
-        final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message);
+        final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message, openWireFormat, coreMessageObjectPools);
         final MessageType messageType = convertAttributes(serverMessage);
 
         try {
@@ -103,8 +107,8 @@ public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter
 
             KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> {
                 queuesBuilder.addQueue(QueueType.builder().withName(
-                        ActiveMQDestination.createQueueNameForDurableSubscription(
-                        true, sub.getClientId(), sub.getSubcriptionName())).build());
+                        ActiveMQDestination.createQueueNameForSubscription(
+                        true, sub.getClientId(), sub.getSubcriptionName()).toString()).build());
             });
 
             return queuesBuilder.build();
@@ -112,7 +116,7 @@ public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter
     }
 
     private BodyType convertBody(final ICoreMessage serverMessage) throws Exception {
-        String value = XmlDataExporterUtil.encodeMessageBody(serverMessage);
+        String value = XmlDataExporterUtil.encodeMessageBodyBase64(serverMessage);
 
         //requires CDATA
         return BodyType.builder()
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 70a14e2..c76d962 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -33,7 +33,9 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
@@ -42,8 +44,10 @@ import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
@@ -51,6 +55,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration.ExportConfigurationBuilder;
 import org.apache.activemq.cli.schema.ActivemqJournalType;
@@ -92,6 +97,129 @@ public abstract class ExporterTest {
         testExportQueues("test.>");
     }
 
+    /**
+     * Exports two virtual topic consumer queues using the
+     * {@code Consumer.*.>;2} wildcard, imports the result into an Artemis
+     * broker and verifies that both queues can be consumed through their FQQN
+     * ({@code VirtualTopic.T::Consumer.X.VirtualTopic.T}) and that they receive
+     * new messages published to the {@code VirtualTopic.T} topic.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testExportVTQueueAsDurableSub() throws Exception {
+        File sourceDir = storeFolder.newFolder();
+        ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.T");
+        ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.T");
+
+        PersistenceAdapter adapter = getPersistenceAdapter(sourceDir);
+        adapter.start();
+        MessageStore messageStoreA = adapter.createQueueMessageStore(queueA);
+        MessageStore messageStoreB = adapter.createQueueMessageStore(queueB);
+        messageStoreA.start();
+        messageStoreB.start();
+
+        // publish messages
+        MessageId first = null;
+        IdGenerator id = new IdGenerator();
+        ConnectionContext context = new ConnectionContext();
+        for (int i = 0; i < 5; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Test");
+            message.setProperty("MyStringProperty", "abc");
+            message.setProperty("MyIntegerProperty", 1);
+            message.setMessageId(new MessageId(id.generateId() + ":1", i));
+
+            message.setDestination(queueA);
+            messageStoreA.addMessage(context, message);
+
+            message.setDestination(queueB);
+            messageStoreB.addMessage(context, message);
+
+            if (i == 0) {
+                first = message.getMessageId();
+            }
+        }
+
+        //ack for subA only
+        MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setLastMessageId(first);
+        messageStoreA.removeMessage(context,ack);
+
+        adapter.stop();
+
+        File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
+        exportStore(ExportConfigurationBuilder.newBuilder().setSource(sourceDir).setTarget(xmlFile).setVirtualTopicConsumerWildcards("Consumer.*.>;2"));
+
+        printFile(xmlFile);
+
+        // 5 messages were stored per queue and one was acked on A: 4 + 5 remain
+        validate(xmlFile, 9);
+
+        final ActiveMQServer artemisServer = buildArtemisBroker();
+        artemisServer.start();
+        // stop the broker in a finally block so that a failing import or
+        // assertion does not leave it running
+        try {
+            artemisServer.getManagementService().enableNotifications(false);
+
+            XmlDataImporter dataImporter = new XmlDataImporter();
+            dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
+
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
+
+            Connection connection = null;
+            try {
+
+                connection = cf.createConnection();
+                connection.start();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue fqqnA = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.A.VirtualTopic.T"));
+                MessageConsumer messageConsumerA = session.createConsumer(fqqnA);
+
+                Queue fqqnB = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.B.VirtualTopic.T"));
+                MessageConsumer messageConsumerB = session.createConsumer(fqqnB);
+
+                for (int i = 0; i < 5; i++) {
+                    if (i < 4) {
+                        TextMessage messageReceived = (TextMessage) messageConsumerA.receive(1000);
+                        assertNotNull(messageReceived);
+                        assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                        assertEquals("Test", messageReceived.getText());
+
+                        messageReceived = (TextMessage) messageConsumerB.receive(1000);
+                        assertNotNull(messageReceived);
+                        assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                        assertEquals("Test", messageReceived.getText());
+
+                    } else {
+                        // just subB gets this
+                        TextMessage messageReceived = (TextMessage) messageConsumerA.receive(100);
+                        assertNull(messageReceived);
+
+                        messageReceived = (TextMessage) messageConsumerB.receive(1000);
+                        assertNotNull(messageReceived);
+                        assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                        assertEquals("Test", messageReceived.getText());
+                    }
+                }
+
+                // verify durable topic sub semantics
+                // there is no auto create on core for a FQQN consumer so we need to configure before consumer creation!!
+                artemisServer.createQueue(new QueueConfiguration("Consumer.C.VirtualTopic.T").setAddress("VirtualTopic.T").setRoutingType(RoutingType.MULTICAST));
+                Queue fqqnC = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.C.VirtualTopic.T"));
+                MessageConsumer messageConsumerC = session.createConsumer(fqqnC);
+
+                MessageProducer messageProducer = session.createProducer(session.createTopic("VirtualTopic.T"));
+                messageProducer.send(session.createTextMessage());
+
+                // consume from all three subs
+                TextMessage messageReceived = (TextMessage) messageConsumerA.receive(1000);
+                assertNotNull(messageReceived);
+                messageReceived = (TextMessage) messageConsumerB.receive(1000);
+                assertNotNull(messageReceived);
+                messageReceived = (TextMessage) messageConsumerC.receive(1000);
+                assertNotNull(messageReceived);
+
+            } finally {
+                if (connection != null) {
+                    connection.close();
+                }
+                cf.close();
+            }
+        } finally {
+            artemisServer.stop();
+        }
+    }
+
     /**
      *
      * @throws Exception
@@ -242,6 +370,7 @@ public abstract class ExporterTest {
                 .setTopicPattern("empty.>")
                 .setSource(kahaDbDir)
                 .setTarget(xmlFile));
+        printFile(xmlFile);
         validate(xmlFile, 0);
     }
 
@@ -279,6 +408,7 @@ public abstract class ExporterTest {
             message.setText("Test");
             message.setProperty("MyStringProperty", "abc");
             message.setProperty("MyIntegerProperty", 1);
+            message.setProperty("MyIntegerPropertyId", i+1);
             message.setDestination(topic);
             message.setMessageId(new MessageId(id.generateId() + ":1", i));
             messageStore.addMessage(context, message);
@@ -326,7 +456,7 @@ public abstract class ExporterTest {
             for (int i = 0; i < 5; i++) {
                 TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000);
                 if (i < 4) {
-                    assertNotNull(messageReceived1);
+                    assertNotNull("@" + i, messageReceived1);
                 } else {
                     assertNull(messageReceived1);
                 }
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
index 8b327a6..03f4943 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
@@ -19,7 +19,7 @@ package org.apache.activemq.cli.kahadb.exporter.artemis;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.cli.schema.MessageType;
@@ -131,7 +131,7 @@ public class OpenWireCoreMessageTypeConverterTest {
         MessageType messageType = c.convert(message);
 
         assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
-        assertEquals(ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId", "subName"),
+        assertEquals(ActiveMQDestination.createQueueNameForSubscription(true, "clientId", "subName").toString(),
                 messageType.getQueues().getQueue().get(0).getName());
     }
 }
diff --git a/pom.xml b/pom.xml
index 552e758..633c5f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
     <maven.compiler.target>1.8</maven.compiler.target>
 
     <activemq-version>5.14.4</activemq-version>
-    <artemis-version>2.0.0</artemis-version>
+    <artemis-version>2.14.0-SNAPSHOT</artemis-version>
     <slf4j-version>1.7.13</slf4j-version>
     <log4j-version>1.2.17</log4j-version>