You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/01 10:34:31 UTC
qpid-broker-j git commit: QPID-7992: [Broker-J] Add auxiliary
operations to purge and dump link registry on VirtualHost
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 73c494fbd -> 0ca7b8af8
QPID-7992: [Broker-J] Add auxiliary operations to purge and dump link registry on VirtualHost
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0ca7b8af
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0ca7b8af
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0ca7b8af
Branch: refs/heads/master
Commit: 0ca7b8af870dabfcf23c259ad70391a3edd6b61d
Parents: 73c494f
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Oct 31 13:11:03 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 1 10:32:13 2017 +0000
----------------------------------------------------------------------
.../server/virtualhost/AbstractVirtualHost.java | 126 +++++++++++++++++++
.../server/virtualhost/LinkRegistryModel.java | 4 +
.../virtualhost/QueueManagingVirtualHost.java | 10 ++
.../server/protocol/v1_0/LinkRegistryImpl.java | 98 +++++++++++++++
.../server/protocol/v1_0/LinkRegistryTest.java | 83 ++++++++++++
5 files changed, 321 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7263ce5..c2f53cc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2877,6 +2877,132 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ @Override
+ public Object dumpLinkRegistry()
+ {
+ return doSync(doOnConfigThread(new Task<ListenableFuture<Object>, IOException>()
+ {
+ @Override
+ public ListenableFuture<Object> execute() throws IOException
+ {
+ Object dump;
+ if (getState() == State.STOPPED)
+ {
+ _messageStore.openMessageStore(AbstractVirtualHost.this);
+ try
+ {
+ _linkRegistry.open();
+ try
+ {
+ dump = _linkRegistry.dump();
+ }
+ finally
+ {
+ _linkRegistry.close();
+ }
+ }
+ finally
+ {
+ _messageStore.closeMessageStore();
+ }
+ }
+ else if (getState() == State.ACTIVE)
+ {
+ dump = _linkRegistry.dump();
+ }
+ else
+ {
+ throw new IllegalStateException("The dumpLinkRegistry operation can only be called when the virtual host is active or stopped.");
+ }
+ return Futures.immediateFuture(dump);
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "dumpLinkRegistry";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return null;
+ }
+ }));
+ }
+
+ @Override
+ public void purgeLinkRegistry(final String containerIdPatternString, final String role, final String linkNamePatternString)
+ {
+ doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
+ {
+ @Override
+ public ListenableFuture<Void> execute() throws IOException
+ {
+ if (getState() != State.STOPPED)
+ {
+ throw new IllegalArgumentException(
+ "The purgeLinkRegistry operation can only be called when the virtual host is stopped.");
+ }
+ Pattern containerIdPattern = Pattern.compile(containerIdPatternString);
+ Pattern linkNamePattern = Pattern.compile(linkNamePatternString);
+
+ _messageStore.openMessageStore(AbstractVirtualHost.this);
+ try
+ {
+ _linkRegistry.open();
+ try
+ {
+ if ("SENDER".equals(role) || "BOTH".equals(role))
+ {
+ _linkRegistry.purgeSendingLinks(containerIdPattern, linkNamePattern);
+ }
+ if ("RECEIVER".equals(role) || "BOTH".equals(role))
+ {
+ _linkRegistry.purgeReceivingLinks(containerIdPattern, linkNamePattern);
+ }
+ return Futures.immediateFuture(null);
+ }
+ finally
+ {
+ _linkRegistry.close();
+ }
+ }
+ finally
+ {
+ _messageStore.closeMessageStore();
+ }
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractVirtualHost.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "purgeLinkRegistry";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return String.format("containerIdPattern='%s',role='%s',linkNamePattern='%s'",
+ containerIdPatternString,
+ role,
+ linkNamePatternString);
+ }
+ }));
+ }
+
private boolean hasDifferentBindings(final Exchange<?> exchange,
final Queue queue,
final Map<String, Map<String,Object>> bindings)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 51402b8..6939510 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -31,8 +31,12 @@ public interface LinkRegistryModel
<T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
<T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
final Pattern linkNamePattern);
+ void purgeSendingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
+ void purgeReceivingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
void open();
void close();
void delete();
+
+ Object dump();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 8ca90f9..ba85821 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -291,6 +291,16 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
skipAclCheck = true)
SocketConnectionMetaData getConnectionMetaData();
+ @SuppressWarnings("unused")
+ @ManagedOperation(nonModifying = true, description = "Dumps link registry", changesConfiguredObjectState = false)
+ Object dumpLinkRegistry();
+
+ @SuppressWarnings("unused")
+ @ManagedOperation(description = "Removes links with the given name and containerId pattern from the link registry.", changesConfiguredObjectState = false)
+ void purgeLinkRegistry(@Param(name = "containerIdPattern", description = "Regular Expression to match the remote container id.", defaultValue = ".*") String containerIdPattern,
+ @Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role,
+ @Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern);
+
Queue<?> getSubscriptionQueue(final String exchangeName,
final Map<String, Object> attributes,
final Map<String, Map<String, Object>> bindings);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index dd233c0..a48130d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v1_0;
import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -125,6 +127,18 @@ public class LinkRegistryImpl implements LinkRegistry
}
@Override
+ public void purgeSendingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
+ {
+ purgeLinks(_sendingLinkRegistry, containerIdPattern, linkNamePattern);
+ }
+
+ @Override
+ public void purgeReceivingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
+ {
+ purgeLinks(_receivingLinkRegistry, containerIdPattern, linkNamePattern);
+ }
+
+ @Override
public void open()
{
Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
@@ -170,6 +184,16 @@ public class LinkRegistryImpl implements LinkRegistry
return link;
}
+ private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
+ final Pattern containerIdPattern, final Pattern linkNamePattern)
+ {
+ linkRegistry.entrySet()
+ .stream()
+ .filter(e -> containerIdPattern.matcher(e.getKey().getRemoteContainerId()).matches()
+ && linkNamePattern.matcher(e.getKey().getLinkName()).matches())
+ .forEach(e -> e.getValue().linkClosed());
+ }
+
private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> getLinkRegistry(final Role role)
{
ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry;
@@ -189,4 +213,78 @@ public class LinkRegistryImpl implements LinkRegistry
return linkRegistry;
}
+
+ @Override
+ public LinkRegistryDump dump()
+ {
+ LinkRegistryDump dump = new LinkRegistryDump();
+ dumpRegistry(_sendingLinkRegistry, dump);
+ dumpRegistry(_receivingLinkRegistry, dump);
+ return dump;
+ }
+
+ private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> registry,
+ LinkRegistryDump dump)
+ {
+ for (Map.Entry<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> entry : registry.entrySet())
+ {
+ LinkKey linkKey = entry.getKey();
+ LinkRegistryDump.ContainerDump containerLinks =
+ dump._containers.computeIfAbsent(linkKey.getRemoteContainerId(), k -> new LinkRegistryDump.ContainerDump());
+
+ LinkRegistryDump.ContainerDump.LinkDump linkDump = new LinkRegistryDump.ContainerDump.LinkDump();
+ linkDump._source = String.valueOf(entry.getValue().getSource());
+ linkDump._target = String.valueOf(entry.getValue().getTarget());
+ if (linkKey.getRole().equals(Role.SENDER))
+ {
+ containerLinks._sendingLinks.put(linkKey.getLinkName(), linkDump);
+ }
+ else
+ {
+ containerLinks._receivingLinks.put(linkKey.getLinkName(), linkDump);
+ }
+ }
+ }
+
+ public static class LinkRegistryDump
+ {
+ public static class ContainerDump
+ {
+ public static class LinkDump
+ {
+ private String _source;
+ private String _target;
+
+ public String getSource()
+ {
+ return _source;
+ }
+
+ public String getTarget()
+ {
+ return _target;
+ }
+ }
+
+ private Map<String, LinkDump> _sendingLinks = new LinkedHashMap<>();
+ private Map<String, LinkDump> _receivingLinks = new LinkedHashMap<>();
+
+ public Map<String, LinkDump> getSendingLinks()
+ {
+ return Collections.unmodifiableMap(_sendingLinks);
+ }
+
+ public Map<String, LinkDump> getReceivingLinks()
+ {
+ return Collections.unmodifiableMap(_receivingLinks);
+ }
+ }
+
+ private Map<String, ContainerDump> _containers = new LinkedHashMap<>();
+
+ public Map<String, ContainerDump> getContainers()
+ {
+ return Collections.unmodifiableMap(_containers);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
index e61e124..b5832c4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
@@ -21,12 +21,15 @@ package org.apache.qpid.server.protocol.v1_0;
import static org.mockito.Mockito.mock;
+import java.util.regex.Pattern;
+
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
public class LinkRegistryTest extends QpidTestCase
{
+ public static final Pattern ANY = Pattern.compile(".*");
private LinkRegistryImpl _linkRegistry;
@Override
@@ -58,4 +61,84 @@ public class LinkRegistryTest extends QpidTestCase
assertNotNull("LinkRegistryModel#getReceivingLink should always return an object", link2);
assertSame("Two calls to LinkRegistryModel#getReceivingLink should return the same object", link, link2);
}
+
+ public void testPurgeSendingLinksFromRegistryWithEmptyRegistry() throws Exception
+ {
+ _linkRegistry.purgeSendingLinks(ANY, ANY);
+ }
+
+ public void testPurgeSendingLinksExactMatch() throws Exception
+ {
+ _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeSendingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 0);
+ }
+
+ public void testPurgeSendingLinksRegEx() throws Exception
+ {
+ _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeSendingLinks(Pattern.compile("test.*Id"), Pattern.compile("testLink.*"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 0);
+ }
+
+ public void testPurgeSendingLinksNotMatchingRegEx() throws Exception
+ {
+ _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeSendingLinks(Pattern.compile("Foo.*"), Pattern.compile(".*bar"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 1);
+ }
+
+ public void testPurgeSendingLinksDoesNotRemoveReceivingLink() throws Exception
+ {
+ _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+ _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeSendingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 1);
+ assertEquals(dump.getContainers().get("testContainerId").getReceivingLinks().size(), 1);
+ assertEquals(dump.getContainers().get("testContainerId").getSendingLinks().size(), 0);
+ }
+
+ public void testPurgeReceivingLinksFromRegistryWithEmptyRegistry() throws Exception
+ {
+ _linkRegistry.purgeReceivingLinks(ANY, ANY);
+ }
+
+ public void testPurgeReceivingLinksExactMatch() throws Exception
+ {
+ _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeReceivingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 0);
+ }
+
+ public void testPurgeReceivingLinksRegEx() throws Exception
+ {
+ _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeReceivingLinks(Pattern.compile("test.*Id"), Pattern.compile("testLink.*"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 0);
+ }
+
+ public void testPurgeReceivingLinksNotMatchingRegEx() throws Exception
+ {
+ _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeReceivingLinks(Pattern.compile("Foo.*"), Pattern.compile(".*bar"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 1);
+ }
+
+ public void testPurgeReceivingLinksDoesNotRemoveSendingLink() throws Exception
+ {
+ _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+ _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+ _linkRegistry.purgeReceivingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+ LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+ assertEquals(dump.getContainers().size(), 1);
+ assertEquals(dump.getContainers().get("testContainerId").getReceivingLinks().size(), 0);
+ assertEquals(dump.getContainers().get("testContainerId").getSendingLinks().size(), 1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org