You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2017/11/20 08:38:37 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1365 Advisory consumers listed in Console

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3c04de3ab -> 9a8055bd3


ARTEMIS-1365 Advisory consumers listed in Console

Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumer shouldn't be exposed via
management APIs which are used by the Console

To fix that the broker doesn't register any queues from
advisory addresses.

Also refactors a code to remove Openwire specific contants
from AddressInfo class.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fe88e1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fe88e1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fe88e1c

Branch: refs/heads/master
Commit: 5fe88e1c667870a398d85e427d324542399f9dfe
Parents: 3c04de3
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Nov 17 17:39:18 2017 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Fri Nov 17 17:39:40 2017 +0800

----------------------------------------------------------------------
 .../activemq/artemis/utils/PrefixUtil.java      |  13 +--
 .../protocol/openwire/OpenWireConnection.java   |  23 +++-
 .../openwire/OpenWireProtocolManager.java       |  24 ++++
 .../core/protocol/openwire/amq/AMQConsumer.java |  17 ++-
 .../core/protocol/openwire/amq/AMQSession.java  |  10 +-
 .../stomp/VersionedStompFrameHandler.java       |   3 +-
 .../artemis/core/server/ActiveMQServer.java     |   4 +
 .../artemis/core/server/ServerSession.java      |  22 +++-
 .../core/server/impl/ActiveMQServerImpl.java    | 113 +++++++++++++++++++
 .../artemis/core/server/impl/AddressInfo.java   |  36 ++++--
 .../core/server/impl/ServerSessionImpl.java     |  68 +++++++----
 .../management/impl/ManagementServiceImpl.java  |  22 +++-
 .../core/settings/HierarchicalRepository.java   |   1 +
 .../en/protocols-interoperability.md            |  27 +++++
 .../management/OpenWireManagementTest.java      | 101 ++++++++++++++++-
 15 files changed, 424 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
index e3fc5e6..5313a8a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -26,17 +26,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 
 public class PrefixUtil {
 
-   public static Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
-                                                                   RoutingType defaultRoutingType,
-                                                                   Map<SimpleString, RoutingType> prefixes) {
-      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
-         if (address.startsWith(entry.getKey())) {
-            return new Pair<>(removePrefix(address, entry.getKey()), entry.getValue());
-         }
-      }
-      return new Pair<>(address, defaultRoutingType);
-   }
-
    public static Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
                                                                           Set<RoutingType> defaultRoutingTypes,
                                                                           Map<SimpleString, RoutingType> prefixes) {
@@ -59,7 +48,7 @@ public class PrefixUtil {
       return address;
    }
 
-   private static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
+   public static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
       return string.subSeq(prefix.length(), string.length());
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f8d5f57..19d158d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -726,15 +727,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void addDestination(DestinationInfo info) throws Exception {
       boolean created = false;
       ActiveMQDestination dest = info.getDestination();
+      if (!protocolManager.isSupportAdvisory() && AdvisorySupport.isAdvisoryTopic(dest)) {
+         return;
+      }
 
       SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName());
       if (server.locateQueue(qName) == null) {
          AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName());
+         AddressInfo addressInfo = new AddressInfo(qName, dest.isTopic() ? RoutingType.MULTICAST : RoutingType.ANYCAST);
+         if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) {
+            addressInfo.setInternal(true);
+         }
          if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) {
-            internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
+            internalSession.createQueue(addressInfo, qName, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
             created = true;
          } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) {
-            internalSession.createAddress(qName, RoutingType.MULTICAST, !dest.isTemporary());
+            internalSession.createAddress(addressInfo, !dest.isTemporary());
             created = true;
          }
       }
@@ -783,6 +791,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          }
 
          List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
+         if (consumersList.size() == 0) {
+            return;
+         }
 
          this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
          ss.addConsumer(info);
@@ -878,6 +889,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return state.getTempDestinations();
    }
 
