You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/31 12:18:50 UTC

qpid-broker-j git commit: QPID-7994: [Broker-J][AMQP 1.0] Allow link recovery of global durable shared subscriptions

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 319edfe69 -> 9502604d8


QPID-7994: [Broker-J][AMQP 1.0] Allow link recovery of global durable shared subscriptions


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/9502604d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/9502604d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/9502604d

Branch: refs/heads/master
Commit: 9502604d8c36eee079507da52e0e91b3c5c2e2d4
Parents: 319edfe
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Oct 30 16:26:39 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Oct 31 12:17:48 2017 +0000

----------------------------------------------------------------------
 .../qpid/server/model/NamedAddressSpace.java    |  1 +
 ...stractNonConnectionAcceptingVirtualHost.java |  7 ++++
 .../server/virtualhost/AbstractVirtualHost.java |  6 ++++
 .../server/virtualhost/LinkRegistryModel.java   |  3 ++
 .../server/protocol/v1_0/LinkRegistryImpl.java  | 11 ++++++
 .../protocol/v1_0/SendingLinkEndpoint.java      | 35 ++++++++++++++++++--
 .../management/amqp/ManagementAddressSpace.java |  6 ++++
 test-profiles/Java10Excludes                    |  3 --
 8 files changed, 67 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index 8a1785d..081ae6b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -61,6 +61,7 @@ public interface NamedAddressSpace extends Named
 
     <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
     <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+    <T extends LinkModel> Collection<T> findSendingLinks(String linkName);
 
     boolean authoriseCreateConnection(AMQPConnection<?> connection);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index ed251f1..c9f35ef 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -155,6 +155,13 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
     }
 
     @Override
+    public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+    {
+        throwUnsupported();
+        return null;
+    }
+
+    @Override
     public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         return false;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index d6bc757..7eb9f99 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1724,6 +1724,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+    {
+        return _linkRegistry.findSendingLinks(linkName);
+    }
+
+    @Override
     public DtxRegistry getDtxRegistry()
     {
         return _dtxRegistry;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 1610cd0..14dbd7e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.Collection;
+
 import org.apache.qpid.server.protocol.LinkModel;
 
 public interface LinkRegistryModel
 {
     <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
     <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+    <T extends LinkModel> Collection<T> findSendingLinks(String linkName);
 
     void open();
     void close();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index caef52f..836418d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,6 +111,16 @@ public class LinkRegistryImpl implements LinkRegistry
     }
 
     @Override
+    public Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> findSendingLinks(final String linkName)
+    {
+        return _sendingLinkRegistry.entrySet()
+                                   .stream()
+                                   .filter(e -> e.getKey().getLinkName().equals(linkName))
+                                   .map(Map.Entry::getValue)
+                                   .collect(Collectors.toList());
+    }
+
+    @Override
     public void open()
     {
         Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index ec6939b..743715a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -46,6 +48,8 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -355,12 +359,39 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     @Override
     protected void recoverLink(final Attach attach) throws AmqpErrorException
     {
-        if (getSource() == null)
+        Source source = getSource();
+        if (source == null && attach.getDesiredCapabilities() != null)
+        {
+            List<Symbol> capabilities = Arrays.asList(attach.getDesiredCapabilities());
+            if (capabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+                && capabilities.contains(Session_1_0.SHARED_CAPABILITY)
+                && getLinkName().endsWith("|global"))
+            {
+                NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
+                Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>>
+                        links = namedAddressSpace.findSendingLinks(getLinkName());
+                for (Link_1_0<? extends BaseSource, ? extends BaseTarget> link : links)
+                {
+                    if (link.getSource() != null)
+                    {
+                        BaseSource baseSource = link.getSource();
+                        if (baseSource instanceof Source)
+                        {
+                            source = ((Source) baseSource);
+                            getLink().setSource(source);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if (source == null)
         {
             throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
         }
 
-        attach.setSource(getSource());
+        attach.setSource(source);
         receiveAttach(attach);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index ee61dcb..30ecd2a 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -250,6 +250,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
     }
 
     @Override
+    public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+    {
+        return _linkRegistry.findSendingLinks(linkName);
+    }
+
+    @Override
     public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
     {
         _broker.authorise(Operation.PERFORM_ACTION("manage"));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index d94c72c..be5b6df 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -219,6 +219,3 @@ org.apache.qpid.server.security.acl.Amqp0xMessagingACLTest#*
 // run as system as so avoid the ACL check.
 org.apache.qpid.server.security.acl.MessagingACLTest#testCreateTemporaryQueueFailed
 
-
-// QPID-7994 : Temporary excluded due to a bug
-org.apache.qpid.systests.jms_2_0.subscription.SharedSubscriptionTest#testUnsubscribeForGlobalSharedDurableSubscription


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org