You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/15 14:52:01 UTC
[3/4] qpid-broker-j git commit: QPID-7606: Remodel alternateExchange as alternateBinding
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 3b209c0..73a4597 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -19,16 +19,18 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -124,19 +126,19 @@ public class DirectExchangeTest extends QpidTestCase
Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.DURABLE, false);
- attributes.put(Queue.ALTERNATE_EXCHANGE, _exchange.getName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
Queue queue = (Queue) _vhost.createChild(Queue.class, attributes);
queue.open();
- assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateExchange());
+ assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateBindingDestination());
try
{
_exchange.delete();
- fail("Exchange deletion should fail with ExchangeIsAlternateException");
+ fail("Exchange deletion should fail with MessageDestinationIsAlternateException");
}
- catch(ExchangeIsAlternateException e)
+ catch(MessageDestinationIsAlternateException e)
{
// pass
}
@@ -145,4 +147,78 @@ public class DirectExchangeTest extends QpidTestCase
assertEquals("Unexpected desired exchange state", State.ACTIVE, _exchange.getDesiredState());
}
+ public void testAlternateBindingValidationRejectsNonExistingDestination()
+ {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, getTestName());
+ attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, "nonExisting"));
+
+ try
+ {
+ _vhost.createChild(Exchange.class, attributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBindingValidationRejectsSelf()
+ {
+ Map<String, String> alternateBinding = Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName());
+ Map<String, Object> newAttributes = Collections.singletonMap(Exchange.ALTERNATE_BINDING, alternateBinding);
+ try
+ {
+ _exchange.setAttributes(newAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testDurableExchangeRejectsNonDurableAlternateBinding()
+ {
+ Map<String, Object> dlqAttributes = new HashMap<>();
+ String dlqName = getTestName() + "_DLQ";
+ dlqAttributes.put(Queue.NAME, dlqName);
+ dlqAttributes.put(Queue.DURABLE, false);
+ _vhost.createChild(Queue.class, dlqAttributes);
+
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put(Exchange.NAME, getTestName());
+ exchangeAttributes.put(Exchange.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, dlqName));
+ exchangeAttributes.put(Exchange.DURABLE, true);
+ exchangeAttributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ try
+ {
+ _vhost.createChild(Exchange.class, exchangeAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBinding()
+ {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, getTestName());
+ attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
+ attributes.put(Exchange.DURABLE, false);
+
+ Exchange newExchange = _vhost.createChild(Exchange.class, attributes);
+
+ assertEquals("Unexpected alternate binding",
+ _exchange.getName(),
+ newExchange.getAlternateBinding().getDestination());
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
index f8e704d..baf5d0f 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,7 +38,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject
private Queue<?> _queue;
private String _routingKey;
private Exchange<?> _exchange;
- private VirtualHost _testVhost;
+ private QueueManagingVirtualHost _testVhost;
@Override
public void setUp() throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
index 3c37728..4705053 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,7 +35,7 @@ public class QueueLogSubjectTest extends AbstractTestLogSubject
{
private Queue<?> _queue;
- private VirtualHost _testVhost;
+ private QueueManagingVirtualHost _testVhost;
@Override
public void setUp() throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 003fc06..1d1578c 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.TestConsumerTarget;
@@ -59,6 +60,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
@@ -70,6 +72,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -965,6 +968,92 @@ abstract class AbstractQueueTestBase extends QpidTestCase
assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
}
+ public void testAlternateBindingValidationRejectsNonExistingDestination()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, "nonExisting"));
+
+ try
+ {
+ _virtualHost.createChild(Queue.class, attributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBindingValidationRejectsSelf()
+ {
+ Map<String, String> alternateBinding = Collections.singletonMap(AlternateBinding.DESTINATION, _qname);
+ Map<String, Object> newAttributes = Collections.singletonMap(Queue.ALTERNATE_BINDING, alternateBinding);
+ try
+ {
+ _queue.setAttributes(newAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testDurableQueueRejectsNonDurableAlternateBinding()
+ {
+ Map<String, Object> dlqAttributes = new HashMap<>(_arguments);
+ String dlqName = getTestName() + "_DLQ";
+ dlqAttributes.put(Queue.NAME, dlqName);
+ dlqAttributes.put(Queue.DURABLE, false);
+ _virtualHost.createChild(Queue.class, dlqAttributes);
+
+ Map<String, Object> queueAttributes = new HashMap<>(_arguments);
+ queueAttributes.put(Queue.NAME, getTestName());
+ queueAttributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, dlqName));
+ queueAttributes.put(Queue.DURABLE, true);
+
+ try
+ {
+ _virtualHost.createChild(Queue.class, queueAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBinding()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _qname));
+
+ Queue newQueue = _virtualHost.createChild(Queue.class, attributes);
+
+ assertEquals("Unexpected alternate binding", _qname, newQueue.getAlternateBinding().getDestination());
+ }
+
+ public void testDeleteOfQueueSetAsAlternate()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _qname));
+
+ Queue newQueue = _virtualHost.createChild(Queue.class, attributes);
+ assertEquals("Unexpected alternate binding", _qname, newQueue.getAlternateBinding().getDestination());
+ try
+ {
+ _queue.delete();
+ fail("Expected exception is not thrown");
+ }
+ catch (MessageDestinationIsAlternateException e)
+ {
+ //pass
+ }
+ }
+
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index fae8ccb..53185f4 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -40,9 +40,9 @@ import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
@@ -57,28 +57,23 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
- private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
-
private static final UUID ANY_UUID = UUID.randomUUID();
private static final Map ANY_MAP = new HashMap();
- public static final String STANDARD = "standard";
-
+ private static final String STANDARD = "standard";
private String _storePath;
private String _storeName;
private ConfiguredObjectRecordHandler _handler;
- private static final String ROUTING_KEY = "routingKey";
- private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
@@ -89,6 +84,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private ConfiguredObjectRecord _rootRecord;
+ @Override
public void setUp() throws Exception
{
super.setUp();
@@ -102,7 +98,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_handler = mock(ConfiguredObjectRecordHandler.class);
- _bindingArgs = new HashMap<String, Object>();
+ _bindingArgs = new HashMap<>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
String argValue = "some selector expression";
_bindingArgs.put(argKey, argValue);
@@ -126,6 +122,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
protected abstract VirtualHostNode createVirtualHostNode(String storeLocation, ConfiguredObjectFactory factory);
+ @Override
public void tearDown() throws Exception
{
try
@@ -162,7 +159,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private Map<String,Object> map(Object... vals)
{
- Map<String,Object> map = new HashMap<String, Object>();
+ Map<String,Object> map = new HashMap<>();
boolean isValue = false;
String key = null;
for(Object obj : vals)
@@ -191,15 +188,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
-
- private ConfiguredObjectRecord matchesRecord(UUID id,
- String type,
- Map<String, Object> attributes,
- final Map<String, UUID> parents)
- {
- return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
- }
-
private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
@@ -227,7 +215,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument;
- Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes());
+ Map<String,Object> arg = new HashMap<>(binding.getAttributes());
arg.remove("createdBy");
arg.remove("createdTime");
arg.remove("lastUpdatedTime");
@@ -258,7 +246,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
}
- public void testCreateQueueAMQQueue() throws Exception
+ public void testCreateQueue() throws Exception
{
Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, null);
_configStore.create(queue.asObjectRecord());
@@ -266,70 +254,17 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
_configStore.openConfigurationStore(_handler);
- Map<String, Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.put(Queue.TYPE, STANDARD);
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
-
- public void testCreateQueueAMQQueueFieldTable() throws Exception
- {
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- attributes.put(Queue.TYPE, STANDARD);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
-
- _configStore.create(queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
-
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.putAll(attributes);
-
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
-
- public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
- {
- Exchange<?> alternateExchange = createTestAlternateExchange();
-
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
- _configStore.create(queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String, Object> queueAttributes = new HashMap<String, Object>();
+ Map<String, Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
queueAttributes.put(Queue.TYPE, STANDARD);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
- private Exchange<?> createTestAlternateExchange()
- {
- UUID exchUuid = UUID.randomUUID();
- Exchange<?> alternateExchange = mock(Exchange.class);
- when(alternateExchange.getId()).thenReturn(exchUuid);
- return alternateExchange;
- }
-
- public void testUpdateQueueExclusivity() throws Exception
+ public void testUpdateQueue() throws Exception
{
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.TYPE, STANDARD);
Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
@@ -343,7 +278,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
_configStore.openConfigurationStore(_handler);
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
@@ -352,43 +287,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
- public void testUpdateQueueAlternateExchange() throws Exception
- {
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
- _configStore.create(queue.asObjectRecord());
-
- // update the queue to have exclusive=false
- Exchange<?> alternateExchange = createTestAlternateExchange();
- queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
-
- _configStore.update(false, queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
-
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.putAll(attributes);
- queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- queueAttributes.put(Queue.TYPE, STANDARD);
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
public void testRemoveQueue() throws Exception
{
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, Collections.emptyMap());
_configStore.create(queue.asObjectRecord());
- // remove queue
_configStore.remove(queue.asObjectRecord());
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
@@ -399,36 +303,22 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
boolean exclusive,
final Map<String, Object> arguments) throws StoreException
{
- return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
- }
-
- private Queue<?> createTestQueue(String queueName,
- String queueOwner,
- boolean exclusive,
- Exchange<?> alternateExchange,
- final Map<String, Object> arguments) throws StoreException
- {
Queue queue = BrokerTestHelper.mockWithSystemPrincipal(Queue.class, mock(Principal.class));
when(queue.getName()).thenReturn(queueName);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getType()).thenReturn(STANDARD);
- when(queue.getAlternateExchange()).thenReturn(alternateExchange);
- when(queue.getCategoryClass()).thenReturn((Class)Queue.class);
+ when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.isDurable()).thenReturn(true);
TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
when(queue.getTaskExecutor()).thenReturn(taskExecutor);
when(queue.getChildExecutor()).thenReturn(taskExecutor);
- final VirtualHost vh = mock(VirtualHost.class);
+ final QueueManagingVirtualHost vh = mock(QueueManagingVirtualHost.class);
when(queue.getVirtualHost()).thenReturn(vh);
- final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+ final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<>() : new LinkedHashMap<>(arguments);
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.TYPE, STANDARD);
- if(alternateExchange != null)
- {
- attributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange);
- }
if(exclusive)
{
when(queue.getOwner()).thenReturn(queueOwner);
@@ -466,7 +356,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private Exchange<?> createTestExchange()
{
Exchange exchange = mock(Exchange.class);
- Map<String,Object> actualAttributes = new HashMap<String, Object>();
+ Map<String,Object> actualAttributes = new HashMap<>();
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
index 3c574b3..f55ff86 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
@@ -49,7 +49,7 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
_upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
}
- public void testUpgradeForFlowControlFrom_6_1() throws Exception
+ public void testUpgradeFlowControlFrom_6_1() throws Exception
{
Map<String, Object> rootAttributes = new HashMap<>();
rootAttributes.put("modelVersion", "6.1");
@@ -83,6 +83,122 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
String.valueOf(upgradedAttributes.get("overflowPolicy")));
}
+ public void testUpgradeQueueAlternateExchangeFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+ Map<String, Object> queueAttributes = new HashMap<>();
+ queueAttributes.put("name", "queue");
+ queueAttributes.put("alternateExchange", "testExchange");
+
+ ConfiguredObjectRecord queueRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Queue", queueAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ final Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "testExchange");
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, queueRecord, exchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(queueRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded queue record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+
+ }
+
+ public void testUpgradeExchangeAlternateExchangeFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, Object> alternateExchangeAttributes = new HashMap<>();
+ alternateExchangeAttributes.put("name", "testExchange");
+ ConfiguredObjectRecord alternateExchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", alternateExchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "exchange");
+ exchangeAttributes.put("alternateExchange", "testExchange");
+
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, exchangeRecord, alternateExchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(exchangeRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded exchange record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+
+ }
+ public void testUpgradeExchangeAlternateExchangeSpecifiedWithUUIDFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, Object> alternateExchangeAttributes = new HashMap<>();
+ alternateExchangeAttributes.put("name", "testExchange");
+ UUID alternateExchangeId = UUID.randomUUID();
+ ConfiguredObjectRecord alternateExchangeRecord = new ConfiguredObjectRecordImpl(alternateExchangeId, "Exchange", alternateExchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "exchange");
+ exchangeAttributes.put("alternateExchange", alternateExchangeId.toString());
+
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, exchangeRecord, alternateExchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(exchangeRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded exchange record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+ }
+
private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)
{
for (ConfiguredObjectRecord record : records)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index 8493a03..f5fc87b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
@@ -37,8 +36,6 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
@@ -110,13 +107,14 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
super.tearDown();
}
}
+
private VirtualHost<?> createHost()
{
- Map<String, Object> attributes = new HashMap<String, Object>();
+ Map<String, Object> attributes = new HashMap<>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
- attributes = new HashMap<String, Object>(attributes);
+ attributes = new HashMap<>(attributes);
attributes.put(VirtualHost.ID, UUID.randomUUID());
TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode);
host.create();
@@ -124,20 +122,9 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
return host;
}
- private void verifyRegisteredQueueCount(int count)
- {
- assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getChildren(Queue.class).size());
- }
-
-
- private void verifyQueueRegistered(String queueName)
- {
- assertNotNull("Queue " + queueName + " was not created", _virtualHost.getChildByName(Queue.class, queueName));
- }
-
public void testPriorityQueueRegistration() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testPriorityQueue");
@@ -147,304 +134,47 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertTrue("Queue not a priority queue", queue instanceof PriorityQueueImpl);
- verifyQueueRegistered("testPriorityQueue");
- verifyRegisteredQueueCount(1);
+ assertNotNull("Queue " + "testPriorityQueue" + " was not created", _virtualHost.getChildByName(Queue.class,
+ "testPriorityQueue"));
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
-
- public void testSimpleQueueRegistration() throws Exception
+ public void testSimpleQueueCreation() throws Exception
{
String queueName = getName();
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertTrue("Queue not a simple queue", queue instanceof StandardQueueImpl);
- verifyQueueRegistered(queueName);
-
- //verify that no alternate exchange or DLQ were produced
-
- assertNull("Queue should not have an alternate exchange as DLQ wasn't enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
-
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
- * cause the alternate exchange to be set and DLQ to be produced.
- */
- public void testDeadLetterQueueEnabled() throws Exception
- {
-
- String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not yet exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- Exchange<?> altExchange = queue.getAlternateExchange();
- assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
-
- assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Queue<?> dlQueue = (Queue<?>) _virtualHost.getChildByName(Queue.class, dlQueueName);
- assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
- assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
- assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
-
- //2 queues should have been registered
- verifyRegisteredQueueCount(2);
- }
-
- /**
- * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration
- * are not applied to the DLQ itself.
- */
- public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
- {
-
- String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not yet exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts());
- Exchange<?> altExchange = queue.getAlternateExchange();
- assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
-
- assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Queue<?> dlQueue = (Queue<?>) _virtualHost.getChildByName(Queue.class, dlQueueName);
- assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
- assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
- assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
-
- //2 queues should have been registered
- verifyRegisteredQueueCount(2);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
- * result in the alternate exchange being set and DLQ being created.
- */
- public void testDeadLetterQueueDisabled() throws Exception
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
-
- String queueName = "testDeadLetterQueueDisabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not exist", _virtualHost.getChildByName(Exchange.class,
- dlExchangeName));
-
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, false);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
- assertNull("The alternate exchange should still not exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertNull("The DLQ should still not exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
-
- //only 1 queue should have been registered
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
- * creating an auto-delete queue, does not result in the alternate exchange
- * being set and DLQ being created.
- */
- public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception
- {
-
- String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
- attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
-
- //create an autodelete queue
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
- assertEquals("Queue should be autodelete",
- LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
- queue.getLifetimePolicy());
-
- //ensure that the autodelete property overrides the request to enable DLQ
- assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
- assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getChildByName( Exchange.class, dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNotNull("Queue " + queueName + " was not created", _virtualHost.getChildByName(Queue.class, queueName));
- //only 1 queue should have been registered
- verifyRegisteredQueueCount(1);
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
- * the desired effect.
- */
public void testMaximumDeliveryCount() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testMaximumDeliveryCount");
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
final Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts());
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that omitting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
- * that queue is created with a default maximumDeliveryCount of zero (unless set in config).
- */
- public void testMaximumDeliveryCountDefault() throws Exception
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
-
- final Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertNotNull("The queue was not registered as expected ", queue);
- assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts());
-
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests queue creation with queue name set to null
- */
- public void testQueueNameNullValidation()
- {
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with null name can not be created!");
- }
- catch (Exception e)
- {
- assertTrue(e instanceof IllegalArgumentException);
- assertTrue(e.getMessage().startsWith("The name attribute is mandatory"));
- }
- }
-
- /**
- * Tests queue creation with queue name length less 255 characters but
- * corresponding DLQ name length greater than 255.
- */
- public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255()
- {
- String queueName = "test-" + generateStringWithLength('a', 245);
- try
- {
- // change DLQ name to make its length bigger than exchange name
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
-
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with DLQ name having more than 255 characters can not be created!");
- }
- catch (Exception e)
- {
- assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
- assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name")
- && e.getMessage().contains("length exceeds limit of 255"));
- }
- }
-
- /**
- * Tests queue creation with queue name length less 255 characters but
- * corresponding DL exchange name length greater than 255.
- */
- public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255()
- {
- String queueName = "test-" + generateStringWithLength('a', 245);
- try
- {
- // change DLQ name to make its length bigger than exchange name
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
-
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, (Object) true);
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with DLE name having more than 255 characters can not be created!");
- }
- catch (Exception e)
- {
- assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
- assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name")
- && e.getMessage().contains("length exceeds limit of 255"));
- }
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
public void testMessageGroupQueue() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
@@ -455,15 +185,5 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
- private String generateStringWithLength(char ch, int length)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++)
- {
- sb.append(ch);
- }
- return sb.toString();
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java b/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
index 2d764c9..68bbf8e 100644
--- a/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
+++ b/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
@@ -262,13 +262,14 @@ class LegacyAccessControlAdapter
properties.put(ObjectProperties.Property.TEMPORARY, lifeTimePolicy != LifetimePolicy.PERMANENT);
properties.put(ObjectProperties.Property.DURABLE, (Boolean)queue.getAttribute(ConfiguredObject.DURABLE));
properties.put(ObjectProperties.Property.EXCLUSIVE, queue.getAttribute(Queue.EXCLUSIVE) != ExclusivityPolicy.NONE);
- Object alternateExchange = queue.getAttribute(Queue.ALTERNATE_EXCHANGE);
- if (alternateExchange != null)
+ Object alternateBinding = queue.getAttribute(Queue.ALTERNATE_BINDING);
+ if (alternateBinding instanceof AlternateBinding)
{
- String name = alternateExchange instanceof ConfiguredObject ?
- (String)((ConfiguredObject)alternateExchange).getAttribute(ConfiguredObject.NAME) :
- String.valueOf(alternateExchange);
- properties.put(ObjectProperties.Property.ALTERNATE, name);
+ String name = ((AlternateBinding)alternateBinding).getDestination();
+ if (name != null && !"".equals(name))
+ {
+ properties.put(ObjectProperties.Property.ALTERNATE, name);
+ }
}
String owner = (String)queue.getAttribute(Queue.OWNER);
if (owner != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java b/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
index 648154e..1985035 100644
--- a/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
+++ b/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
@@ -35,6 +35,7 @@ import java.util.Map;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
public class LegacyAccessControlAdapterTest extends QpidTestCase
@@ -45,7 +46,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
private static final String TEST_QUEUE = "testQueue";
private LegacyAccessControl _accessControl;
- private VirtualHost<?> _virtualHost;
+ private QueueManagingVirtualHost<?> _virtualHost;
private Broker _broker;
private VirtualHostNode<?> _virtualHostNode;
private LegacyAccessControlAdapter _adapter;
@@ -55,7 +56,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
{
super.setUp();
_accessControl = mock(LegacyAccessControl.class);
- _virtualHost = mock(VirtualHost.class);
+ _virtualHost = mock(QueueManagingVirtualHost.class);
when(_virtualHost.getName()).thenReturn(TEST_VIRTUAL_HOST);
@@ -260,7 +261,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
when(queue.getAttribute(Queue.OWNER)).thenReturn(null);
when(queue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE);
when(queue.getAttribute(Queue.DURABLE)).thenReturn(false);
- when(queue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(null);
+ when(queue.getAttribute(Queue.ALTERNATE_BINDING)).thenReturn(null);
when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.getParent()).thenReturn(vh);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index b360249..0d8210b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
import org.apache.qpid.server.message.MessageInstance.EntryState;
@@ -473,12 +474,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
if(owningResource instanceof Queue)
{
final Queue<?> queue = (Queue<?>)owningResource;
- final Exchange alternateExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if(alternateExchange != null)
+ if(alternateBindingDestination != null)
{
getEventLogger().message(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
+ alternateBindingDestination.getName()));
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index c34740e..1c85614 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -23,19 +23,20 @@ package org.apache.qpid.server.protocol.v0_10;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
@@ -52,6 +53,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -79,7 +81,7 @@ import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -861,6 +863,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
return;
}
}
+ String alternateExchangeName = method.getAlternateExchange();
if(nameNullOrEmpty(method.getExchange()))
{
// special case handling to fake the existence of the default exchange for 0-10
@@ -871,11 +874,11 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
+ " of type " + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ " to " + method.getType() +".");
}
- if(!nameNullOrEmpty(method.getAlternateExchange()))
+ if(!nameNullOrEmpty(alternateExchangeName))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
"Attempt to set alternate exchange of the default exchange "
- + " to " + method.getAlternateExchange() +".");
+ + " to " + alternateExchangeName + ".");
}
}
else
@@ -911,7 +914,13 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+ if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
+ {
+ validateAlternateExchangeIsNotQueue(addressSpace, alternateExchangeName);
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
+ }
addressSpace.createMessageDestination(Exchange.class, attributes);;
}
catch(ReservedExchangeNameException e)
@@ -919,8 +928,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
Exchange<?> existingExchange = getExchange(session, exchangeName);
if(existingExchange == null
|| !existingExchange.getType().equals(method.getType())
- || (method.hasAlternateExchange() && (existingExchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(existingExchange.getAlternateExchange().getName()))) )
+ || (method.hasAlternateExchange() && (existingExchange.getAlternateBinding() == null ||
+ !alternateExchangeName
+ .equals(existingExchange.getAlternateBinding().getDestination()))) )
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ exchangeName + " which begins with reserved name or prefix.");
@@ -947,21 +957,23 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
+ " to " + method.getType() +".");
}
else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ && (exchange.getAlternateBinding() == null ||
+ !alternateExchangeName.equals(exchange.getAlternateBinding().getDestination())))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateBinding()
+ + " to " + alternateExchangeName + ".");
}
}
catch (AccessControlException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
-
-
+ catch (IllegalConfigurationException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, e.getMessage());
+ }
}
}
}
@@ -1095,9 +1107,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exchange.delete();
}
- catch (ExchangeIsAlternateException e)
+ catch (MessageDestinationIsAlternateException e)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate binding destination");
}
catch (RequiredExchangeException e)
{
@@ -1518,20 +1530,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
final String alternateExchangeName = method.getAlternateExchange();
+ final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(queueName,
+ method.getArguments());
- final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments());
-
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
{
- arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName);
+ validateAlternateExchangeIsNotQueue(addressSpace, alternateExchangeName);
+ arguments.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
}
- final UUID id = UUID.randomUUID();
-
- arguments.put(Queue.ID, id);
arguments.put(Queue.NAME, queueName);
-
if(!arguments.containsKey(Queue.LIFETIME_POLICY))
{
LifetimePolicy lifetime;
@@ -1578,6 +1587,20 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (IllegalConfigurationException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, e.getMessage());
+ }
+ }
+ }
+
+ private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
+ {
+ MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
+ if (alternateMessageDestination != null && !(alternateMessageDestination instanceof Exchange))
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Alternate exchange '%s' is not a destination of type 'exchange'.", alternateExchangeName));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 61c8c26..937e1d0 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.ErrorCodes;
@@ -91,7 +92,7 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -113,6 +114,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
return input.getMessageInstance();
}
};
+ private static final String ALTERNATE_EXCHANGE = "alternateExchange";
private final DefaultQueueAssociationClearingTask
_defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
@@ -1638,16 +1640,10 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
{
final Queue<?> queue = (Queue<?>) owningResource;
- final Exchange altExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if (altExchange == null)
+ if (alternateBindingDestination == null)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: "
- + deliveryTag);
- }
messageWithSubject(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
msg.getInitialRoutingAddress()));
@@ -1655,14 +1651,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
else
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- }
messageWithSubject(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- altExchange.getName()));
+ alternateBindingDestination.getName()));
}
}
}
@@ -2726,9 +2716,13 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
attributes.put(Exchange.DURABLE, durable);
attributes.put(Exchange.LIFETIME_POLICY,
autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+
+ Object alternateExchange = attributes.remove(ALTERNATE_EXCHANGE);
+ if (alternateExchange != null)
{
- attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+ validateAlternateExchangeIsNotQueue(virtualHost, String.valueOf(alternateExchange));
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchange));
}
exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
@@ -2810,6 +2804,16 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
+ private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
+ {
+ MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
+ if (alternateMessageDestination != null && !(alternateMessageDestination instanceof Exchange))
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Alternate exchange '%s' is not a destination of type 'exchange'.", alternateExchangeName));
+ }
+ }
+
@Override
public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
{
@@ -2858,9 +2862,9 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
}
}
- catch (ExchangeIsAlternateException e)
+ catch (MessageDestinationIsAlternateException e)
{
- closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange in use as an alternate binding destination");
}
catch (RequiredExchangeException e)
{
@@ -3064,9 +3068,17 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
try
{
- Map<String, Object> attributes =
- QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
final String queueNameString = AMQShortString.toString(queueName);
+ Map<String, Object> wireArguments = FieldTable.convertToMap(arguments);
+ Object alternateExchange = wireArguments.get(ALTERNATE_EXCHANGE);
+ if (alternateExchange != null)
+ {
+ validateAlternateExchangeIsNotQueue(virtualHost, String.valueOf(alternateExchange));
+ }
+
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments);
+
attributes.put(Queue.NAME, queueNameString);
attributes.put(Queue.DURABLE, durable);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index beb1a78..a535fd2 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
@@ -601,9 +602,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
final Queue<?> queue = (Queue<?>) owningResource;
- final Exchange altExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if (altExchange == null)
+ if (alternateBindingDestination == null)
{
eventLogger.message(logSubject,
ChannelMessages.DISCARDMSG_NOALTEXCH(message.getMessageNumber(),
@@ -614,7 +615,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
eventLogger.message(logSubject,
ChannelMessages.DISCARDMSG_NOROUTE(message.getMessageNumber(),
- altExchange.getName()));
+ alternateBindingDestination.getName()));
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 48af203..a98c8e6 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
@@ -67,8 +68,8 @@ import org.apache.qpid.server.txn.DtxNotSupportedException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
-import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
public class ManagementAddressSpace implements NamedAddressSpace
@@ -167,6 +168,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
return null;
}
+ @Override
+ public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
+ {
+ return getAttainedMessageDestination(name);
+ }
+
ProxyMessageSource getProxyNode(final String name)
{
LOGGER.debug("RG: looking for proxy source {}", name);
@@ -413,5 +420,21 @@ public class ManagementAddressSpace implements NamedAddressSpace
{
}
+
+ @Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org