You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/01 10:34:31 UTC

qpid-broker-j git commit: QPID-7992: [Broker-J] Add auxiliary operations to purge and dump link registry on VirtualHost

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 73c494fbd -> 0ca7b8af8


QPID-7992: [Broker-J] Add auxiliary operations to purge and dump link registry on VirtualHost


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0ca7b8af
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0ca7b8af
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0ca7b8af

Branch: refs/heads/master
Commit: 0ca7b8af870dabfcf23c259ad70391a3edd6b61d
Parents: 73c494f
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Oct 31 13:11:03 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 1 10:32:13 2017 +0000

----------------------------------------------------------------------
 .../server/virtualhost/AbstractVirtualHost.java | 126 +++++++++++++++++++
 .../server/virtualhost/LinkRegistryModel.java   |   4 +
 .../virtualhost/QueueManagingVirtualHost.java   |  10 ++
 .../server/protocol/v1_0/LinkRegistryImpl.java  |  98 +++++++++++++++
 .../server/protocol/v1_0/LinkRegistryTest.java  |  83 ++++++++++++
 5 files changed, 321 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7263ce5..c2f53cc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2877,6 +2877,132 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
     }
 
+    @Override
+    public Object dumpLinkRegistry()
+    {
+        return doSync(doOnConfigThread(new Task<ListenableFuture<Object>, IOException>()
+        {
+            @Override
+            public ListenableFuture<Object> execute() throws IOException
+            {
+                Object dump;
+                if (getState() == State.STOPPED)
+                {
+                    _messageStore.openMessageStore(AbstractVirtualHost.this);
+                    try
+                    {
+                        _linkRegistry.open();
+                        try
+                        {
+                            dump = _linkRegistry.dump();
+                        }
+                        finally
+                        {
+                            _linkRegistry.close();
+                        }
+                    }
+                    finally
+                    {
+                        _messageStore.closeMessageStore();
+                    }
+                }
+                else if (getState() == State.ACTIVE)
+                {
+                    dump = _linkRegistry.dump();
+                }
+                else
+                {
+                    throw new IllegalStateException("The dumpLinkRegistry operation can only be called when the virtual host is active or stopped.");
+                }
+                return Futures.immediateFuture(dump);
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "dumpLinkRegistry";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return null;
+            }
+        }));
+    }
+
+    @Override
+    public void purgeLinkRegistry(final String containerIdPatternString, final String role, final String linkNamePatternString)
+    {
+        doSync(doOnConfigThread(new Task<ListenableFuture<Void>, IOException>()
+        {
+            @Override
+            public ListenableFuture<Void> execute() throws IOException
+            {
+                if (getState() != State.STOPPED)
+                {
+                    throw new IllegalArgumentException(
+                            "The purgeLinkRegistry operation can only be called when the virtual host is stopped.");
+                }
+                Pattern containerIdPattern = Pattern.compile(containerIdPatternString);
+                Pattern linkNamePattern = Pattern.compile(linkNamePatternString);
+
+                _messageStore.openMessageStore(AbstractVirtualHost.this);
+                try
+                {
+                    _linkRegistry.open();
+                    try
+                    {
+                        if ("SENDER".equals(role) || "BOTH".equals(role))
+                        {
+                            _linkRegistry.purgeSendingLinks(containerIdPattern, linkNamePattern);
+                        }
+                        if ("RECEIVER".equals(role) || "BOTH".equals(role))
+                        {
+                            _linkRegistry.purgeReceivingLinks(containerIdPattern, linkNamePattern);
+                        }
+                        return Futures.immediateFuture(null);
+                    }
+                    finally
+                    {
+                        _linkRegistry.close();
+                    }
+                }
+                finally
+                {
+                    _messageStore.closeMessageStore();
+                }
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractVirtualHost.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "purgeLinkRegistry";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return String.format("containerIdPattern='%s',role='%s',linkNamePattern='%s'",
+                                     containerIdPatternString,
+                                     role,
+                                     linkNamePatternString);
+            }
+        }));
+    }
+
     private boolean hasDifferentBindings(final Exchange<?> exchange,
                                          final Queue queue,
                                          final Map<String, Map<String,Object>> bindings)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 51402b8..6939510 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -31,8 +31,12 @@ public interface LinkRegistryModel
     <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
     <T extends LinkModel> Collection<T> findSendingLinks(final Pattern containerIdPattern,
                                                          final Pattern linkNamePattern);
