You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/31 12:18:50 UTC
qpid-broker-j git commit: QPID-7994: [Broker-J][AMQP 1.0] Allow link
recovery of global durable shared subscriptions
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 319edfe69 -> 9502604d8
QPID-7994: [Broker-J][AMQP 1.0] Allow link recovery of global durable shared subscriptions
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/9502604d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/9502604d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/9502604d
Branch: refs/heads/master
Commit: 9502604d8c36eee079507da52e0e91b3c5c2e2d4
Parents: 319edfe
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Oct 30 16:26:39 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Oct 31 12:17:48 2017 +0000
----------------------------------------------------------------------
.../qpid/server/model/NamedAddressSpace.java | 1 +
...stractNonConnectionAcceptingVirtualHost.java | 7 ++++
.../server/virtualhost/AbstractVirtualHost.java | 6 ++++
.../server/virtualhost/LinkRegistryModel.java | 3 ++
.../server/protocol/v1_0/LinkRegistryImpl.java | 11 ++++++
.../protocol/v1_0/SendingLinkEndpoint.java | 35 ++++++++++++++++++--
.../management/amqp/ManagementAddressSpace.java | 6 ++++
test-profiles/Java10Excludes | 3 --
8 files changed, 67 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index 8a1785d..081ae6b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -61,6 +61,7 @@ public interface NamedAddressSpace extends Named
<T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
<T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+ <T extends LinkModel> Collection<T> findSendingLinks(String linkName);
boolean authoriseCreateConnection(AMQPConnection<?> connection);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index ed251f1..c9f35ef 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -155,6 +155,13 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
}
@Override
+ public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+ {
+ throwUnsupported();
+ return null;
+ }
+
+ @Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
return false;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index d6bc757..7eb9f99 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1724,6 +1724,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+ {
+ return _linkRegistry.findSendingLinks(linkName);
+ }
+
+ @Override
public DtxRegistry getDtxRegistry()
{
return _dtxRegistry;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
index 1610cd0..14dbd7e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collection;
+
import org.apache.qpid.server.protocol.LinkModel;
public interface LinkRegistryModel
{
<T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
<T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+ <T extends LinkModel> Collection<T> findSendingLinks(String linkName);
void open();
void close();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
index caef52f..836418d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +111,16 @@ public class LinkRegistryImpl implements LinkRegistry
}
@Override
+ public Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>> findSendingLinks(final String linkName)
+ {
+ return _sendingLinkRegistry.entrySet()
+ .stream()
+ .filter(e -> e.getKey().getLinkName().equals(linkName))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ @Override
public void open()
{
Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index ec6939b..743715a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -46,6 +48,8 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -355,12 +359,39 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
@Override
protected void recoverLink(final Attach attach) throws AmqpErrorException
{
- if (getSource() == null)
+ Source source = getSource();
+ if (source == null && attach.getDesiredCapabilities() != null)
+ {
+ List<Symbol> capabilities = Arrays.asList(attach.getDesiredCapabilities());
+ if (capabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+ && capabilities.contains(Session_1_0.SHARED_CAPABILITY)
+ && getLinkName().endsWith("|global"))
+ {
+ NamedAddressSpace namedAddressSpace = getSession().getConnection().getAddressSpace();
+ Collection<Link_1_0<? extends BaseSource, ? extends BaseTarget>>
+ links = namedAddressSpace.findSendingLinks(getLinkName());
+ for (Link_1_0<? extends BaseSource, ? extends BaseTarget> link : links)
+ {
+ if (link.getSource() != null)
+ {
+ BaseSource baseSource = link.getSource();
+ if (baseSource instanceof Source)
+ {
+ source = ((Source) baseSource);
+ getLink().setSource(source);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (source == null)
{
throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
}
- attach.setSource(getSource());
+ attach.setSource(source);
receiveAttach(attach);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index ee61dcb..30ecd2a 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -250,6 +250,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
}
@Override
+ public <T extends LinkModel> Collection<T> findSendingLinks(final String linkName)
+ {
+ return _linkRegistry.findSendingLinks(linkName);
+ }
+
+ @Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
_broker.authorise(Operation.PERFORM_ACTION("manage"));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9502604d/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index d94c72c..be5b6df 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -219,6 +219,3 @@ org.apache.qpid.server.security.acl.Amqp0xMessagingACLTest#*
// run as system as so avoid the ACL check.
org.apache.qpid.server.security.acl.MessagingACLTest#testCreateTemporaryQueueFailed
-
-// QPID-7994 : Temporary excluded due to a bug
-org.apache.qpid.systests.jms_2_0.subscription.SharedSubscriptionTest#testUnsubscribeForGlobalSharedDurableSubscription
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org