You are viewing a plain-text version of this content. (The canonical link is not included in this plain-text copy.)
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/07 17:13:00 UTC

qpid-broker-j git commit: QPID-7996: [Broker-J] Invoke link registry operations in configuration thread

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 7a816d58d -> c862a92b6


QPID-7996: [Broker-J] Invoke link registry operations in configuration thread


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c862a92b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c862a92b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c862a92b

Branch: refs/heads/master
Commit: c862a92b650fbb9bcc5665fdc03e652f3c54fc0c
Parents: 7a816d5
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 7 17:12:46 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 7 17:12:46 2018 +0000

----------------------------------------------------------------------
 .../qpid/server/model/NamedAddressSpace.java    |   4 +
 ...stractNonConnectionAcceptingVirtualHost.java |   5 +
 .../server/virtualhost/AbstractVirtualHost.java | 115 ++++++++++++++++++-
 .../server/virtualhost/LinkRegistryModel.java   |  24 +++-
 .../qpid/server/protocol/v1_0/LinkImpl.java     |  20 ++--
 .../qpid/server/protocol/v1_0/LinkRegistry.java |   6 +-
 .../server/protocol/v1_0/LinkRegistryImpl.java  |  77 ++++++++-----
 .../protocol/v1_0/SendingLinkEndpoint.java      |  61 ++++------
 .../management/amqp/ManagementAddressSpace.java |  10 +-
 9 files changed, 235 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index a87e860..ba8542c9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
 public interface NamedAddressSpace extends Named
 {
@@ -62,9 +63,12 @@ public interface NamedAddressSpace extends Named
 
     <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
     <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+    @Deprecated
     <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
                                                          final Pattern linkNamePattern);
 
+    <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor);
+
     boolean authoriseCreateConnection(AMQPConnection<?> connection);
 
     DtxRegistry getDtxRegistry();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index 7efb208..cfadb6e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -164,6 +164,11 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
     }
 
     @Override
+    public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+    {
+    }
+
+    @Override
     public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         return false;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index b6b0bbd..de2b11e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1755,22 +1755,129 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
-    public <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName)
+    public <T extends LinkModel> T getSendingLink( String remoteContainerId, String linkName)
     {
-        return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+        return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
+        {
+            @Override
+            public ListenableFuture<T> execute()
+            {
+                return Futures.immediateFuture((T)_linkRegistry.getSendingLink(remoteContainerId, linkName));
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "getSendingLink";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
+            }
+        }));
     }
 
     @Override
     public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
     {
-        return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+        return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
+        {
+            @Override
+            public ListenableFuture<T> execute()
+            {
+                return Futures.immediateFuture((T)_linkRegistry.getReceivingLink(remoteContainerId, linkName));
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "getReceivingLink";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
+            }
+        }));
     }
 
     @Override
     public <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
                                                                 final Pattern linkNamePattern)
     {
-        return _linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern);
+        return doSync(doOnConfigThread(new Task<ListenableFuture<Collection<T>>, RuntimeException>()
+        {
+            @Override
+            public ListenableFuture<Collection<T>> execute()
+            {
+                return Futures.immediateFuture(_linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern));
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "findSendingLinks";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return String.format("containerIdPattern='%s', linkNamePattern='%s'", containerIdPattern, linkNamePattern);
+            }
+        }));
+    }
+
+    @Override
+    public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+    {
+        doSync(doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>()
+        {
+            @Override
+            public ListenableFuture<Void> execute()
+            {
+                _linkRegistry.visitSendingLinks(visitor);
+                return Futures.immediateFuture(null);
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "visitSendingLinks";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return String.format("visitor='%s'", visitor);
+            }
+        }));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 6939510..e0af033 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -25,18 +25,32 @@ import java.util.regex.Pattern;
 
 import org.apache.qpid.server.protocol.LinkModel;
 
-public interface LinkRegistryModel
+public interface LinkRegistryModel<T extends LinkModel>
 {
-    <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
-    <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
-    <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
-                                                         final Pattern linkNamePattern);
+    T getSendingLink(String remoteContainerId, String linkName);
+
+    T getReceivingLink(String remoteContainerId, String linkName);
+
+    @Deprecated
+    Collection<T> findSendingLinks(final Pattern containerIdPattern,
+                                   final Pattern linkNamePattern);
+
+    void visitSendingLinks(LinkVisitor<T> visitor);
+
     void purgeSendingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
+
     void purgeReceivingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
 
     void open();
+
     void close();
+
     void delete();
 
     Object dump();