+   public boolean isSuppressInternalManagementObjects() {
+      return protocolManager.isSuppressInternalManagementObjects();
+   }
+
+   public boolean isSuppportAdvisory() {
+      return protocolManager.isSupportAdvisory();
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index b552c35..87925b5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -119,6 +119,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    private long maxInactivityDurationInitalDelay = 10 * 1000L;
    private boolean useKeepAlive = true;
 
+   private boolean supportAdvisory = true;
+   //prevents advisory addresses/queues to be registered
+   //to management service
+   private boolean suppressInternalManagementObjects = true;
+
    private final OpenWireMessageConverter internalConverter;
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@@ -348,6 +353,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
                             Command command,
                             ConsumerId targetConsumerId,
                             String originalConnectionId) throws Exception {
+      if (!this.isSupportAdvisory()) {
+         return;
+      }
       ActiveMQMessage advisoryMessage = new ActiveMQMessage();
 
       if (originalConnectionId == null) {
@@ -583,4 +591,20 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    public OpenWireMessageConverter getInternalConverter() {
       return internalConverter;
    }
+
+   public boolean isSupportAdvisory() {
+      return supportAdvisory;
+   }
+
+   public void setSupportAdvisory(boolean supportAdvisory) {
+      this.supportAdvisory = supportAdvisory;
+   }
+
+   public boolean isSuppressInternalManagementObjects() {
+      return suppressInternalManagementObjects;
+   }
+
+   public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) {
+      this.suppressInternalManagementObjects = suppressInternalManagementObjects;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57506a2..fb50582 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -63,11 +63,15 @@ public class AMQConsumer {
    private AtomicInteger currentWindow;
    private long messagePullSequence = 0;
    private MessagePullHandler messagePullHandler;
+   //internal means we don't expose
+   //it's address/queue to management service
+   private boolean internalAddress = false;
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
                       ConsumerInfo info,
-                      ScheduledExecutorService scheduledPool) {
+                      ScheduledExecutorService scheduledPool,
+                      boolean internalAddress) {
       this.session = amqSession;
       this.openwireDestination = d;
       this.info = info;
@@ -77,6 +81,7 @@ public class AMQConsumer {
       if (prefetchSize == 0) {
          messagePullHandler = new MessagePullHandler();
       }
+      this.internalAddress = internalAddress;
    }
 
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@@ -143,7 +148,10 @@ public class AMQConsumer {
       AddressInfo addressInfo = session.getCoreServer().getAddressInfo(address);
       if (addressInfo != null) {
          addressInfo.addRoutingType(RoutingType.MULTICAST);
+      } else {
+         addressInfo = new AddressInfo(address, RoutingType.MULTICAST);
       }
+      addressInfo.setInternal(internalAddress);
       if (isDurable) {
          queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
          QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
@@ -166,16 +174,15 @@ public class AMQConsumer {
                session.getCoreSession().deleteQueue(queueName);
 
                // Create the new one
-               session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true);
+               session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
             }
          } else {
-            session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true);
+            session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
          }
       } else {
          queueName = new SimpleString(UUID.randomUUID().toString());
 
-         session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, true, false);
-
+         session.getCoreSession().createQueue(addressInfo, queueName, selector, true, false);
       }
 
       return queueName;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 330ac35..057072b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -159,6 +160,13 @@ public class AMQSession implements SessionCallback {
       List<AMQConsumer> consumersList = new java.util.LinkedList<>();
 
       for (ActiveMQDestination openWireDest : dests) {
+         boolean isInternalAddress = false;
+         if (AdvisorySupport.isAdvisoryTopic(dest)) {
+            if (!connection.isSuppportAdvisory()) {
+               continue;
+            }
+            isInternalAddress = connection.isSuppressInternalManagementObjects();
+         }
          if (openWireDest.isQueue()) {
             SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
 
@@ -166,7 +174,7 @@ public class AMQSession implements SessionCallback {
                throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
             }
          }
-         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
+         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool, isInternalAddress);
 
          long nativeID = consumerIDGenerator.generateID();
          consumer.init(slowConsumerDetectionListener, nativeID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index bdae6fc..2259a17 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -350,7 +351,7 @@ public abstract class VersionedStompFrameHandler {
       if (typeHeader != null) {
          routingType = RoutingType.valueOf(typeHeader);
       } else {
-         routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB();
+         routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(new AddressInfo(new SimpleString(destination))).getRoutingType();
       }
       return routingType;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 643d2be..5eeb3db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -318,6 +318,10 @@ public interface ActiveMQServer extends ServiceComponent {
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
                      Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
+                     Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
+
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 7162acd..10de4dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -115,6 +115,12 @@ public interface ServerSession extends SecurityAuth {
                      boolean temporary,
                      boolean durable) throws Exception;
 
+   Queue createQueue(AddressInfo address,
+                     SimpleString name,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable) throws Exception;
+
    /**
     * Create queue with default delivery mode
     *
@@ -150,6 +156,13 @@ public interface ServerSession extends SecurityAuth {
                      boolean durable,
                      boolean autoCreated) throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo,
+                     SimpleString name,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable,
+                     boolean autoCreated) throws Exception;
+
    AddressInfo createAddress(SimpleString address,
                              Set<RoutingType> routingTypes,
                              boolean autoCreated) throws Exception;
@@ -158,6 +171,9 @@ public interface ServerSession extends SecurityAuth {
                              RoutingType routingType,
                              boolean autoCreated) throws Exception;
 
+   AddressInfo createAddress(AddressInfo addressInfo,
+                             boolean autoCreated) throws Exception;
+
    void deleteQueue(SimpleString name) throws Exception;
 
    ServerConsumer createConsumer(long consumerID,
@@ -270,13 +286,11 @@ public interface ServerSession extends SecurityAuth {
    /**
     * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
     *
-    * @param address the address to inspect
-    * @param defaultRoutingType the {@code org.apache.activemq.artemis.api.core.RoutingType} to return if no prefix
-    *                           match is found.
+    * @param addressInfo the address to inspect
     * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
     *         name and the {@code org.apache.activemq.artemis.api.core.RoutingType} corresponding to the that prefix.
     */
-   Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType);
+   AddressInfo getAddressAndRoutingType(AddressInfo addressInfo);
 
    /**
     * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 90b0f83..469828a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1674,6 +1674,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   @Deprecated
    public Queue createQueue(SimpleString address,
                             RoutingType routingType,
                             SimpleString queueName,
@@ -1688,6 +1689,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return createQueue(address, routingType, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
    }
 
+   @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
+   }
+
    @Deprecated
    @Override
    public Queue createQueue(final SimpleString address,
@@ -2666,6 +2672,113 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return postOffice.getAddressInfo(address);
    }
 
+   public Queue createQueue(final AddressInfo addrInfo,
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final SimpleString user,
+                            final boolean durable,
+                            final boolean temporary,
+                            final boolean ignoreIfExists,
+                            final boolean transientQueue,
+                            final boolean autoCreated,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean autoCreateAddress) throws Exception {
+      final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
+      if (binding != null) {
+         if (ignoreIfExists) {
+            return binding.getQueue();
+         } else {
+            throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
+         }
+      }
+
+      final Filter filter = FilterImpl.createFilter(filterString);
+
+      final long txID = storageManager.generateID();
+      final long queueID = storageManager.generateID();
+
+      final QueueConfig.Builder queueConfigBuilder;
+
+      final SimpleString addressToUse = addrInfo == null ? queueName : addrInfo.getName();
+
+      queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressToUse);
+
+      AddressInfo info = postOffice.getAddressInfo(addressToUse);
+
+      RoutingType routingType = addrInfo == null ? null : addrInfo.getRoutingType();
+      RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
+      if (autoCreateAddress) {
+         if (info == null) {
+            final AddressInfo addressInfo = new AddressInfo(addressToUse, rt);
+            addressInfo.setAutoCreated(true);
+            addressInfo.setInternal(addrInfo == null ? false : addrInfo.isInternal());
+            addAddressInfo(addressInfo);
+         } else if (!info.getRoutingTypes().contains(rt)) {
+            Set<RoutingType> routingTypes = new HashSet<>();
+            routingTypes.addAll(info.getRoutingTypes());
+            routingTypes.add(rt);
+            updateAddressInfo(info.getName(), routingTypes);
+         }
+      } else if (info == null) {
+         throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressToUse);
+      } else if (!info.getRoutingTypes().contains(rt)) {
+         throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(rt, info.getName().toString(), info.getRoutingTypes());
+      }
+
+      final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(addrInfo.getRoutingType()).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
+
+      final Queue queue = queueFactory.createQueueWith(queueConfig);
+
+      if (transientQueue) {
+         queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
+      } else {
+         queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
+      }
+
+      final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+
+      if (queue.isDurable()) {
+         storageManager.addQueueBinding(txID, localQueueBinding);
+      }
+
+      try {
+         postOffice.addBinding(localQueueBinding);
+         if (queue.isDurable()) {
+            storageManager.commitBindings(txID);
+         }
+      } catch (Exception e) {
+         try {
+            if (durable) {
+               storageManager.rollbackBindings(txID);
+            }
+            final PageSubscription pageSubscription = queue.getPageSubscription();
+            try {
+               queue.close();
+            } finally {
+               if (pageSubscription != null) {
+                  pageSubscription.destroy();
+               }
+            }
+         } catch (Throwable ignored) {
+            logger.debug(ignored.getMessage(), ignored);
+         }
+         throw e;
+      }
+
+      if (addrInfo == null || !addrInfo.isInternal()) {
+         managementService.registerQueue(queue, queue.getAddress(), storageManager);
+      }
+
+      callPostQueueCreationCallbacks(queue.getName());
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
+
+      return queue;
+   }
+
    @Override
    public Queue createQueue(final SimpleString address,
                             final RoutingType routingType,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 3fb808f..8a7691e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,17 +16,16 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.PrefixUtil;
+
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 public class AddressInfo {
 
-   //from openwire
-   public static final SimpleString ADVISORY_TOPIC = new SimpleString("ActiveMQ.Advisory.");
-
    private long id;
 
    private final SimpleString name;
@@ -35,6 +34,8 @@ public class AddressInfo {
 
    private Set<RoutingType> routingTypes;
 
+   private boolean internal = false;
+
    public AddressInfo(SimpleString name) {
       this(name, new HashSet<>());
    }
@@ -130,6 +131,27 @@ public class AddressInfo {
    }
 
    public boolean isInternal() {
-      return this.name.startsWith(ADVISORY_TOPIC);
+      return this.internal;
    }
+
+   public void setInternal(boolean internal) {
+      this.internal = internal;
+   }
+
+   public AddressInfo create(SimpleString name, RoutingType routingType) {
+      AddressInfo info = new AddressInfo(name, routingType);
+      info.setInternal(this.internal);
+      return info;
+   }
+
+   public AddressInfo getAddressAndRoutingType(Map<SimpleString, RoutingType> prefixes) {
+      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+         if (this.getName().startsWith(entry.getKey())) {
+            AddressInfo newAddressInfo = this.create(PrefixUtil.removePrefix(this.getName(), entry.getKey()), entry.getValue());
+            return newAddressInfo;
+         }
+      }
+      return this;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index c48cb2e..dacc6bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -548,9 +548,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
-   public Queue createQueue(final SimpleString address,
+   public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
+      AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false);
+   }
+
+   public Queue createQueue(final AddressInfo addressInfo,
                             final SimpleString name,
-                            final RoutingType routingType,
                             final SimpleString filterString,
                             final boolean temporary,
                             final boolean durable,
@@ -559,18 +563,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final boolean autoCreated) throws Exception {
       final SimpleString unPrefixedName = removePrefix(name);
 
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
+      AddressInfo art = getAddressAndRoutingType(addressInfo);
 
       if (durable) {
          // make sure the user has privileges to create this queue
-         securityCheck(address, name, CheckType.CREATE_DURABLE_QUEUE, this);
+         securityCheck(addressInfo.getName(), name, CheckType.CREATE_DURABLE_QUEUE, this);
       } else {
-         securityCheck(address, name, CheckType.CREATE_NON_DURABLE_QUEUE, this);
+         securityCheck(addressInfo.getName(), name, CheckType.CREATE_NON_DURABLE_QUEUE, this);
       }
 
       server.checkQueueCreationLimit(getUsername());
 
-      Queue queue = server.createQueue(art.getA(), art.getB(), unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
+      Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses());
 
       if (temporary) {
          // Temporary queue in core simply means the queue will be deleted if
@@ -591,13 +595,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       if (logger.isDebugEnabled()) {
-         logger.debug("Queue " + unPrefixedName + " created on address " + address +
-                         " with filter=" + filterString + " temporary = " +
-                         temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
+         logger.debug("Queue " + unPrefixedName + " created on address " + addressInfo.getName() +
+                 " with filter=" + filterString + " temporary = " +
+                 temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
       }
 
       return queue;
+   }
 
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final SimpleString name,
+                            final RoutingType routingType,
+                            final SimpleString filterString,
+                            final boolean temporary,
+                            final boolean durable,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean autoCreated) throws Exception {
+      return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, autoCreated);
    }
 
    @Override
@@ -613,6 +629,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception {
+      AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), autoCreated);
+   }
+
+   @Override
    public AddressInfo createAddress(final SimpleString address,
                                     Set<RoutingType> routingTypes,
                                     final boolean autoCreated) throws Exception {
@@ -626,10 +648,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    public AddressInfo createAddress(final SimpleString address,
                                     RoutingType routingType,
                                     final boolean autoCreated) throws Exception {
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
-      securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
-      server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
-      return server.getAddressInfo(art.getA());
+      return createAddress(new AddressInfo(address, routingType), autoCreated);
+   }
+
+   @Override
+   public AddressInfo createAddress(AddressInfo addressInfo, boolean autoCreated) throws Exception {
+      AddressInfo art = getAddressAndRoutingType(addressInfo);
+      securityCheck(art.getName(), CheckType.CREATE_ADDRESS, this);
+      server.addOrUpdateAddressInfo(art.setAutoCreated(autoCreated));
+      return server.getAddressInfo(art.getName());
    }
 
    @Override
@@ -1672,12 +1699,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             }
          } */
 
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
+      AddressInfo art = getAddressAndRoutingType(new AddressInfo(msg.getAddressSimpleString(), routingType));
 
       // Consumer
       // check the user has write access to this address.
       try {
-         securityCheck(art.getA(), CheckType.SEND, this);
+         securityCheck(art.getName(), CheckType.SEND, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends && tx != null) {
             tx.markAsRollbackOnly(e);
@@ -1695,8 +1722,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       try {
-         routingContext.setAddress(art.getA());
-         routingContext.setRoutingType(art.getB());
+         routingContext.setAddress(art.getName());
+         routingContext.setRoutingType(art.getRoutingType());
 
          result = postOffice.route(msg, routingContext, direct);
 
@@ -1738,12 +1765,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
-   public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
-                                                                   RoutingType defaultRoutingType) {
+   public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
       if (prefixEnabled) {
-         return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
+         return addressInfo.getAddressAndRoutingType(prefixes);
       }
-      return new Pair<>(address, defaultRoutingType);
+      return addressInfo;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 81a8e84..8a7e009 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -233,17 +233,25 @@ public class ManagementServiceImpl implements ManagementService {
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.ADDRESS + address);
    }
-   @Override
+
    public synchronized void registerQueue(final Queue queue,
-                                          final SimpleString address,
+                                          final AddressInfo addressInfo,
                                           final StorageManager storageManager) throws Exception {
-      QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
+
+      if (addressInfo.isInternal()) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("won't register internal queue: " + queue);
+         }
+         return;
+      }
+
+      QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
          MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
          messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
       }
-      ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName(), queue.getRoutingType());
+      ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
       registerInJMX(objectName, queueControl);
       registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
 
@@ -251,6 +259,12 @@ public class ManagementServiceImpl implements ManagementService {
          logger.debug("registered queue " + objectName);
       }
    }
+   @Override
+   public synchronized void registerQueue(final Queue queue,
+                                          final SimpleString address,
+                                          final StorageManager storageManager) throws Exception {
+      registerQueue(queue, new AddressInfo(address), storageManager);
+   }
 
    @Override
    public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
index 1e1c8c4..cb2054a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
@@ -23,6 +23,7 @@ import java.util.Set;
 /**
  * allows objects to be mapped against a regex pattern and held in order in a list
  */
+//tmp comment: Can we use AddressInfo as the 'match' key?
 public interface HierarchicalRepository<T> {
 
    void disableListeners();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 8755935..7f8b0f5 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -178,6 +178,33 @@ maxInactivityDurationInitalDelay. The shortest duration is taken for the connect
 
 More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
 
+### Disable/Enable Advisories
+
+By default, advisory topics ([ActiveMQ Advisory](http://activemq.apache.org/advisory-message.html))
+are created in order to send certain type of advisory messages to listening clients. As a result,
+advisory addresses and queues will be displayed on the management console, along with user deployed
+addresses and queues. This sometimes cause confusion because the advisory objects are internally
+managed without user being aware of them. In addition, users may not want the advisory topics at all
+(they cause extra resources and performance penalty) and it is convenient to disable them at all
+from the broker side.
+
+The protocol provides two parameters to control advisory behaviors on the broker side.
+
+* supportAdvisory
+Whether or not the broker supports advisory messages. If the value is true, advisory addresses/
+queues will be created. If the value is false, no advisory addresses/queues are created. Default
+value is true. 
+
+* suppressInternalManagementObjects
+Whether or not the advisory addresses/queues, if any, will be registered to management service
+(e.g. JMX registry). If set to true, no advisory addresses/queues will be registered. If set to
+false, those are registered and will be displayed on the management console. Default value is
+true.
+
+The two parameters are configured on openwire acceptors, via URLs or API. For example:
+
+    <acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
+
 ## MQTT
 
 MQTT is a light weight, client to server, publish / subscribe messaging protocol.  MQTT has been specifically

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
index 3ff3b6b..6a31a48 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
@@ -17,24 +17,35 @@
 package org.apache.activemq.artemis.tests.integration.openwire.management;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.advisory.ConsumerEventSource;
 import org.apache.activemq.advisory.ProducerEventSource;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
 import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
 
+@RunWith(Parameterized.class)
 public class OpenWireManagementTest extends OpenWireTestBase {
 
    private ActiveMQServerControl serverControl;
@@ -44,6 +55,27 @@ public class OpenWireManagementTest extends OpenWireTestBase {
 
    private ConnectionFactory factory;
 
+   @Parameterized.Parameters(name = "useDefault={0},supportAdvisory={1},suppressJmx={2}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {true, false, false},
+         {false, true, false},
+         {false, true, true},
+         {false, false, false},
+         {false, false, true}
+      });
+   }
+
+   private boolean useDefault;
+   private boolean supportAdvisory;
+   private boolean suppressJmx;
+
+   public OpenWireManagementTest(boolean useDefault, boolean supportAdvisory, boolean suppressJmx) {
+      this.useDefault = useDefault;
+      this.supportAdvisory = supportAdvisory;
+      this.suppressJmx = suppressJmx;
+   }
+
    @Before
    @Override
    public void setUp() throws Exception {
@@ -55,6 +87,19 @@ public class OpenWireManagementTest extends OpenWireTestBase {
    @Override
    protected void extraServerConfig(Configuration serverConfig) {
       serverConfig.setJMXManagementEnabled(true);
+      if (useDefault) {
+         //don't set parameters explicitly
+         return;
+      }
+      Set<TransportConfiguration> acceptorConfigs = serverConfig.getAcceptorConfigurations();
+      for (TransportConfiguration tconfig : acceptorConfigs) {
+         if ("netty".equals(tconfig.getName())) {
+            Map<String, Object> params = tconfig.getExtraParams();
+            params.put("supportAdvisory", supportAdvisory);
+            params.put("suppressInternalManagementObjects", suppressJmx);
+            System.out.println("Now use properties: " + params);
+         }
+      }
    }
 
    @Test
@@ -67,7 +112,7 @@ public class OpenWireManagementTest extends OpenWireTestBase {
       String[] addresses = serverControl.getAddressNames();
       assertEquals(3, addresses.length);
       for (String addr : addresses) {
-         assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
+         assertFalse(addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX));
       }
 
       try (Connection connection = factory.createConnection()) {
@@ -88,11 +133,61 @@ public class OpenWireManagementTest extends OpenWireTestBase {
          //after that point several advisory addresses are created.
          //make sure they are not accessible via management api.
          addresses = serverControl.getAddressNames();
+         boolean hasInternalAddress = false;
          for (String addr : addresses) {
-            assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
+            hasInternalAddress = addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
+            if (hasInternalAddress) {
+               break;
+            }
          }
+         assertEquals(!useDefault && supportAdvisory && !suppressJmx, hasInternalAddress);
+
          consumerEventSource.stop();
          producerEventSource.stop();
       }
    }
+
+   @Test
+   public void testHiddenInternalQueue() throws Exception {
+
+      server.createQueue(queueName1, RoutingType.ANYCAST, queueName1, null, true, false, -1, false, true);
+
+      String[] queues = serverControl.getQueueNames();
+      assertEquals(1, queues.length);
+      for (String queue : queues) {
+         assertFalse(checkQueueFromInternalAddress(queue));
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createQueue(queueName1.toString());
+
+         //this causes advisory queues to be created
+         session.createProducer(destination);
+
+         queues = serverControl.getQueueNames();
+         boolean hasInternal = false;
+         String targetQueue = null;
+         for (String queue : queues) {
+            hasInternal = checkQueueFromInternalAddress(queue);
+            if (hasInternal) {
+               targetQueue = queue;
+               break;
+            }
+         }
+         assertEquals("targetQueue: " + targetQueue, !useDefault && supportAdvisory && !suppressJmx, hasInternal);
+      }
+   }
+
+   private boolean checkQueueFromInternalAddress(String queue) throws JMSException, ActiveMQException {
+      try (Connection coreConn = coreCf.createConnection()) {
+         ActiveMQSession session = (ActiveMQSession) coreConn.createSession();
+         ClientSession coreSession = session.getCoreSession();
+         ClientSession.QueueQuery query = coreSession.queueQuery(new SimpleString(queue));
+         assertTrue("Queue doesn't exist: " + queue, query.isExists());
+         SimpleString qAddr = query.getAddress();
+         return qAddr.toString().startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
+      }
+   }
 }


[2/2] activemq-artemis git commit: This closes #1647 ARTEMIS-1365 Advisory consumers listed in Console

Posted by an...@apache.org.
This closes #1647 ARTEMIS-1365 Advisory consumers listed in Console


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a8055bd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a8055bd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a8055bd

Branch: refs/heads/master
Commit: 9a8055bd3f5e408992e897712235818b2a84109f
Parents: 3c04de3 5fe88e1
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Nov 20 08:38:06 2017 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Nov 20 08:38:06 2017 +0000

----------------------------------------------------------------------
 .../activemq/artemis/utils/PrefixUtil.java      |  13 +--
 .../protocol/openwire/OpenWireConnection.java   |  23 +++-
 .../openwire/OpenWireProtocolManager.java       |  24 ++++
 .../core/protocol/openwire/amq/AMQConsumer.java |  17 ++-
 .../core/protocol/openwire/amq/AMQSession.java  |  10 +-
 .../stomp/VersionedStompFrameHandler.java       |   3 +-
 .../artemis/core/server/ActiveMQServer.java     |   4 +
 .../artemis/core/server/ServerSession.java      |  22 +++-
 .../core/server/impl/ActiveMQServerImpl.java    | 113 +++++++++++++++++++
 .../artemis/core/server/impl/AddressInfo.java   |  36 ++++--
 .../core/server/impl/ServerSessionImpl.java     |  68 +++++++----
 .../management/impl/ManagementServiceImpl.java  |  22 +++-
 .../core/settings/HierarchicalRepository.java   |   1 +
 .../en/protocols-interoperability.md            |  27 +++++
 .../management/OpenWireManagementTest.java      | 101 ++++++++++++++++-
 15 files changed, 424 insertions(+), 60 deletions(-)
----------------------------------------------------------------------