You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/07 17:13:00 UTC
qpid-broker-j git commit: QPID-7996: [Broker-J] Invoke link registry operations in configuration thread
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 7a816d58d -> c862a92b6
QPID-7996: [Broker-J] Invoke link registry operations in configuration thread
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c862a92b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c862a92b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c862a92b
Branch: refs/heads/master
Commit: c862a92b650fbb9bcc5665fdc03e652f3c54fc0c
Parents: 7a816d5
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 7 17:12:46 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 7 17:12:46 2018 +0000
----------------------------------------------------------------------
.../qpid/server/model/NamedAddressSpace.java | 4 +
...stractNonConnectionAcceptingVirtualHost.java | 5 +
.../server/virtualhost/AbstractVirtualHost.java | 115 ++++++++++++++++++-
.../server/virtualhost/LinkRegistryModel.java | 24 +++-
.../qpid/server/protocol/v1_0/LinkImpl.java | 20 ++--
.../qpid/server/protocol/v1_0/LinkRegistry.java | 6 +-
.../server/protocol/v1_0/LinkRegistryImpl.java | 77 ++++++++-----
.../protocol/v1_0/SendingLinkEndpoint.java | 61 ++++------
.../management/amqp/ManagementAddressSpace.java | 10 +-
9 files changed, 235 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index a87e860..ba8542c9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
public interface NamedAddressSpace extends Named
{
@@ -62,9 +63,12 @@ public interface NamedAddressSpace extends Named
<T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
<T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+ @Deprecated
<T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
final Pattern linkNamePattern);
+ <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor);
+
boolean authoriseCreateConnection(AMQPConnection<?> connection);
DtxRegistry getDtxRegistry();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index 7efb208..cfadb6e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -164,6 +164,11 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
}
@Override
+ public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+ {
+ }
+
+ @Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
return false;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index b6b0bbd..de2b11e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1755,22 +1755,129 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- public <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName)
+ public <T extends LinkModel> T getSendingLink( String remoteContainerId, String linkName)
{
- return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
+ {
+ @Override
+ public ListenableFuture<T> execute()
+ {
+ return Futures.immediateFuture((T)_linkRegistry.getSendingLink(remoteContainerId, linkName));
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "getSendingLink";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
+ }
+ }));
}
@Override
public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
{
- return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+ return doSync(doOnConfigThread(new Task<ListenableFuture<T>, RuntimeException>()
+ {
+ @Override
+ public ListenableFuture<T> execute()
+ {
+ return Futures.immediateFuture((T)_linkRegistry.getReceivingLink(remoteContainerId, linkName));
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "getReceivingLink";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return String.format("remoteContainerId='%s', linkName='%s'", remoteContainerId, linkName);
+ }
+ }));
}
@Override
public <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
final Pattern linkNamePattern)
{
- return _linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern);
+ return doSync(doOnConfigThread(new Task<ListenableFuture<Collection<T>>, RuntimeException>()
+ {
+ @Override
+ public ListenableFuture<Collection<T>> execute()
+ {
+ return Futures.immediateFuture(_linkRegistry.findSendingLinks(containerIdPattern, linkNamePattern));
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "findSendingLinks";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return String.format("containerIdPattern='%s', linkNamePattern='%s'", containerIdPattern, linkNamePattern);
+ }
+ }));
+ }
+
+ @Override
+ public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+ {
+ doSync(doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>()
+ {
+ @Override
+ public ListenableFuture<Void> execute()
+ {
+ _linkRegistry.visitSendingLinks(visitor);
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "visitSendingLinks";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return String.format("visitor='%s'", visitor);
+ }
+ }));
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 6939510..e0af033 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -25,18 +25,32 @@ import java.util.regex.Pattern;
import org.apache.qpid.server.protocol.LinkModel;
-public interface LinkRegistryModel
+public interface LinkRegistryModel<T extends LinkModel>
{
- <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
- <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
- <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
- final Pattern linkNamePattern);
+ T getSendingLink(String remoteContainerId, String linkName);
+
+ T getReceivingLink(String remoteContainerId, String linkName);
+
+ @Deprecated
+ Collection<T> findSendingLinks(final Pattern containerIdPattern,
+ final Pattern linkNamePattern);
+
+ void visitSendingLinks(LinkVisitor<T> visitor);
+
void purgeSendingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
+
void purgeReceivingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
void open();
+
void close();
+
void delete();
Object dump();
+
+ interface LinkVisitor<T extends LinkModel>
+ {
+ boolean visit(T link);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
index 78d0bd2..692faa6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
@@ -56,7 +56,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
private final String _remoteContainerId;
private final String _linkName;
private final Role _role;
- private final LinkRegistry _linkRegistry;
+ private final LinkRegistry<S, T> _linkRegistry;
private final Queue<ThiefInformation> _thiefQueue = new LinkedList<>();
private volatile LinkEndpoint<S, T> _linkEndpoint;
@@ -65,7 +65,10 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
private boolean _stealingInProgress;
private final Queue<Action<? super Link_1_0<S, T>>> _deleteTasks = new ConcurrentLinkedQueue<>();
- public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry)
+ public LinkImpl(final String remoteContainerId,
+ final String linkName,
+ final Role role,
+ final LinkRegistry<S, T> linkRegistry)
{
_remoteContainerId = remoteContainerId;
_linkName = linkName;
@@ -73,7 +76,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
_linkRegistry = linkRegistry;
}
- public LinkImpl(final LinkDefinition<S, T> linkDefinition, final LinkRegistry linkRegistry)
+ public LinkImpl(final LinkDefinition<S, T> linkDefinition, final LinkRegistry<S, T> linkRegistry)
{
this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry);
setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
@@ -116,7 +119,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
}
@Override
- public void linkClosed()
+ public synchronized void linkClosed()
{
Iterator<Action<? super Link_1_0<S, T>>> iterator = _deleteTasks.iterator();
while (iterator.hasNext())
@@ -130,7 +133,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
}
@Override
- public void discardEndpoint()
+ public synchronized void discardEndpoint()
{
_linkEndpoint = null;
}
@@ -266,7 +269,7 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
private ListenableFuture<LinkEndpoint<S, T>> doStealLink(final Session_1_0 session, final Attach attach)
{
final SettableFuture<LinkEndpoint<S, T>> returnFuture = SettableFuture.create();
- LinkEndpoint<S, T> linkEndpoint = _linkEndpoint;
+ final LinkEndpoint<S, T> linkEndpoint = _linkEndpoint;
// check whether linkEndpoint has been closed in the mean time
if (linkEndpoint != null)
@@ -275,9 +278,10 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
() ->
{
// check whether linkEndpoint has been closed in the mean time
- if (_linkEndpoint != null)
+ LinkEndpoint<S, T> endpoint = _linkEndpoint;
+ if (endpoint != null)
{
- _linkEndpoint.close(new Error(LinkError.STOLEN,
+ endpoint.close(new Error(LinkError.STOLEN,
String.format("Link is being stolen by connection '%s'",
session.getConnection())));
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
index 26c82ea..d140a65 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
@@ -25,11 +25,11 @@ import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.virtualhost.LinkRegistryModel;
-public interface LinkRegistry extends LinkRegistryModel
+public interface LinkRegistry<S extends BaseSource, T extends BaseTarget> extends LinkRegistryModel<Link_1_0<S, T>>
{
- void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link);
+ void linkClosed(final Link_1_0<S, T> link);
- void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link);
+ void linkChanged(final Link_1_0<S, T> link);
TerminusDurability getHighestSupportedTerminusDurability();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index a48130d..ede4d5b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -45,11 +45,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-public class LinkRegistryImpl implements LinkRegistry
+public class LinkRegistryImpl<S extends BaseSource, T extends BaseTarget> implements LinkRegistry<S, T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(LinkRegistryImpl.class);
- private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _sendingLinkRegistry = new ConcurrentHashMap<>();
- private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _receivingLinkRegistry = new ConcurrentHashMap<>();
+ private final ConcurrentMap<LinkKey, Link_1_0<S, T>> _sendingLinkRegistry = new ConcurrentHashMap<>();
+ private final ConcurrentMap<LinkKey, Link_1_0<S, T>> _receivingLinkRegistry = new ConcurrentHashMap<>();
private final LinkStore _linkStore;
@@ -74,22 +74,21 @@ public class LinkRegistryImpl implements LinkRegistry
}
@Override
- public Link_1_0<? extends BaseSource, ? extends BaseTarget> getSendingLink(final String remoteContainerId, final String linkName)
+ public Link_1_0<S, T> getSendingLink(final String remoteContainerId, final String linkName)
{
return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
}
@Override
- public Link_1_0<? extends BaseSource, ? extends BaseTarget> getReceivingLink(final String remoteContainerId, final String linkName)
+ public Link_1_0<S, T> getReceivingLink(final String remoteContainerId, final String linkName)
{
return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
}
@Override
- public void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link)
+ public void linkClosed(final Link_1_0<S, T> link)
{
- ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry =
- getLinkRegistry(link.getRole());
+ ConcurrentMap<LinkKey, Link_1_0<S, T>> linkRegistry = getLinkRegistry(link.getRole());
linkRegistry.remove(new LinkKey(link));
if (isDurableLink(link))
{
@@ -98,7 +97,7 @@ public class LinkRegistryImpl implements LinkRegistry
}
@Override
- public void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link)
+ public void linkChanged(final Link_1_0<S,T> link)
{
getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link);
if (isDurableLink(link))
@@ -115,7 +114,7 @@ public class LinkRegistryImpl implements LinkRegistry
}
@Override
- public Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> findSendingLinks(final Pattern containerIdPattern,
+ public Collection<Link_1_0<S,T>> findSendingLinks(final Pattern containerIdPattern,
final Pattern linkNamePattern)
{
return _sendingLinkRegistry.entrySet()
@@ -126,6 +125,25 @@ public class LinkRegistryImpl implements LinkRegistry
.collect(Collectors.toList());
}
+
+ @Override
+ public void visitSendingLinks(final LinkVisitor<Link_1_0<S,T>> visitor)
+ {
+ visitLinks(_sendingLinkRegistry.values(), visitor);
+ }
+
+ private void visitLinks(final Collection<Link_1_0<S, T>> links,
+ final LinkVisitor<Link_1_0<S, T>> visitor)
+ {
+ for (Link_1_0<S, T> link : links)
+ {
+ if (visitor.visit(link))
+ {
+ break;
+ }
+ }
+ }
+
@Override
public void purgeSendingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
{
@@ -142,10 +160,11 @@ public class LinkRegistryImpl implements LinkRegistry
public void open()
{
Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
- for(LinkDefinition<? extends BaseSource, ? extends BaseTarget> link: links)
+ for(LinkDefinition<Source, Target> link: links)
{
- ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry = getLinkRegistry(link.getRole());
- linkRegistry.put(new LinkKey(link), new LinkImpl<>(link, this));
+ ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry = getLinkRegistry(link.getRole());
+ LinkDefinition<S, T> definition = (LinkDefinition<S, T>) link;
+ linkRegistry.put(new LinkKey(link), new LinkImpl<>(definition, this));
}
}
@@ -169,14 +188,14 @@ public class LinkRegistryImpl implements LinkRegistry
&& ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE);
}
- private Link_1_0<? extends BaseSource, ? extends BaseTarget> getLinkFromRegistry(final String remoteContainerId,
- final String linkName,
- final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
- final Role role)
+ private Link_1_0<S, T> getLinkFromRegistry(final String remoteContainerId,
+ final String linkName,
+ final ConcurrentMap<LinkKey, Link_1_0<S, T>> linkRegistry,
+ final Role role)
{
LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role);
- Link_1_0<? extends BaseSource, ? extends BaseTarget> newLink = new LinkImpl(remoteContainerId, linkName, role, this);
- Link_1_0<? extends BaseSource, ? extends BaseTarget> link = linkRegistry.putIfAbsent(linkKey, newLink);
+ Link_1_0<S, T> newLink = new LinkImpl<>(remoteContainerId, linkName, role, this);
+ Link_1_0<S, T> link = linkRegistry.putIfAbsent(linkKey, newLink);
if (link == null)
{
link = newLink;
@@ -184,7 +203,7 @@ public class LinkRegistryImpl implements LinkRegistry
return link;
}
- private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
+ private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry,
final Pattern containerIdPattern, final Pattern linkNamePattern)
{
linkRegistry.entrySet()
@@ -194,9 +213,9 @@ public class LinkRegistryImpl implements LinkRegistry
.forEach(e -> e.getValue().linkClosed());
}
- private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> getLinkRegistry(final Role role)
+ private ConcurrentMap<LinkKey, Link_1_0<S,T>> getLinkRegistry(final Role role)
{
- ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry;
+ ConcurrentMap<LinkKey, Link_1_0<S,T>> linkRegistry;
if (Role.SENDER == role)
{
linkRegistry = _sendingLinkRegistry;
@@ -223,10 +242,10 @@ public class LinkRegistryImpl implements LinkRegistry
return dump;
}
- private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> registry,
+ private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<S, T>> registry,
LinkRegistryDump dump)
{
- for (Map.Entry<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> entry : registry.entrySet())
+ for (Map.Entry<LinkKey, Link_1_0<S,T>> entry : registry.entrySet())
{
LinkKey linkKey = entry.getKey();
LinkRegistryDump.ContainerDump containerLinks =
@@ -246,9 +265,9 @@ public class LinkRegistryImpl implements LinkRegistry
}
}
- public static class LinkRegistryDump
+ static class LinkRegistryDump
{
- public static class ContainerDump
+ static class ContainerDump
{
public static class LinkDump
{
@@ -269,12 +288,12 @@ public class LinkRegistryImpl implements LinkRegistry
private Map<String, LinkDump> _sendingLinks = new LinkedHashMap<>();
private Map<String, LinkDump> _receivingLinks = new LinkedHashMap<>();
- public Map<String, LinkDump> getSendingLinks()
+ Map<String, LinkDump> getSendingLinks()
{
return Collections.unmodifiableMap(_sendingLinks);
}
- public Map<String, LinkDump> getReceivingLinks()
+ Map<String, LinkDump> getReceivingLinks()
{
return Collections.unmodifiableMap(_receivingLinks);
}
@@ -282,7 +301,7 @@ public class LinkRegistryImpl implements LinkRegistry
private Map<String, ContainerDump> _containers = new LinkedHashMap<>();
- public Map<String, ContainerDump> getContainers()
+ Map<String, ContainerDump> getContainers()
{
return Collections.unmodifiableMap(_containers);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index b3603d9..2fb7421 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -84,6 +84,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
@@ -391,27 +392,25 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
&& capabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& getLinkName().endsWith("|global"))
{
- NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
- Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> links =
- namedAddressSpace.findSendingLinks(ANY_CONTAINER_ID,
- Pattern.compile("^" + Pattern.quote(getLinkName()) + "$"));
- for (Link_1_0<? extends BaseSource, ? extends BaseTarget> link : links)
- {
- BaseSource baseSource = link.getSource();
- if (baseSource instanceof Source)
+ final NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
+ final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
+ namedAddressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
+ if (link.getSource() != null
+ && ANY_CONTAINER_ID.matcher(link.getRemoteContainerId()).matches()
+ && linkNamePattern.matcher(link.getName()).matches())
{
- Source linkSource = (Source) baseSource;
- source = new Source(linkSource);
- getLink().setSource(source);
- break;
+ getLink().setSource(new Source(link.getSource()));
+ return true;
}
- }
+ return false;
+ });
}
}
+ source = getSource();
if (source == null)
{
- throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
+ throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, "Link not found"));
}
attach.setSource(source);
@@ -861,18 +860,19 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
&& sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
{
- Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+ final Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
? ANY_CONTAINER_ID
: Pattern.compile("^" + Pattern.quote(getSession().getConnection().getRemoteContainerId()) + "$");
- Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");
- final Collection<LinkModel> links = addressSpace.findSendingLinks(containerIdPattern, linkNamePattern);
- for (LinkModel link : links)
- {
- if (link instanceof Link_1_0)
+ final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");
+
+ addressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
+ if (containerIdPattern.matcher(link.getRemoteContainerId()).matches()
+ && linkNamePattern.matcher(link.getName()).matches())
{
- ((Link_1_0) link).linkClosed();
+ link.linkClosed();
}
- }
+ return false;
+ });
}
}
catch (AccessControlException e)
@@ -918,23 +918,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
&& ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
&& sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
- && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
+ && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)
+ && !getLinkName().endsWith("|global"))
{
// For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
// However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
- if (!getLinkName().endsWith("|global"))
- {
- getLink().linkClosed();
- }
- else
- {
- Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
- final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
- if (links.size() > 1)
- {
- getLink().linkClosed();
- }
- }
+ getLink().linkClosed();
}
super.detach(error, close);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c862a92b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 4fc2508..379ec5e 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -241,13 +241,13 @@ public class ManagementAddressSpace implements NamedAddressSpace
@Override
public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
{
- return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ return (T)_linkRegistry.getSendingLink(remoteContainerId, linkName);
}
@Override
public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
{
- return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+ return (T)_linkRegistry.getReceivingLink(remoteContainerId, linkName);
}
@Override
@@ -258,6 +258,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
}
@Override
+ public <T extends LinkModel> void visitSendingLinks(final LinkRegistryModel.LinkVisitor<T> visitor)
+ {
+ _linkRegistry.visitSendingLinks(visitor);
+ }
+
+ @Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
_broker.authorise(Operation.PERFORM_ACTION("manage"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org