+
+    interface LinkVisitor<T extends LinkModel>
+    {
+        boolean visit(T link);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
index 78d0bd2..692faa6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
@@ -56,7 +56,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
     private final String _remoteContainerId;
     private final String _linkName;
     private final Role _role;
-    private final LinkRegistry _linkRegistry;
+    private final LinkRegistry<S, T> _linkRegistry;
     private final Queue<ThiefInformation> _thiefQueue = new LinkedList<>();
 
     private volatile LinkEndpoint<S, T> _linkEndpoint;
@@ -65,7 +65,10 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
     private boolean _stealingInProgress;
     private final Queue<Action<? super Link_1_0<S, T>>> _deleteTasks = new ConcurrentLinkedQueue<>();
 
-    public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry)
+    public LinkImpl(final String remoteContainerId,
+                    final String linkName,
+                    final Role role,
+                    final LinkRegistry<S, T> linkRegistry)
     {
         _remoteContainerId = remoteContainerId;
         _linkName = linkName;
@@ -73,7 +76,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
         _linkRegistry = linkRegistry;
     }
 
-    public LinkImpl(final LinkDefinition<S, T> linkDefinition, final LinkRegistry linkRegistry)
+    public LinkImpl(final LinkDefinition<S, T> linkDefinition, final LinkRegistry<S, T> linkRegistry)
     {
         this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry);
         setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
@@ -116,7 +119,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
     }
 
     @Override
-    public void linkClosed()
+    public synchronized void linkClosed()
     {
         Iterator<Action<? super Link_1_0<S, T>>> iterator = _deleteTasks.iterator();
         while (iterator.hasNext())
@@ -130,7 +133,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
     }
 
     @Override
-    public void discardEndpoint()
+    public synchronized void discardEndpoint()
     {
         _linkEndpoint = null;
     }
@@ -266,7 +269,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
     private ListenableFuture<LinkEndpoint<S, T>> doStealLink(final Session_1_0 session, final Attach attach)
     {
         final SettableFuture<LinkEndpoint<S, T>> returnFuture = SettableFuture.create();
-        LinkEndpoint<S, T> linkEndpoint = _linkEndpoint;
+        final LinkEndpoint<S, T> linkEndpoint = _linkEndpoint;
 
         // check whether linkEndpoint has been closed in the mean time
         if (linkEndpoint != null)
@@ -275,9 +278,10 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
                     () ->
                     {
                         // check whether linkEndpoint has been closed in the mean time
-                        if (_linkEndpoint != null)
+                        LinkEndpoint<S, T> endpoint = _linkEndpoint;
+                        if (endpoint != null)
                         {
-                            _linkEndpoint.close(new Error(LinkError.STOLEN,
+                            endpoint.close(new Error(LinkError.STOLEN,
                                                           String.format("Link is being stolen by connection '%s'",
                                                                         session.getConnection())));
                         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
index 26c82ea..d140a65 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
@@ -25,11 +25,11 @@ import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
-public interface LinkRegistry extends LinkRegistryModel
+public interface LinkRegistry<S extends BaseSource, T extends BaseTarget> extends LinkRegistryModel<Link_1_0<S, T>>
 {
-    void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link);
+    void linkClosed(final Link_1_0<S, T> link);
 
-    void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link);
+    void linkChanged(final Link_1_0<S, T> link);
 
     TerminusDurability getHighestSupportedTerminusDurability();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index a48130d..ede4d5b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -45,11 +45,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
-public class LinkRegistryImpl implements LinkRegistry
+public class LinkRegistryImpl<S extends BaseSource, T extends BaseTarget> implements LinkRegistry<S, T>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(LinkRegistryImpl.class);
-    private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _sendingLinkRegistry = new ConcurrentHashMap<>();
-    private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _receivingLinkRegistry = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey, Link_1_0<S, T>> _sendingLinkRegistry = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey, Link_1_0<S, T>> _receivingLinkRegistry = new ConcurrentHashMap<>();
 
     private final LinkStore _linkStore;
 
@@ -74,22 +74,21 @@ public class LinkRegistryImpl implements LinkRegistry
     }
 
     @Override
-    public Link_1_0<? extends BaseSource, ? extends BaseTarget> getSendingLink(final String remoteContainerId, final String linkName)
+    public Link_1_0<S, T> getSendingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
     }
 
     @Override
-    public Link_1_0<? extends BaseSource, ? extends BaseTarget> getReceivingLink(final String remoteContainerId, final String linkName)
+    public Link_1_0<S, T> getReceivingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
     }
 
     @Override
-    public void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget>  link)
+    public void linkClosed(final Link_1_0<S, T> link)
     {
-        ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry =
-                getLinkRegistry(link.getRole());
+        ConcurrentMap<LinkKey, Link_1_0<S, T>> linkRegistry = getLinkRegistry(link.getRole());
         linkRegistry.remove(new LinkKey(link));
         if (isDurableLink(link))
         {
@@ -98,7 +97,7 @@ public class LinkRegistryImpl implements LinkRegistry
     }
 
     @Override
-    public void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link)
+    public void linkChanged(final Link_1_0<S,T> link)
     {
         getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link);
         if (isDurableLink(link))
