You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/01 12:12:00 UTC

qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] Add switch to disable global shared durable subscriptions and disable them by default

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 0ca7b8af8 -> c348adfbb


QPID-7998: [Broker-J][AMQP 1.0] Add switch to disable global shared durable subscriptions and disable them by default


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c348adfb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c348adfb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c348adfb

Branch: refs/heads/master
Commit: c348adfbbcaf77ba7c66a23387c61e2ee49f2bb4
Parents: 0ca7b8a
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 1 12:10:46 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 1 12:10:46 2017 +0000

----------------------------------------------------------------------
 .../server/virtualhost/AbstractVirtualHost.java |  9 +++-
 .../virtualhost/QueueManagingVirtualHost.java   | 10 +++++
 .../v1_0/ExchangeSendingDestination.java        | 45 ++++++++++++--------
 .../server/protocol/v1_0/Session_1_0Test.java   | 10 ++++-
 .../subscription/SharedSubscriptionTest.java    |  9 +++-
 5 files changed, 61 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c2f53cc..9e2889e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -274,6 +274,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
     private PreferenceStore _preferenceStore;
     private long _flowToDiskCheckPeriod;
+    private volatile boolean _isGlobalSharedDurableSubscriptionDisabled;
 
     public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
     {
@@ -596,7 +597,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
         _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
         _flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
-
+        _isGlobalSharedDurableSubscriptionDisabled = getContextValue(Boolean.class, GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED);
 
         QpidServiceLoader serviceLoader = new QpidServiceLoader();
         for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -2187,6 +2188,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public boolean isGlobalSharedDurableSubscriptionDisabled()
+    {
+        return _isGlobalSharedDurableSubscriptionDisabled;
+    }
+
+    @Override
     public long getStoreTransactionIdleTimeoutClose()
     {
         return _storeTransactionIdleTimeoutClose;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index ba85821..c4418c0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -96,6 +96,13 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     @ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD)
     int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
 
+
+    String GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = "qpid.feature.disabled:globalSharedDurableSubscription";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
+                           description = "Flag to disable global shared durable subscriptions.")
+    boolean DEFAULT_GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = true;
+
     @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.")
     int getStatisticsReportingPeriod();
 
@@ -140,6 +147,9 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
                                      + "need to be flowed to disk in order to free memory." )
     long getFlowToDiskCheckPeriod();
 
+    @DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled")
+    boolean isGlobalSharedDurableSubscriptionDisabled();
+
     String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
     @SuppressWarnings("unused")
     @ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index 92c395f..a4a1de3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -142,10 +142,30 @@ public class ExchangeSendingDestination extends StandardSendingDestination
     private static Queue<?> getQueue(Exchange<?> exchange, Source source, String subscriptionName, BindingInfo bindingInfo)
             throws AmqpErrorException
     {
-        Queue<?> queue;
-        final Map<String, Object> attributes = new HashMap<>();
         boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
         boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
+        boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY);
+
+        QueueManagingVirtualHost virtualHost;
+        if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
+        {
+            virtualHost = (QueueManagingVirtualHost) exchange.getAddressSpace();
+        }
+        else
+        {
+            throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
+                                                   "Address space of unexpected type"));
+        }
+
+        if (isDurable && isShared && isGlobal && virtualHost.isGlobalSharedDurableSubscriptionDisabled())
+        {
+            throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED,
+                                                   "Support for global shared durable subscription is disabled."));
+        }
+
+
+        Queue<?> queue;
+        final Map<String, Object> attributes = new HashMap<>();
 
         ExclusivityPolicy exclusivityPolicy;
         if (isShared)
@@ -168,22 +188,11 @@ public class ExchangeSendingDestination extends StandardSendingDestination
         Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
         try
         {
-            if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
-            {
-                try
-                {
-                    queue = ((QueueManagingVirtualHost) exchange.getAddressSpace()).getSubscriptionQueue(exchange.getName(), attributes, bindings);
-                }
-                catch (NotFoundException e)
-                {
-                    throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
-                }
-            }
-            else
-            {
-                throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
-                                                       "Address space of unexpected type"));
-            }
+            queue = virtualHost.getSubscriptionQueue(exchange.getName(), attributes, bindings);
+        }
+        catch (NotFoundException e)
+        {
+            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
         }
         catch(IllegalStateException e)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 2dacac3..e466ccf 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -27,11 +27,11 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -76,6 +76,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class Session_1_0Test extends QpidTestCase
@@ -102,7 +104,11 @@ public class Session_1_0Test extends QpidTestCase
     public void setUp() throws Exception
     {
         super.setUp();
-        _virtualHost = BrokerTestHelper.createVirtualHost("testVH");
+        Map<String, Object> virtualHostAttributes = new HashMap<>();
+        virtualHostAttributes.put(QueueManagingVirtualHost.NAME, "testVH");
+        virtualHostAttributes.put(QueueManagingVirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, false));
+        virtualHostAttributes.put(QueueManagingVirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        _virtualHost = BrokerTestHelper.createVirtualHost(virtualHostAttributes);
         _taskExecutor = new CurrentThreadTaskExecutor();
         _taskExecutor.start();
         _connection = createAmqpConnection_1_0("testContainerId");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index ae252f2..d89f14f 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.systests.jms_2_0.subscription;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,8 +31,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.systest.rest.RestTestHelper;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
 
 public class SharedSubscriptionTest extends QpidBrokerTestCase
 {
@@ -41,7 +44,11 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase
     @Override
     public void setUp() throws Exception
     {
-        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+        TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
+        brokerConfiguration.addHttpManagementConfiguration();
+        brokerConfiguration.setBrokerAttribute("context",
+                                               Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
+                                                                        false));
         super.setUp();
         _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org