You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/11/07 11:27:59 UTC

[1/2] qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] By default, allow global shared subscriptions but discard their links on detach

Repository: qpid-broker-j
Updated Branches:
  refs/heads/7.0.x a2dec7573 -> 0b991f4a9


QPID-7998: [Broker-J][AMQP 1.0] By default, allow global shared subscriptions but discard their links on detach


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3651583a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3651583a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3651583a

Branch: refs/heads/7.0.x
Commit: 3651583a9474a7d7997260897a95ec1432b46956
Parents: a2dec75
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Nov 6 13:58:45 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Nov 7 10:41:00 2017 +0000

----------------------------------------------------------------------
 .../server/virtualhost/AbstractVirtualHost.java |  8 +++---
 .../virtualhost/QueueManagingVirtualHost.java   | 12 ++++-----
 .../v1_0/ExchangeSendingDestination.java        |  8 ------
 .../protocol/v1_0/SendingLinkEndpoint.java      | 28 +++++++++++++++++---
 .../server/protocol/v1_0/Session_1_0Test.java   |  1 -
 .../subscription/SharedSubscriptionTest.java    |  3 ---
 6 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 9e2889e..e4ca989 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -274,7 +274,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
     private PreferenceStore _preferenceStore;
     private long _flowToDiskCheckPeriod;
-    private volatile boolean _isGlobalSharedDurableSubscriptionDisabled;
+    private volatile boolean _isDiscardGlobalSharedSubscriptionLinksOnDetach;
 
     public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
     {
@@ -597,7 +597,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
         _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
         _flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
-        _isGlobalSharedDurableSubscriptionDisabled = getContextValue(Boolean.class, GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED);
+        _isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);
 
         QpidServiceLoader serviceLoader = new QpidServiceLoader();
         for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -2188,9 +2188,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
-    public boolean isGlobalSharedDurableSubscriptionDisabled()
+    public boolean isDiscardGlobalSharedSubscriptionLinksOnDetach()
     {
-        return _isGlobalSharedDurableSubscriptionDisabled;
+        return _isDiscardGlobalSharedSubscriptionLinksOnDetach;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index c4418c0..b30373a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -96,12 +96,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     @ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD)
     int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
 
-
-    String GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = "qpid.feature.disabled:globalSharedDurableSubscription";
+    String DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = "qpid.jms.discardGlobalSharedSubscriptionLinksOnDetach";
     @SuppressWarnings("unused")
-    @ManagedContextDefault(name = GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
-                           description = "Flag to disable global shared durable subscriptions.")
-    boolean DEFAULT_GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = true;
+    @ManagedContextDefault(name = DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH,
+                           description = "If true AMQP 1.0 links of global shared subscriptions are discarded when the"
+                                         + " link detaches. This is to avoid leaking links with the Qpid JMS client.")
+    boolean DEFAULT_DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = true;
 
     @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.")
     int getStatisticsReportingPeriod();
@@ -148,7 +148,7 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     long getFlowToDiskCheckPeriod();
 
     @DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled")
-    boolean isGlobalSharedDurableSubscriptionDisabled();
+    boolean isDiscardGlobalSharedSubscriptionLinksOnDetach();
 
     String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
     @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index a4a1de3..8f84530 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -144,7 +144,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination
     {
         boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
         boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
-        boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY);
 
         QueueManagingVirtualHost virtualHost;
         if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
@@ -157,13 +156,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination
                                                    "Address space of unexpected type"));
         }
 
-        if (isDurable && isShared && isGlobal && virtualHost.isGlobalSharedDurableSubscriptionDisabled())
-        {
-            throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED,
-                                                   "Support for global shared durable subscription is disabled."));
-        }
-
-
         Queue<?> queue;
         final Map<String, Object> attributes = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 98f68f8..9c783b8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -797,14 +797,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
         Source source = getSource();
         TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy();
+        NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+        List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
+
         if (close
             || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
             || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
             || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
-
             Error closingError = null;
-            NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
             if (getDestination() instanceof ExchangeSendingDestination
                 && addressSpace instanceof QueueManagingVirtualHost)
             {
@@ -813,7 +814,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                 {
                     ((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(
                             ((ExchangeSendingDestination) getDestination()).getQueue().getName());
-                    List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
 
                     TerminusDurability sourceDurability = source.getDurable();
                     if (sourceDurability != null
@@ -858,6 +858,28 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                 LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error);
             }
         }
+        else if (addressSpace instanceof QueueManagingVirtualHost
+                 && ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
+                 && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
+                 && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
+                 && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
+        {
+            // For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
+            // However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
+            if (!getLinkName().endsWith("|global"))
+            {
+                getLink().linkClosed();
+            }
+            else
+            {
+                Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
+                final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
+                if (links.size() > 1)
+                {
+                    getLink().linkClosed();
+                }
+            }
+        }
         super.detach(error, close);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index e466ccf..b5ba0aa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -106,7 +106,6 @@ public class Session_1_0Test extends QpidTestCase
         super.setUp();
         Map<String, Object> virtualHostAttributes = new HashMap<>();
         virtualHostAttributes.put(QueueManagingVirtualHost.NAME, "testVH");
-        virtualHostAttributes.put(QueueManagingVirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, false));
         virtualHostAttributes.put(QueueManagingVirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
         _virtualHost = BrokerTestHelper.createVirtualHost(virtualHostAttributes);
         _taskExecutor = new CurrentThreadTaskExecutor();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3651583a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index d89f14f..8842ebe 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -46,9 +46,6 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase
     {
         TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
         brokerConfiguration.addHttpManagementConfiguration();
-        brokerConfiguration.setBrokerAttribute("context",
-                                               Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
-                                                                        false));
         super.setUp();
         _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] When removing global shared subscription links on detach synchronize on the VirtualHost

Posted by lq...@apache.org.
QPID-7998: [Broker-J][AMQP 1.0] When removing global shared subscription links on detach synchronize on the VirtualHost

This is done to avoid a race which would lead to
 * a client-side 'not-found' error on unsubscribe and
 * a leaked subscription queue on the broker which might accumulate messages.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0b991f4a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0b991f4a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0b991f4a

Branch: refs/heads/7.0.x
Commit: 0b991f4a9ee462fb7e721df9ebdc683f38ebd858
Parents: 3651583
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue Nov 7 11:27:36 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue Nov 7 11:27:36 2017 +0000

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/SendingLinkEndpoint.java | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0b991f4a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 9c783b8..c4b4dd6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -873,10 +873,17 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
             else
             {
                 Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$");
-                final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
-                if (links.size() > 1)
+                // TODO: locking on the addressSpace is not nice! Without it we have a race: the last two links
+                // from different connections could detach concurrently and both get removed from the registry.
+                // If the client then tries to do a null-source lookup (for unsubscribe), the attach would be rejected
+                // with 'not-found' and the subscription queue would leak, potentially accumulating messages.
+                synchronized (addressSpace)
                 {
-                    getLink().linkClosed();
+                    final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
+                    if (links.size() > 1)
+                    {
+                        getLink().linkClosed();
+                    }
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org