@@ -115,7 +114,7 @@ public class LinkRegistryImpl implements LinkRegistry
     }
 
     @Override
-    public Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> findSendingLinks(final Pattern containerIdPattern,
+    public Collection<Link_1_0<S,T>> findSendingLinks(final Pattern containerIdPattern,
                                                                                              final Pattern linkNamePattern)
     {
         return _sendingLinkRegistry.entrySet()
@@ -126,6 +125,25 @@ public class LinkRegistryImpl implements LinkRegistry
                                    .collect(Collectors.toList());
     }
 
+
+    @Override
+    public void visitSendingLinks(final LinkVisitor<Link_1_0<S,T>> visitor)
+    {
+        visitLinks(_sendingLinkRegistry.values(), visitor);
+    }
+
+    private void visitLinks(final Collection<Link_1_0<S, T>> links,
+                            final LinkVisitor<Link_1_0<S, T>> visitor)
+    {
+        for (Link_1_0<S, T> link : links)
+        {
+            if (visitor.visit(link))
+            {
+                break;
+            }
+        }
+    }
+
     @Override
     public void purgeSendingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
     {
@@ -142,10 +160,11 @@ public class LinkRegistryImpl implements LinkRegistry
     public void open()
     {
         Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
-        for(LinkDefinition<? extends BaseSource, ? extends BaseTarget> link: links)
+        for(LinkDefinition<Source, Target> link: links)
         {
-            ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry = getLinkRegistry(link.getRole());
-            linkRegistry.put(new LinkKey(link), new LinkImpl<>(link, this));
+            ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry = getLinkRegistry(link.getRole());
+            LinkDefinition<S, T> definition = (LinkDefinition<S, T>) link;
+            linkRegistry.put(new LinkKey(link), new LinkImpl<>(definition, this));
         }
     }
 
@@ -169,14 +188,14 @@ public class LinkRegistryImpl implements LinkRegistry
                    && ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE);
     }
 
-    private Link_1_0<? extends BaseSource, ? extends BaseTarget> getLinkFromRegistry(final String remoteContainerId,
-                                                                                     final String linkName,
-                                                                                     final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
-                                                                                     final Role role)
+    private Link_1_0<S, T> getLinkFromRegistry(final String remoteContainerId,
+                                               final String linkName,
+                                               final ConcurrentMap<LinkKey, Link_1_0<S, T>> linkRegistry,
+                                               final Role role)
     {
         LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role);
-        Link_1_0<? extends BaseSource, ? extends BaseTarget> newLink = new LinkImpl(remoteContainerId, linkName, role, this);
-        Link_1_0<? extends BaseSource, ? extends BaseTarget> link = linkRegistry.putIfAbsent(linkKey, newLink);
+        Link_1_0<S, T> newLink = new LinkImpl<>(remoteContainerId, linkName, role, this);
+        Link_1_0<S, T> link = linkRegistry.putIfAbsent(linkKey, newLink);
         if (link == null)
         {
             link = newLink;
@@ -184,7 +203,7 @@ public class LinkRegistryImpl implements LinkRegistry
         return link;
     }
 
-    private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
+    private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry,
                             final Pattern containerIdPattern, final Pattern linkNamePattern)
     {
         linkRegistry.entrySet()
@@ -194,9 +213,9 @@ public class LinkRegistryImpl implements LinkRegistry
                     .forEach(e -> e.getValue().linkClosed());
     }
 
-    private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> getLinkRegistry(final Role role)
+    private ConcurrentMap<LinkKey, Link_1_0<S,T>> getLinkRegistry(final Role role)
     {
-        ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry;
+        ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry;
         if (Role.SENDER == role)
         {
             linkRegistry = _sendingLinkRegistry;
@@ -223,10 +242,10 @@ public class LinkRegistryImpl implements LinkRegistry
         return dump;
     }
 
-    private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> registry,
+    private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<S, T>> registry,
                               LinkRegistryDump dump)
     {
-        for (Map.Entry<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> entry : registry.entrySet())
+        for (Map.Entry<LinkKey, Link_1_0<S,T>> entry : registry.entrySet())
         {
             LinkKey linkKey = entry.getKey();
             LinkRegistryDump.ContainerDump containerLinks =
@@ -246,9 +265,9 @@ public class LinkRegistryImpl implements LinkRegistry
         }
     }
 
