You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/01 12:12:00 UTC
qpid-broker-j git commit: QPID-7998: [Broker-J][AMQP 1.0] Add switch
to disable global shared durable subscriptions and disable them by default
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 0ca7b8af8 -> c348adfbb
QPID-7998: [Broker-J][AMQP 1.0] Add switch to disable global shared durable subscriptions and disable them by default
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c348adfb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c348adfb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c348adfb
Branch: refs/heads/master
Commit: c348adfbbcaf77ba7c66a23387c61e2ee49f2bb4
Parents: 0ca7b8a
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 1 12:10:46 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 1 12:10:46 2017 +0000
----------------------------------------------------------------------
.../server/virtualhost/AbstractVirtualHost.java | 9 +++-
.../virtualhost/QueueManagingVirtualHost.java | 10 +++++
.../v1_0/ExchangeSendingDestination.java | 45 ++++++++++++--------
.../server/protocol/v1_0/Session_1_0Test.java | 10 ++++-
.../subscription/SharedSubscriptionTest.java | 9 +++-
5 files changed, 61 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c2f53cc..9e2889e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -274,6 +274,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private Collection<VirtualHostLogger> _virtualHostLoggersToClose;
private PreferenceStore _preferenceStore;
private long _flowToDiskCheckPeriod;
+ private volatile boolean _isGlobalSharedDurableSubscriptionDisabled;
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -596,7 +597,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
_flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
-
+ _isGlobalSharedDurableSubscriptionDisabled = getContextValue(Boolean.class, GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED);
QpidServiceLoader serviceLoader = new QpidServiceLoader();
for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -2187,6 +2188,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public boolean isGlobalSharedDurableSubscriptionDisabled()
+ {
+ return _isGlobalSharedDurableSubscriptionDisabled;
+ }
+
+ @Override
public long getStoreTransactionIdleTimeoutClose()
{
return _storeTransactionIdleTimeoutClose;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index ba85821..c4418c0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -96,6 +96,13 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
@ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD)
int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
+
+ String GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = "qpid.feature.disabled:globalSharedDurableSubscription";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
+ description = "Flag to disable global shared durable subscriptions.")
+ boolean DEFAULT_GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = true;
+
@ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.")
int getStatisticsReportingPeriod();
@@ -140,6 +147,9 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
+ "need to be flowed to disk in order to free memory." )
long getFlowToDiskCheckPeriod();
+ @DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled")
+ boolean isGlobalSharedDurableSubscriptionDisabled();
+
String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
@SuppressWarnings("unused")
@ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index 92c395f..a4a1de3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -142,10 +142,30 @@ public class ExchangeSendingDestination extends StandardSendingDestination
private static Queue<?> getQueue(Exchange<?> exchange, Source source, String subscriptionName, BindingInfo bindingInfo)
throws AmqpErrorException
{
- Queue<?> queue;
- final Map<String, Object> attributes = new HashMap<>();
boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
+ boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY);
+
+ QueueManagingVirtualHost virtualHost;
+ if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
+ {
+ virtualHost = (QueueManagingVirtualHost) exchange.getAddressSpace();
+ }
+ else
+ {
+ throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
+ "Address space of unexpected type"));
+ }
+
+ if (isDurable && isShared && isGlobal && virtualHost.isGlobalSharedDurableSubscriptionDisabled())
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED,
+ "Support for global shared durable subscription is disabled."));
+ }
+
+
+ Queue<?> queue;
+ final Map<String, Object> attributes = new HashMap<>();
ExclusivityPolicy exclusivityPolicy;
if (isShared)
@@ -168,22 +188,11 @@ public class ExchangeSendingDestination extends StandardSendingDestination
Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
try
{
- if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
- {
- try
- {
- queue = ((QueueManagingVirtualHost) exchange.getAddressSpace()).getSubscriptionQueue(exchange.getName(), attributes, bindings);
- }
- catch (NotFoundException e)
- {
- throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
- }
- }
- else
- {
- throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
- "Address space of unexpected type"));
- }
+ queue = virtualHost.getSubscriptionQueue(exchange.getName(), attributes, bindings);
+ }
+ catch (NotFoundException e)
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
}
catch(IllegalStateException e)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 2dacac3..e466ccf 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -27,11 +27,11 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -76,6 +76,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
public class Session_1_0Test extends QpidTestCase
@@ -102,7 +104,11 @@ public class Session_1_0Test extends QpidTestCase
public void setUp() throws Exception
{
super.setUp();
- _virtualHost = BrokerTestHelper.createVirtualHost("testVH");
+ Map<String, Object> virtualHostAttributes = new HashMap<>();
+ virtualHostAttributes.put(QueueManagingVirtualHost.NAME, "testVH");
+ virtualHostAttributes.put(QueueManagingVirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, false));
+ virtualHostAttributes.put(QueueManagingVirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+ _virtualHost = BrokerTestHelper.createVirtualHost(virtualHostAttributes);
_taskExecutor = new CurrentThreadTaskExecutor();
_taskExecutor.start();
_connection = createAmqpConnection_1_0("testContainerId");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c348adfb/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index ae252f2..d89f14f 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.systests.jms_2_0.subscription;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -30,8 +31,10 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class SharedSubscriptionTest extends QpidBrokerTestCase
{
@@ -41,7 +44,11 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase
@Override
public void setUp() throws Exception
{
- getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+ TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
+ brokerConfiguration.addHttpManagementConfiguration();
+ brokerConfiguration.setBrokerAttribute("context",
+ Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED,
+ false));
super.setUp();
_restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org