You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:48:06 UTC
svn commit: r1569109 [3/5] - in /qpid/trunk/qpid/java: ./
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berk...
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Mon Feb 17 20:48:05 2014
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,9 +40,10 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -98,33 +94,19 @@ public class AMQQueueFactoryTest extends
private void delegateVhostQueueCreation() throws Exception
{
- final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class);
- final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class);
- final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class);
- final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class);
- final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class);
- final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class);
- when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(),
- autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then(
+ final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
+
+ when(_virtualHost.createQueue(any(AMQSessionModel.class), attributes.capture())).then(
new Answer<AMQQueue>()
{
@Override
public AMQQueue answer(InvocationOnMock invocation) throws Throwable
{
- return _queueFactory.createQueue(id.getValue(),
- queueName.getValue(),
- durable.getValue(),
- owner.getValue(),
- autoDelete.getValue(),
- exclusive.getValue(),
- deleteOnNoConsumer.getValue(),
- arguments.getValue());
+ return _queueFactory.createQueue(null, attributes.getValue());
}
}
- );
+ );
}
private void mockQueueRegistry()
@@ -217,17 +199,14 @@ public class AMQQueueFactoryTest extends
public void testPriorityQueueRegistration() throws Exception
{
- Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, "testPriorityQueue");
+
+ attributes.put(Queue.PRIORITIES, 5);
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- "testPriorityQueue",
- false,
- "owner",
- false,
- false,
- false,
- attributes);
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -240,10 +219,12 @@ public class AMQQueueFactoryTest extends
String queueName = getName();
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
- false,
- false,
- null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+
+
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
@@ -261,7 +242,6 @@ public class AMQQueueFactoryTest extends
*/
public void testDeadLetterQueueEnabled() throws Exception
{
- Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -270,14 +250,13 @@ public class AMQQueueFactoryTest extends
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- queueName,
- false,
- "owner",
- false,
- false,
- false,
- attributes);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -314,14 +293,11 @@ public class AMQQueueFactoryTest extends
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- queueName,
- false,
- "owner",
- false,
- false,
- false,
- null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
@@ -348,7 +324,8 @@ public class AMQQueueFactoryTest extends
*/
public void testDeadLetterQueueDisabled() throws Exception
{
- Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -357,14 +334,11 @@ public class AMQQueueFactoryTest extends
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- queueName,
- false,
- "owner",
- false,
- false,
- false,
- attributes);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
+
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
@@ -382,7 +356,6 @@ public class AMQQueueFactoryTest extends
*/
public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception
{
- Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -391,16 +364,18 @@ public class AMQQueueFactoryTest extends
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
//create an autodelete queue
- AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- queueName,
- false,
- "owner",
- true,
- false,
- false,
- attributes);
- assertTrue("Queue should be autodelete", queue.isAutoDelete());
+ AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ assertEquals("Queue should be autodelete",
+ LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
+ queue.getLifetimePolicy());
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
@@ -417,16 +392,13 @@ public class AMQQueueFactoryTest extends
*/
public void testMaximumDeliveryCount() throws Exception
{
- Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, "testMaximumDeliveryCount");
+
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- "testMaximumDeliveryCount",
- false,
- "owner",
- false,
- false,
- false,
- attributes);
+ final AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -440,14 +412,11 @@ public class AMQQueueFactoryTest extends
*/
public void testMaximumDeliveryCountDefault() throws Exception
{
- final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
- "testMaximumDeliveryCount",
- false,
- "owner",
- false,
- false,
- false,
- null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
+
+ final AMQQueue queue = _queueFactory.createQueue(null, attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -462,15 +431,16 @@ public class AMQQueueFactoryTest extends
{
try
{
- _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false,
- false,
- null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+
+ _queueFactory.createQueue(null, attributes);
fail("queue with null name can not be created!");
}
catch (Exception e)
{
assertTrue(e instanceof IllegalArgumentException);
- assertEquals("Queue name must not be null", e.getMessage());
+ assertEquals("Value for attribute name is not found", e.getMessage());
}
}
@@ -486,9 +456,14 @@ public class AMQQueueFactoryTest extends
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
- Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
- _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, false, attributes);
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+
+ _queueFactory.createQueue(null, attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -511,9 +486,14 @@ public class AMQQueueFactoryTest extends
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
- Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
- _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, false, attributes);
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, queueName);
+
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+
+ _queueFactory.createQueue(null, attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -528,6 +508,7 @@ public class AMQQueueFactoryTest extends
{
Map<String,String> arguments = new HashMap<String, String>();
+
arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"mykey");
arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java Mon Feb 17 20:48:05 2014
@@ -26,9 +26,11 @@ import junit.framework.TestCase;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
public class ConflationQueueListTest extends TestCase
@@ -46,8 +48,11 @@ public class ConflationQueueListTest ext
protected void setUp() throws Exception
{
super.setUp();
- _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
- Collections.<String,Object>emptyMap(),CONFLATION_KEY);
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY);
+ _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes);
_list = _queue.getEntries();
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Feb 17 20:48:05 2014
@@ -26,7 +26,10 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
@@ -60,6 +63,13 @@ public class MockAMQQueue implements AMQ
_name = name;
}
+ @Override
+ public void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy)
+ throws ExistingConsumerPreventsExclusive
+ {
+
+ }
+
public boolean getDeleteOnNoConsumers()
{
return false;
@@ -131,6 +141,12 @@ public class MockAMQQueue implements AMQ
return 0;
}
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return null;
+ }
+
public int getBindingCountHigh()
{
return 0;
@@ -216,7 +232,7 @@ public class MockAMQQueue implements AMQ
}
@Override
- public void addQueueDeleteTask(final Action task)
+ public void addDeleteTask(final Action task)
{
}
@@ -345,7 +361,7 @@ public class MockAMQQueue implements AMQ
}
@Override
- public void removeQueueDeleteTask(final Action task)
+ public void removeDeleteTask(final Action task)
{
}
@@ -478,6 +494,12 @@ public class MockAMQQueue implements AMQ
return false;
}
+ @Override
+ public boolean verifySessionAccess(final AMQSessionModel session)
+ {
+ return false;
+ }
+
public Exchange getAlternateExchange()
{
return null;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Mon Feb 17 20:48:05 2014
@@ -26,10 +26,12 @@ import static org.mockito.Mockito.when;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
public class PriorityQueueListTest extends QpidTestCase
@@ -45,16 +47,11 @@ public class PriorityQueueListTest exten
protected void setUp()
{
QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
-
- PriorityQueue queue = new PriorityQueue(UUID.randomUUID(),
- getName(),
- false,
- null,
- false,
- false,
- mock(VirtualHost.class),
- Collections.<String,Object>emptyMap(),
- 10);
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.PRIORITIES, 10);
+ PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), null, queueAttributes);
_list = queue.getEntries();
for (int i = 0; i < PRIORITIES.length; i++)
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Feb 17 20:48:05 2014
@@ -27,11 +27,13 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import static org.mockito.Mockito.mock;
@@ -199,8 +201,10 @@ public abstract class QueueEntryImplTest
{
int numberOfEntries = 5;
QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
- StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
- mock(VirtualHost.class), Collections.<String,Object>emptyMap());
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getName());
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java Mon Feb 17 20:48:05 2014
@@ -29,12 +29,12 @@ import static org.mockito.Matchers.conta
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.Map;
+import java.util.*;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -51,10 +51,6 @@ import org.apache.qpid.server.util.Broke
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class);
@@ -68,7 +64,7 @@ abstract class SimpleAMQQueueTestBase<E
private DirectExchange _exchange;
private MockConsumer _consumerTarget = new MockConsumer();
private QueueConsumer _consumer;
- private Map<String,Object> _arguments = null;
+ private Map<String,Object> _arguments = Collections.emptyMap();
@Override
public void setUp() throws Exception
@@ -78,8 +74,12 @@ abstract class SimpleAMQQueueTestBase<E
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
- false, false, false, _arguments);
+ Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+
+ _queue = (Q) _virtualHost.createQueue(null, attributes);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@@ -104,9 +104,10 @@ abstract class SimpleAMQQueueTestBase<E
_queue.stop();
try
{
- _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
- false, _owner, false,
- false, false, _arguments);
+ Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+
+ _queue = (Q) _virtualHost.createQueue(null, attributes);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -115,10 +116,10 @@ abstract class SimpleAMQQueueTestBase<E
e.getMessage().contains("name"));
}
- _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
- "differentName", false,
- _owner, false,
- false, false, _arguments);
+ Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, "differentName");
+ _queue = (Q) _virtualHost.createQueue(null, attributes);
assertNotNull("Queue was not created", _queue);
}
@@ -1137,15 +1138,17 @@ abstract class SimpleAMQQueueTestBase<E
{
public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
{
- super(UUIDGenerator.generateRandomUUID(),
- "testQueue",
- false,
- "testOwner",
- false,
- false,
- vhost,
- factory,
- null);
+ super(vhost, null, attributes(), factory);
+ }
+
+ private static Map<String,Object> attributes()
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, "test");
+ attributes.put(Queue.DURABLE, false);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+ return attributes;
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Mon Feb 17 20:48:05 2014
@@ -21,11 +21,15 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import java.util.HashMap;
+import java.util.Map;
+
public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
{
@@ -50,8 +54,10 @@ public class SimpleAMQQueueThreadPoolTes
try
{
- SimpleAMQQueue queue = (SimpleAMQQueue)
- test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, "test");
+ AMQQueue queue = test.createQueue(null, attributes);
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Mon Feb 17 20:48:05 2014
@@ -22,8 +22,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import static org.mockito.Mockito.mock;
@@ -38,8 +41,10 @@ public class SimpleQueueEntryImplTest ex
public void setUp() throws Exception
{
mockLogging();
-
- StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
queueEntryList = queue.getEntries();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Mon Feb 17 20:48:05 2014
@@ -26,9 +26,13 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import static org.mockito.Matchers.eq;
@@ -74,8 +78,15 @@ public class SortedQueueEntryListTest ex
{
mockLogging();
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(Queue.ID,UUID.randomUUID());
+ attributes.put(Queue.NAME, getName());
+ attributes.put(Queue.DURABLE, false);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+ attributes.put(Queue.SORT_KEY, "KEY");
+
// Create test list
- _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ _testQueue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Mon Feb 17 20:48:05 2014
@@ -20,11 +20,15 @@
package org.apache.qpid.server.queue;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.mockito.Matchers.eq;
@@ -42,7 +46,15 @@ public class SortedQueueEntryTest extend
public void setUp() throws Exception
{
mockLogging();
- SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(Queue.ID,UUID.randomUUID());
+ attributes.put(Queue.NAME, getName());
+ attributes.put(Queue.DURABLE, false);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+ attributes.put(Queue.SORT_KEY, "KEY");
+
+ SortedQueue queue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Mon Feb 17 20:48:05 2014
@@ -22,9 +22,10 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,8 +46,11 @@ public class StandardQueueEntryListTest
protected void setUp()
{
oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
- _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class),
- Collections.<String,Object>emptyMap());
+
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getName());
+ _testQueue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
_sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
@@ -86,9 +90,10 @@ public class StandardQueueEntryListTest
{
if(newList)
{
- StandardQueue queue =
- new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
- Collections.<String, Object>emptyMap());
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getName());
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
return queue.getEntries();
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Mon Feb 17 20:48:05 2014
@@ -25,18 +25,21 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+
public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
{
@@ -44,7 +47,12 @@ public class StandardQueueTest extends S
{
try
{
- setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(), false,false, null, getArguments()));
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
+ queueAttributes.put(Queue.OWNER, "testOwner");
+
+ setQueue(new StandardQueue(null, null, queueAttributes));
assertNull("Queue was created", getQueue());
}
catch (IllegalArgumentException e)
@@ -58,7 +66,13 @@ public class StandardQueueTest extends S
public void testAutoDeleteQueue() throws Exception
{
getQueue().stop();
- setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false, getVirtualHost(), Collections.<String,Object>emptyMap()));
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, getQname());
+ queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
+
+ setQueue(queue);
getQueue().setDeleteOnNoConsumers(true);
ServerMessage message = createMessage(25l);
@@ -75,8 +89,12 @@ public class StandardQueueTest extends S
public void testActiveConsumerCount() throws Exception
{
- final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
- "testOwner", false, false, getVirtualHost(), null);
+
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
+ queueAttributes.put(Queue.OWNER, "testOwner");
+ final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
//verify adding an active consumer increases the count
final MockConsumer consumer1 = new MockConsumer();
@@ -145,8 +163,7 @@ public class StandardQueueTest extends S
public void testEnqueueDequeuedEntry() throws Exception
{
// create a queue where each even entry is considered a dequeued
- SimpleAMQQueue queue = new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false,
- "testOwner", false, false, getVirtualHost(), null);
+ SimpleAMQQueue queue = new DequeuedQueue(getVirtualHost());
// create a consumer
MockConsumer consumer = new MockConsumer();
@@ -180,9 +197,11 @@ public class StandardQueueTest extends S
int messageNumber = 4;
int dequeueMessageIndex = 1;
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.ID, UUID.randomUUID());
+ queueAttributes.put(Queue.NAME, "test");
// create queue with overridden method deliverAsync
- StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test",
- false, "testOwner", false, false, getVirtualHost(), null)
+ StandardQueue testQueue = new StandardQueue(getVirtualHost(), null, queueAttributes)
{
@Override
public void deliverAsync(QueueConsumer sub)
@@ -249,16 +268,19 @@ public class StandardQueueTest extends S
private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
{
- public DequeuedQueue(final UUID id,
- final String queueName,
- final boolean durable,
- final String owner,
- final boolean autoDelete,
- final boolean exclusive,
- final VirtualHost virtualHost,
- final Map<String, Object> arguments)
+ public DequeuedQueue(VirtualHost virtualHost)
+ {
+ super(virtualHost, null, attributes(), new DequeuedQueueEntryListFactory());
+ }
+
+ private static Map<String,Object> attributes()
{
- super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, "test");
+ attributes.put(Queue.DURABLE, false);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+ return attributes;
}
}
private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Mon Feb 17 20:48:05 2014
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.times;
import java.io.File;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@@ -40,6 +41,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -143,7 +145,7 @@ public abstract class AbstractDurableCon
verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
- org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
+ org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
private Map<String,Object> map(Object... vals)
@@ -220,7 +222,7 @@ public abstract class AbstractDurableCon
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -240,7 +242,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -258,7 +260,7 @@ public abstract class AbstractDurableCon
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -292,8 +294,6 @@ public abstract class AbstractDurableCon
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -320,8 +320,6 @@ public abstract class AbstractDurableCon
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
@@ -361,13 +359,19 @@ public abstract class AbstractDurableCon
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
- when(queue.getOwner()).thenReturn(queueOwner);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
- if(arguments != null && !arguments.isEmpty())
+ final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+ attributes.put(Queue.NAME, queueName);
+ if(exclusive)
{
- when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+ when(queue.getOwner()).thenReturn(queueOwner);
+
+ attributes.put(Queue.OWNER, queueOwner);
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
+ }
+ when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
when(queue.getAttribute(requestedAttribute.capture())).then(
new Answer()
@@ -377,10 +381,9 @@ public abstract class AbstractDurableCon
public Object answer(final InvocationOnMock invocation) throws Throwable
{
String attrName = requestedAttribute.getValue();
- return arguments.get(attrName);
+ return attributes.get(attrName);
}
});
- }
return queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Mon Feb 17 20:48:05 2014
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -182,8 +184,10 @@ public class BrokerTestHelper
public static AMQQueue createQueue(String queueName, VirtualHost virtualHost)
throws QpidSecurityException, QueueExistsException
{
- AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
- false, false, false, Collections.<String, Object>emptyMap());
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, queueName);
+ AMQQueue queue = virtualHost.createQueue(null, attributes);
return queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java Mon Feb 17 20:48:05 2014
@@ -46,6 +46,7 @@ import org.apache.qpid.server.store.Conf
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -117,14 +118,11 @@ public class DurableConfigurationRecover
- final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
- final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
+ final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
_queueFactory = mock(QueueFactory.class);
- when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(),
- anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+ when(_queueFactory.restoreQueue(attributesArg.capture())).then(
new Answer()
{
@@ -133,8 +131,9 @@ public class DurableConfigurationRecover
{
final AMQQueue queue = mock(AMQQueue.class);
- final String queueName = queueArg.getValue();
- final UUID queueId = idArg.getValue();
+ final Map attributes = attributesArg.getValue();
+ final String queueName = (String) attributes.get(Queue.NAME);
+ final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
when(queue.getName()).thenReturn(queueName);
when(queue.getId()).thenReturn(queueId);
@@ -153,10 +152,10 @@ public class DurableConfigurationRecover
return null;
}
}
- ).when(queue).setAlternateExchange(altExchangeArg.capture());
+ ).when(queue).setAlternateExchange(altExchangeArg.capture());
- Map args = argsArg.getValue();
- if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ Map args = attributes;
+ if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
{
final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
@@ -470,7 +469,6 @@ public class DurableConfigurationRecover
{
queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString());
}
- queue.put(Queue.EXCLUSIVE, false);
return queue;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Mon Feb 17 20:48:05 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -152,14 +153,7 @@ public class MockVirtualHost implements
}
@Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments)
+ public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments)
{
return null;
}
@@ -314,4 +308,52 @@ public class MockVirtualHost implements
public void unblock()
{
}
+
+ @Override
+ public long getDefaultAlertThresholdMessageAge()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthMessages()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertRepeatGap()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultQueueFlowControlSizeBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultQueueFlowResumeSizeBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getDefaultMaximumDeliveryAttempts()
+ {
+ return 0;
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Feb 17 20:48:05 2014
@@ -25,6 +25,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@ import static org.apache.qpid.server.log
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+ LogSubject, AuthorizationHolder
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@ public class ServerConnection extends Co
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
+
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@ public class ServerConnection extends Co
_onOpenTask = task;
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(ServerSession session, AMQConstant cause, String message)
{
ExecutionException ex = new ExecutionException();
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@ public class ServerConnection extends Co
}
ex.setErrorCode(code);
ex.setDescription(message);
- ((ServerSession)session).invoke(ex);
+ session.invoke(ex);
session.close(cause, message);
}
@@ -315,6 +322,7 @@ public class ServerConnection extends Co
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
+ performDeleteTasks();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -327,6 +335,14 @@ public class ServerConnection extends Co
close(replyCode, message);
}
+ protected void performDeleteTasks()
+ {
+ for(Action<? super ServerConnection> task : _taskList)
+ {
+ task.performAction(this);
+ }
+ }
+
public synchronized void block()
{
if(!_blocking)
@@ -367,12 +383,12 @@ public class ServerConnection extends Co
super.removeSession(ssn);
}
- public List<AMQSessionModel> getSessionModels()
+ public List<ServerSession> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<ServerSession> sessions = new ArrayList<ServerSession>();
for (Session ssn : getChannels())
{
- sessions.add((AMQSessionModel) ssn);
+ sessions.add((ServerSession) ssn);
}
return sessions;
}
@@ -475,14 +491,10 @@ public class ServerConnection extends Co
return String.valueOf(getRemoteAddress());
}
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
-
@Override
public void closed()
{
+ performDeleteTasks();
closeSubscriptions();
super.closed();
}
@@ -522,6 +534,12 @@ public class ServerConnection extends Co
}
@Override
+ public String getRemoteContainerName()
+ {
+ return getConnectionDelegate().getClientId();
+ }
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@ public class ServerConnection extends Co
return getConnectionDelegate().getClientProduct();
}
- public String getPrincipalAsString()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
public long getSessionCountLimit()
{
return getChannelMax();
@@ -565,4 +578,16 @@ public class ServerConnection extends Co
super.doHeartBeat();
}
+
+ @Override
+ public void addDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.remove(task);
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Feb 17 20:48:05 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -310,14 +311,18 @@ public class ServerConnectionDelegate ex
private boolean isSessionNameUnique(final byte[] name, final Connection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
- final String userId = sconn.getUserName();
+ final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+ final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
final Iterator<AMQConnectionModel> connections =
((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
while(connections.hasNext())
{
- final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
- if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+ final AMQConnectionModel amqConnectionModel = connections.next();
+ final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+ ? ""
+ : amqConnectionModel.getAuthorizedPrincipal().getName();
+ if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
{
return false;
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Feb 17 20:48:05 2014
@@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.Suspen
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -88,7 +89,9 @@ import static org.apache.qpid.util.Seria
public class ServerSession extends Session
implements AuthorizationHolder,
- AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
+ AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+ Deletable<ServerSession>
+
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -132,7 +135,7 @@ public class ServerSession extends Sessi
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
+ private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -374,7 +377,7 @@ public class ServerSession extends Sessi
}
_messageDispositionListenerMap.clear();
- for (Action<ServerSession> task : _taskList)
+ for (Action<? super ServerSession> task : _taskList)
{
task.performAction(this);
}
@@ -610,12 +613,12 @@ public class ServerSession extends Sessi
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Action<ServerSession> task)
+ public void addDeleteTask(Action<? super ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Action<ServerSession> task)
+ public void removeDeleteTask(Action<? super ServerSession> task)
{
_taskList.remove(task);
}
@@ -652,7 +655,7 @@ public class ServerSession extends Sessi
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ public ServerConnection getConnectionModel()
{
return getConnection();
}
@@ -922,7 +925,7 @@ public class ServerSession extends Sessi
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(ServerSession o)
{
return getId().compareTo(o.getId());
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Feb 17 20:48:05 2014
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@ public class ServerSessionDelegate exten
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ else if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
- if(queue.isExclusive())
- {
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
-
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getExclusiveOwningSession() == session)
- {
- queue.setExclusiveOwningSession(null);
- }
- }
- });
-
- if(queue.getAuthorizationHolder() == null)
- {
- queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getAuthorizationHolder() == session)
- {
- queue.setAuthorizationHolder(null);
- }
- }
- });
- }
- }
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -302,6 +269,10 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+ }
}
}
}
@@ -1197,7 +1168,7 @@ public class ServerSessionDelegate exten
exception(session, method, errorCode, description);
}
- else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@ public class ServerSessionDelegate exten
try
{
- String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
final String alternateExchangeName = method.getAlternateExchange();
@@ -1227,66 +1197,36 @@ public class ServerSessionDelegate exten
final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ arguments.put(Queue.ID, id);
+ arguments.put(Queue.NAME, queueName);
- queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
- autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
-
- if (autoDelete && exclusive)
+ LifetimePolicy lifetime;
+ if(autoDelete)
{
- final AMQQueue q = queue;
- final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- try
- {
- virtualHost.removeQueue(q);
- }
- catch (QpidSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+ : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
- if (exclusive)
+ else
{
- final AMQQueue q = queue;
- final Action<ServerSession> removeExclusive = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ lifetime = LifetimePolicy.PERMANENT;
}
+
+ arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+ ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+ arguments.put(Queue.DURABLE, method.getDurable());
+
+ arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+ queue = virtualHost.createQueue((ServerSession)session, arguments);
+
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@ public class ServerSessionDelegate exten
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1424,7 +1360,7 @@ public class ServerSessionDelegate exten
result.setQueue(queue.getName());
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
- result.setAutoDelete(queue.isAutoDelete());
+ result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
Map<String, Object> arguments = new LinkedHashMap<String, Object>();
Collection<String> availableAttrs = queue.getAvailableAttributes();
@@ -1500,7 +1436,6 @@ public class ServerSessionDelegate exten
public void closed(Session session)
{
setThreadSubject(session);
-
ServerSession serverSession = (ServerSession)session;
serverSession.stopSubscriptions();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org