-    public static class LinkRegistryDump
+    static class LinkRegistryDump
     {
-        public static class ContainerDump
+        static class ContainerDump
         {
             public static class LinkDump
             {
@@ -269,12 +288,12 @@ public class LinkRegistryImpl implements LinkRegistry
             private Map<String, LinkDump> _sendingLinks = new LinkedHashMap<>();
             private Map<String, LinkDump> _receivingLinks = new LinkedHashMap<>();
 
-            public Map<String, LinkDump> getSendingLinks()
+            Map<String, LinkDump> getSendingLinks()
             {
                 return Collections.unmodifiableMap(_sendingLinks);
             }
 
-            public Map<String, LinkDump> getReceivingLinks()
+            Map<String, LinkDump> getReceivingLinks()
             {
                 return Collections.unmodifiableMap(_receivingLinks);
             }
@@ -282,7 +301,7 @@ public class LinkRegistryImpl implements LinkRegistry
 
         private Map<String, ContainerDump> _containers = new LinkedHashMap<>();
 
-        public Map<String, ContainerDump> getContainers()
+        Map<String, ContainerDump> getContainers()
         {
             return Collections.unmodifiableMap(_containers);
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index b3603d9..2fb7421 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -84,6 +84,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
@@ -391,27 +392,25 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                 && capabilities.contains(Session_1_0.SHARED_CAPABILITY)
                 && getLinkName().endsWith("|global"))
             {
-                NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
-                Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> links =
-                        namedAddressSpace.findSendingLinks(ANY_CONTAINER_ID,
-                                                           Pattern.compile("^" + Pattern.quote(getLinkName()) + "$"));
-                for (Link_1_0<? extends BaseSource, ? extends BaseTarget> link : links)
-                {
-                    BaseSource baseSource = link.getSource();
-                    if (baseSource instanceof Source)
+                final NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
+                final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
+                namedAddressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
+                    if (link.getSource() != null
+                        && ANY_CONTAINER_ID.matcher(link.getRemoteContainerId()).matches()
+                        && linkNamePattern.matcher(link.getName()).matches())
                     {
-                        Source linkSource = (Source) baseSource;
-                        source = new Source(linkSource);
-                        getLink().setSource(source);
-                        break;
+                        getLink().setSource(new Source(link.getSource()));
+                        return true;
                     }
-                }
+                    return false;
+                });
             }
         }
 
+        source = getSource();
         if (source == null)
         {
-            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
+            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, "Link not found"));
         }
 
         attach.setSource(source);
@@ -861,18 +860,19 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                         && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
                         && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
                     {
-                        Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+                        final Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
                                 ? ANY_CONTAINER_ID
                                 : Pattern.compile("^" + Pattern.quote(getSession().getConnection().getRemoteContainerId()) + "$");
-                        Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");
-                        final Collection<LinkModel> links = addressSpace.findSendingLinks(containerIdPattern, linkNamePattern);
-                        for (LinkModel link : links)
-                        {
-                            if (link instanceof Link_1_0)
+                        final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");
+
+                        addressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
+                            if (containerIdPattern.matcher(link.getRemoteContainerId()).matches()
+                                && linkNamePattern.matcher(link.getName()).matches())
                             {
-                                ((Link_1_0) link).linkClosed();
+                                link.linkClosed();
                             }
-                        }
+                            return false;
+                        });
                     }
                 }
                 catch (AccessControlException e)
@@ -918,23 +918,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                  && ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
                  && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
                  && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
-                 && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
+                 && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)
+                 && !getLinkName().endsWith("|global"))
         {
             // For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
             // However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
-            if (!getLinkName().endsWith("|global"))
-            {
-                getLink().linkClosed();
-            }
-            else
-            {
-                Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
-                final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
-                if (links.size() > 1)
-                {
-                    getLink().linkClosed();
-                }
-            }
+            getLink().linkClosed();
         }
         super.detach(error, close);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 4fc2508..379ec5e 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -241,13 +241,13 @@ public class ManagementAddressSpace implements NamedAddressSpace
     @Override
     public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
     {
-        return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+        return (T)_linkRegistry.getSendingLink(remoteContainerId, linkName);
     }
 
     @Override
     public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
     {
-        return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+        return (T)_linkRegistry.getReceivingLink(remoteContainerId, linkName);
     }
 
     @Override
@@ -258,6 +258,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
     }
 
     @Override
+    public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+    {
+        _linkRegistry.visitSendingLinks(visitor);
+    }
+
+    @Override
     public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         _broker.authorise(Operation.PERFORM_ACTION("manage"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org