+    void purgeSendingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
+    void purgeReceivingLinks(Pattern containerIdPattern, Pattern linkNamePattern);
 
     void open();
     void close();
     void delete();
+
+    Object dump();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 8ca90f9..ba85821 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -291,6 +291,16 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
             skipAclCheck = true)
     SocketConnectionMetaData getConnectionMetaData();
 
+    @SuppressWarnings("unused")
+    @ManagedOperation(nonModifying = true, description = "Dumps link registry", changesConfiguredObjectState = false)
+    Object dumpLinkRegistry();
+
+    @SuppressWarnings("unused")
+    @ManagedOperation(description = "Removes links with the given name and containerId pattern from the link registry.", changesConfiguredObjectState = false)
+    void purgeLinkRegistry(@Param(name = "containerIdPattern", description = "Regular Expression to match the remote container id.", defaultValue = ".*") String containerIdPattern,
+                           @Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role,
+                           @Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern);
+
     Queue<?> getSubscriptionQueue(final String exchangeName,
                                   final Map<String, Object> attributes,
                                   final Map<String, Map<String, Object>> bindings);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index dd233c0..a48130d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -125,6 +127,18 @@ public class LinkRegistryImpl implements LinkRegistry
     }
 
     @Override
+    public void purgeSendingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
+    {
+        purgeLinks(_sendingLinkRegistry, containerIdPattern, linkNamePattern);
+    }
+
+    @Override
+    public void purgeReceivingLinks(final Pattern containerIdPattern, final Pattern linkNamePattern)
+    {
+        purgeLinks(_receivingLinkRegistry, containerIdPattern, linkNamePattern);
+    }
+
+    @Override
     public void open()
     {
         Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
@@ -170,6 +184,16 @@ public class LinkRegistryImpl implements LinkRegistry
         return link;
     }
 
+    private void purgeLinks(final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry,
+                            final Pattern containerIdPattern, final Pattern linkNamePattern)
+    {
+        linkRegistry.entrySet()
+                    .stream()
+                    .filter(e -> containerIdPattern.matcher(e.getKey().getRemoteContainerId()).matches()
+                                 && linkNamePattern.matcher(e.getKey().getLinkName()).matches())
+                    .forEach(e -> e.getValue().linkClosed());
+    }
+
     private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> getLinkRegistry(final Role role)
     {
         ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry;
@@ -189,4 +213,78 @@ public class LinkRegistryImpl implements LinkRegistry
 
         return linkRegistry;
     }
