You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/11/07 11:27:59 UTC
[1/2] qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] By
default, allow global shared subscriptions but discard their links on detach
Repository: qpid-broker-j
Updated Branches:
refs/heads/7.0.x a2dec7573 -> 0b991f4a9
QPID-7998: [Broker-J][AMQP 1.0] By default, allow global shared subscriptions but discard their links on detach
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3651583a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3651583a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3651583a
Branch: refs/heads/7.0.x
Commit: 3651583a9474a7d7997260897a95ec1432b46956
Parents: a2dec75
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Nov 6 13:58:45 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Nov 7 10:41:00 2017 +0000
----------------------------------------------------------------------
.../server/virtualhost/AbstractVirtualHost.java | 8 +++---
.../virtualhost/QueueManagingVirtualHost.java | 12 ++++-----
.../v1_0/ExchangeSendingDestination.java | 8 ------
.../protocol/v1_0/SendingLinkEndpoint.java | 28 +++++++++++++++++---
.../server/protocol/v1_0/Session_1_0Test.java | 1 -
.../subscription/SharedSubscriptionTest.java | 3 ---
6 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 9e2889e..e4ca989 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -274,7 +274,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
private PreferenceStore _preferenceStore;
private long _flowToDiskCheckPeriod;
- private volatile boolean _isGlobalSharedDurableSubscriptionDisabled;
+ private volatile boolean _isDiscardGlobalSharedSubscriptionLinksOnDetach;
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -597,7 +597,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
_flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
- _isGlobalSharedDurableSubscriptionDisabled = getContextValue(Boolean.class, GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED);
+ _isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);
QpidServiceLoader serviceLoader = new QpidServiceLoader();
for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -2188,9 +2188,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- public boolean isGlobalSharedDurableSubscriptionDisabled()
+ public boolean isDiscardGlobalSharedSubscriptionLinksOnDetach()
{
- return _isGlobalSharedDurableSubscriptionDisabled;
+ return _isDiscardGlobalSharedSubscriptionLinksOnDetach;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index c4418c0..b30373a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -96,12 +96,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
@ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD)
int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
-
- String GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = "qpid.feature.disabled:globalSharedDurableSubscription";
+ String DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = "qpid.jms.discardGlobalSharedSubscriptionLinksOnDetach";
@SuppressWarnings("unused")
- @ManagedContextDefault(name = GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
- description = "Flag to disable global shared durable subscriptions.")
- boolean DEFAULT_GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = true;
+ @ManagedContextDefault(name = DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH,
+ description = "If true AMQP 1.0 links of global shared subscriptions are discarded when the"
+ + " link detaches. This is to avoid leaking links with the Qpid JMS client.")
+ boolean DEFAULT_DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = true;
@ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.")
int getStatisticsReportingPeriod();
@@ -148,7 +148,7 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
long getFlowToDiskCheckPeriod();
@DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled")
- boolean isGlobalSharedDurableSubscriptionDisabled();
+ boolean isDiscardGlobalSharedSubscriptionLinksOnDetach();
String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index a4a1de3..8f84530 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -144,7 +144,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination
{
boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
- boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY);
QueueManagingVirtualHost virtualHost;
if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
@@ -157,13 +156,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination
"Address space of unexpected type"));
}
- if (isDurable && isShared && isGlobal && virtualHost.isGlobalSharedDurableSubscriptionDisabled())
- {
- throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED,
- "Support for global shared durable subscription is disabled."));
- }
-
-
Queue<?> queue;
final Map<String, Object> attributes = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 98f68f8..9c783b8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -797,14 +797,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
Source source = getSource();
TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy();
+ NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+ List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
+
if (close
|| TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
|| ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
|| (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
-
Error closingError = null;
- NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
if (getDestination() instanceof ExchangeSendingDestination
&& addressSpace instanceof QueueManagingVirtualHost)
{
@@ -813,7 +814,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(
((ExchangeSendingDestination) getDestination()).getQueue().getName());
- List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
TerminusDurability sourceDurability = source.getDurable();
if (sourceDurability != null
@@ -858,6 +858,28 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error);
}
}
+ else if (addressSpace instanceof QueueManagingVirtualHost
+ && ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
+ && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
+ && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+ && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
+ {
+ // For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
+ // However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
+ if (!getLinkName().endsWith("|global"))
+ {
+ getLink().linkClosed();
+ }
+ else
+ {
+ Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
+ final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
+ if (links.size() > 1)
+ {
+ getLink().linkClosed();
+ }
+ }
+ }
super.detach(error, close);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index e466ccf..b5ba0aa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -106,7 +106,6 @@ public class Session_1_0Test extends QpidTestCase
super.setUp();
Map<String, Object> virtualHostAttributes = new HashMap<>();
virtualHostAttributes.put(QueueManagingVirtualHost.NAME, "testVH");
- virtualHostAttributes.put(QueueManagingVirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, false));
virtualHostAttributes.put(QueueManagingVirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
_virtualHost = BrokerTestHelper.createVirtualHost(virtualHostAttributes);
_taskExecutor = new CurrentThreadTaskExecutor();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index d89f14f..8842ebe 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -46,9 +46,6 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase
{
TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
brokerConfiguration.addHttpManagementConfiguration();
- brokerConfiguration.setBrokerAttribute("context",
- Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
- false));
super.setUp();
_restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] When
removing global shared subscription links on detach synchronize on the
VirtualHost
Posted by lq...@apache.org.
QPID-7998: [Broker-J][AMQP 1.0] When removing global shared subscription links on detach synchronize on the VirtualHost
This is done to avoid a race which would lead to
* a client-side 'not-found' error on unsubscribe and
* a leaked subscription queue on the broker which might accumulate messages.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0b991f4a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0b991f4a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0b991f4a
Branch: refs/heads/7.0.x
Commit: 0b991f4a9ee462fb7e721df9ebdc683f38ebd858
Parents: 3651583
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Nov 7 11:27:36 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Nov 7 11:27:36 2017 +0000
----------------------------------------------------------------------
.../qpid/server/protocol/v1_0/SendingLinkEndpoint.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0b991f4a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 9c783b8..c4b4dd6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -873,10 +873,17 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
else
{
Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
- final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
- if (links.size() > 1)
+ // TODO: locking on the addressSpace is not nice! Without we have a race which could mean that the last
+ // two links from different connections detach concurrently and both get removed from the registry.
+ // If the client then tries to do a null-source lookup (for unsubscribe) the attach would be rejected
+ // with 'not-found' and the subscription queue would leak potentially accumulating messages.
+ synchronized (addressSpace)
{
- getLink().linkClosed();
+ final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
+ if (links.size() > 1)
+ {
+ getLink().linkClosed();
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org