+
+    @Override
+    public LinkRegistryDump dump()
+    {
+        LinkRegistryDump dump = new LinkRegistryDump();
+        dumpRegistry(_sendingLinkRegistry, dump);
+        dumpRegistry(_receivingLinkRegistry, dump);
+        return dump;
+    }
+
+    private void dumpRegistry(ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> registry,
+                              LinkRegistryDump dump)
+    {
+        for (Map.Entry<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> entry : registry.entrySet())
+        {
+            LinkKey linkKey = entry.getKey();
+            LinkRegistryDump.ContainerDump containerLinks =
+                    dump._containers.computeIfAbsent(linkKey.getRemoteContainerId(), k -> new LinkRegistryDump.ContainerDump());
+
+            LinkRegistryDump.ContainerDump.LinkDump linkDump = new LinkRegistryDump.ContainerDump.LinkDump();
+            linkDump._source = String.valueOf(entry.getValue().getSource());
+            linkDump._target = String.valueOf(entry.getValue().getTarget());
+            if (linkKey.getRole().equals(Role.SENDER))
+            {
+                containerLinks._sendingLinks.put(linkKey.getLinkName(), linkDump);
+            }
+            else
+            {
+                containerLinks._receivingLinks.put(linkKey.getLinkName(), linkDump);
+            }
+        }
+    }
+
+    public static class LinkRegistryDump
+    {
+        public static class ContainerDump
+        {
+            public static class LinkDump
+            {
+                private String _source;
+                private String _target;
+
+                public String getSource()
+                {
+                    return _source;
+                }
+
+                public String getTarget()
+                {
+                    return _target;
+                }
+            }
+
+            private Map<String, LinkDump> _sendingLinks = new LinkedHashMap<>();
+            private Map<String, LinkDump> _receivingLinks = new LinkedHashMap<>();
+
+            public Map<String, LinkDump> getSendingLinks()
+            {
+                return Collections.unmodifiableMap(_sendingLinks);
+            }
+
+            public Map<String, LinkDump> getReceivingLinks()
+            {
+                return Collections.unmodifiableMap(_receivingLinks);
+            }
+        }
+
+        private Map<String, ContainerDump> _containers = new LinkedHashMap<>();
+
+        public Map<String, ContainerDump> getContainers()
+        {
+            return Collections.unmodifiableMap(_containers);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0ca7b8af/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
index e61e124..b5832c4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
@@ -21,12 +21,15 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import static org.mockito.Mockito.mock;
 
+import java.util.regex.Pattern;
+
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class LinkRegistryTest extends QpidTestCase
 {
+    public static final Pattern ANY = Pattern.compile(".*");
     private LinkRegistryImpl _linkRegistry;
 
     @Override
@@ -58,4 +61,84 @@ public class LinkRegistryTest extends QpidTestCase
         assertNotNull("LinkRegistryModel#getReceivingLink should always return an object", link2);
         assertSame("Two calls to LinkRegistryModel#getReceivingLink should return the same object", link, link2);
     }
+
+    public void testPurgeSendingLinksFromRegistryWithEmptyRegistry() throws Exception
+    {
+        _linkRegistry.purgeSendingLinks(ANY, ANY);
+    }
+
+    public void testPurgeSendingLinksExactMatch() throws Exception
+    {
+        _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeSendingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 0);
+    }
+
+    public void testPurgeSendingLinksRegEx() throws Exception
+    {
+        _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeSendingLinks(Pattern.compile("test.*Id"), Pattern.compile("testLink.*"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 0);
+    }
+
+    public void testPurgeSendingLinksNotMatchingRegEx() throws Exception
+    {
+        _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeSendingLinks(Pattern.compile("Foo.*"), Pattern.compile(".*bar"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 1);
+    }
+
+    public void testPurgeSendingLinksDoesNotRemoveReceivingLink() throws Exception
+    {
+        _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+        _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeSendingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 1);
+        assertEquals(dump.getContainers().get("testContainerId").getReceivingLinks().size(), 1);
+        assertEquals(dump.getContainers().get("testContainerId").getSendingLinks().size(), 0);
+    }
+
+    public void testPurgeReceivingLinksFromRegistryWithEmptyRegistry() throws Exception
+    {
+        _linkRegistry.purgeReceivingLinks(ANY, ANY);
+    }
+
+    public void testPurgeReceivingLinksExactMatch() throws Exception
+    {
+        _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeReceivingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 0);
+    }
+
+    public void testPurgeReceivingLinksRegEx() throws Exception
+    {
+        _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeReceivingLinks(Pattern.compile("test.*Id"), Pattern.compile("testLink.*"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 0);
+    }
+
+    public void testPurgeReceivingLinksNotMatchingRegEx() throws Exception
+    {
+        _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeReceivingLinks(Pattern.compile("Foo.*"), Pattern.compile(".*bar"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 1);
+    }
+
+    public void testPurgeReceivingLinksDoesNotRemoveSendingLink() throws Exception
+    {
+        _linkRegistry.getSendingLink("testContainerId", "testLinkName");
+        _linkRegistry.getReceivingLink("testContainerId", "testLinkName");
+        _linkRegistry.purgeReceivingLinks(Pattern.compile("testContainerId"), Pattern.compile("testLinkName"));
+        LinkRegistryImpl.LinkRegistryDump dump = _linkRegistry.dump();
+        assertEquals(dump.getContainers().size(), 1);
+        assertEquals(dump.getContainers().get("testContainerId").getReceivingLinks().size(), 0);
+        assertEquals(dump.getContainers().get("testContainerId").getSendingLinks().size(), 1);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org