You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/08 18:52:07 UTC
svn commit: r1566069 [4/4] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/m...
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java (from r1565124, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java&r1=1565124&r2=1566069&rev=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java Sat Feb 8 17:52:05 2014
@@ -45,7 +45,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
@@ -56,14 +55,13 @@ import org.apache.qpid.test.utils.QpidTe
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-public class SimpleAMQQueueTest extends QpidTestCase
+abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase
{
- private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTest.class);
+ private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class);
- private SimpleAMQQueue _queue;
+
+ private Q _queue;
private VirtualHost _virtualHost;
private String _qname = "qname";
private String _owner = "owner";
@@ -81,7 +79,7 @@ public class SimpleAMQQueueTest extends
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
false, false, false, _arguments);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
@@ -107,7 +105,7 @@ public class SimpleAMQQueueTest extends
_queue.stop();
try
{
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
false, _owner, false,
false, false, _arguments);
assertNull("Queue was created", _queue);
@@ -118,24 +116,14 @@ public class SimpleAMQQueueTest extends
e.getMessage().contains("name"));
}
- try
- {
- _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
- assertNull("Queue was created", _queue);
- }
- catch (IllegalArgumentException e)
- {
- assertTrue("Exception was not about missing vhost",
- e.getMessage().contains("Host"));
- }
-
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
"differentName", false,
_owner, false,
false, false, _arguments);
assertNotNull("Queue was not created", _queue);
}
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
@@ -150,11 +138,11 @@ public class SimpleAMQQueueTest extends
assertTrue("Queue was not bound to key",
_exchange.isBound(_routingKey,_queue));
assertEquals("Exchange binding count", 1,
- _queue.getBindings().size());
+ _queue.getBindings().size());
assertEquals("Wrong exchange bound", _routingKey,
_queue.getBindings().get(0).getBindingKey());
assertEquals("Wrong exchange bound", _exchange,
- _queue.getBindings().get(0).getExchange());
+ _queue.getBindings().get(0).getExchange());
_exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP);
assertFalse("Routing key was still bound",
@@ -495,22 +483,6 @@ public class SimpleAMQQueueTest extends
assertNotNull(ex);
}
- public void testAutoDeleteQueue() throws Exception
- {
- _queue.stop();
- _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
- _queue.setDeleteOnNoConsumers(true);
-
- ServerMessage message = createMessage(new Long(25));
- _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
-
- _queue.enqueue(message, null);
- _consumer.close();
- assertTrue("Queue was not deleted when consumer was removed",
- _queue.isDeleted());
- }
public void testResend() throws Exception
{
@@ -520,12 +492,12 @@ public class SimpleAMQQueueTest extends
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
- _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>()
+ _queue.enqueue(message, new Action<MessageInstance<?,? extends Consumer>>()
{
@Override
- public void performAction(final MessageInstance<? extends Consumer> object)
+ public void performAction(final MessageInstance<?,? extends Consumer> object)
{
- QueueEntry entry = (QueueEntry) object;
+ QueueEntryImpl entry = (QueueEntryImpl) object;
entry.setRedelivered();
try
{
@@ -612,7 +584,7 @@ public class SimpleAMQQueueTest extends
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
- List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
+ List<E> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
// Check that when 'from' is 0 it is ignored and the range continues from 1
@@ -681,22 +653,12 @@ public class SimpleAMQQueueTest extends
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
- TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testQueue", false,"testOwner",
- false, false, _virtualHost, factory, null)
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new consumers
- }
- };
+ SimpleAMQQueue testQueue = createNonAsyncDeliverQueue();
// retrieve the QueueEntryList the queue creates and insert the test
// messages, thus avoiding straight-through delivery attempts during
//enqueue() process.
- QueueEntryList list = factory.getQueueEntryList();
+ QueueEntryList list = testQueue.getEntries();
assertNotNull("QueueEntryList should have been created", list);
QueueEntry msg1 = list.add(createMessage(1L));
@@ -748,6 +710,12 @@ public class SimpleAMQQueueTest extends
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
}
+ private SimpleAMQQueue createNonAsyncDeliverQueue()
+ {
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ return new NonAsyncDeliverQueue(factory, getVirtualHost());
+ }
+
/**
* Tests that dequeued message is not present in the list returned form
* {@link SimpleAMQQueue#getMessagesOnTheQueue()}
@@ -764,7 +732,7 @@ public class SimpleAMQQueueTest extends
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertEquals(messageNumber - 1, entries.size());
@@ -801,9 +769,9 @@ public class SimpleAMQQueueTest extends
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue with filter accepting all available messages
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
return true;
}
@@ -855,12 +823,12 @@ public class SimpleAMQQueueTest extends
_queue.deleteMessageFromTop();
//get queue entries
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(),
- messageNumber - 2, entries.size());
+ messageNumber - 2, entries.size());
assertEquals("Expected first entry with id 2", 2l,
(entries.get(0).getMessage()).getMessageNumber());
}
@@ -891,241 +859,13 @@ public class SimpleAMQQueueTest extends
}
// get queue entries
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull(entries);
assertEquals(0, entries.size());
}
- /**
- * Tests whether dequeued entry is sent to subscriber in result of
- * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)}
- */
- public void testProcessQueueWithDequeuedEntry()
- {
- // total number of messages to send
- int messageNumber = 4;
- int dequeueMessageIndex = 1;
-
- // create queue with overridden method deliverAsync
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test",
- false, "testOwner", false, false, _virtualHost, null)
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing
- }
- };
-
- // put messages
- List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
-
- // dequeue message
- dequeueMessage(testQueue, dequeueMessageIndex);
-
- // latch to wait for message receipt
- final CountDownLatch latch = new CountDownLatch(messageNumber -1);
-
- // create a consumer
- MockConsumer consumer = new MockConsumer()
- {
- /**
- * Send a message and decrement latch
- * @param entry
- * @param batch
- */
- public void send(MessageInstance entry, boolean batch) throws AMQException
- {
- super.send(entry, batch);
- latch.countDown();
- }
- };
-
- try
- {
- // subscribe
- testQueue.addConsumer(consumer,
- null,
- entries.get(0).getMessage().getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
-
- // process queue
- testQueue.processQueue(new QueueRunner(testQueue)
- {
- public void run()
- {
- // do nothing
- }
- });
- }
- catch (AMQException e)
- {
- fail("Failure to process queue:" + e.getMessage());
- }
- // wait up to 1 minute for message receipt
- try
- {
- latch.await(1, TimeUnit.MINUTES);
- }
- catch (InterruptedException e1)
- {
- Thread.currentThread().interrupt();
- }
- List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, consumer.getMessages());
- }
-
- /**
- * Tests that entry in dequeued state are not enqueued and not delivered to consumer
- */
- public void testEnqueueDequeuedEntry()
- {
- // create a queue where each even entry is considered a dequeued
- SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test", false,
- "testOwner", false, false, _virtualHost, new QueueEntryListFactory()
- {
- public QueueEntryList createQueueEntryList(AMQQueue queue)
- {
- /**
- * Override SimpleQueueEntryList to create a dequeued
- * entries for messages with even id
- */
- return new SimpleQueueEntryList(queue)
- {
- /**
- * Entries with even message id are considered
- * dequeued!
- */
- protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message)
- {
- return new SimpleQueueEntryImpl(this, message)
- {
-
- public boolean isDeleted()
- {
- return (message.getMessageNumber() % 2 == 0);
- }
-
- public boolean isAvailable()
- {
- return !(message.getMessageNumber() % 2 == 0);
- }
-
- @Override
- public boolean acquire(QueueConsumer sub)
- {
- if(message.getMessageNumber() % 2 == 0)
- {
- return false;
- }
- else
- {
- return super.acquire(sub);
- }
- }
- };
- }
- };
- }
- }, null);
- // create a consumer
- MockConsumer consumer = new MockConsumer();
-
- // register consumer
- try
- {
- queue.addConsumer(consumer,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- }
- catch (AMQException e)
- {
- fail("Failure to register consumer:" + e.getMessage());
- }
-
- // put test messages into a queue
- putGivenNumberOfMessages(queue, 4);
-
- // assert received messages
- List<MessageInstance> messages = consumer.getMessages();
- assertEquals("Only 2 messages should be returned", 2, messages.size());
- assertEquals("ID of first message should be 1", 1l,
- (messages.get(0).getMessage()).getMessageNumber());
- assertEquals("ID of second message should be 3", 3l,
- (messages.get(1).getMessage()).getMessageNumber());
- }
-
- public void testActiveConsumerCount() throws Exception
- {
- final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
- "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
-
- //verify adding an active consumer increases the count
- final MockConsumer consumer1 = new MockConsumer();
- consumer1.setActive(true);
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.addConsumer(consumer1,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify adding an inactive consumer doesn't increase the count
- final MockConsumer consumer2 = new MockConsumer();
- consumer2.setActive(false);
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.addConsumer(consumer2,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify behaviour in face of expected state changes:
-
- //verify a consumer going suspended->active increases the count
- consumer2.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
-
- //verify a consumer going active->suspended decreases the count
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going suspended->closed doesn't change the count
- consumer2.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going active->active doesn't change the count
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- //verify a consumer going suspended->suspended doesn't change the count
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going active->closed decreases the count
- consumer1.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- }
public void testNotificationFiredOnEnqueue() throws Exception
{
@@ -1170,12 +910,12 @@ public class SimpleAMQQueueTest extends
* @param messageNumber
* number of messages to put into queue
*/
- private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ protected List<E> enqueueGivenNumberOfMessages(Q queue, int messageNumber)
{
putGivenNumberOfMessages(queue, messageNumber);
// make sure that all enqueued messages are on the queue
- List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+ List<E> entries = queue.getMessagesOnTheQueue();
assertEquals(messageNumber, entries.size());
for (int i = 0; i < messageNumber; i++)
{
@@ -1196,16 +936,15 @@ public class SimpleAMQQueueTest extends
* @param queue
* @param messageNumber
*/
- private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ protected <T extends SimpleAMQQueue> void putGivenNumberOfMessages(T queue, int messageNumber)
{
for (int i = 0; i < messageNumber; i++)
{
// Create message
- Long messageId = new Long(i);
ServerMessage message = null;
try
{
- message = createMessage(messageId);
+ message = createMessage((long)i);
}
catch (AMQException e)
{
@@ -1239,7 +978,7 @@ public class SimpleAMQQueueTest extends
* @param dequeueMessageIndex
* entry index to dequeue.
*/
- private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
+ protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
{
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
QueueEntry entry = entries.get(dequeueMessageIndex);
@@ -1259,7 +998,7 @@ public class SimpleAMQQueueTest extends
return entriesList;
}
- private void verifyReceivedMessages(List<MessageInstance> expected,
+ protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
@@ -1272,11 +1011,16 @@ public class SimpleAMQQueueTest extends
}
}
- public SimpleAMQQueue getQueue()
+ public Q getQueue()
{
return _queue;
}
+ protected void setQueue(Q queue)
+ {
+ _queue = queue;
+ }
+
public MockConsumer getConsumer()
{
return _consumerTarget;
@@ -1310,7 +1054,7 @@ public class SimpleAMQQueueTest extends
return message;
}
- private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>>
+ private static class EntryListAddingAction implements Action<MessageInstance<?,? extends Consumer>>
{
private final ArrayList<QueueEntry> _queueEntries;
@@ -1319,25 +1063,122 @@ public class SimpleAMQQueueTest extends
_queueEntries = queueEntries;
}
- public void performAction(MessageInstance<? extends Consumer> entry)
+ public void performAction(MessageInstance<?,? extends Consumer> entry)
{
_queueEntries.add((QueueEntry) entry);
}
}
- class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public String getQname()
+ {
+ return _qname;
+ }
+
+ public String getOwner()
+ {
+ return _owner;
+ }
+
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public DirectExchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public MockConsumer getConsumerTarget()
+ {
+ return _consumerTarget;
+ }
+
+
+ static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+
+ @Override
+ public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue)
+ {
+ return new NonAsyncDeliverList(queue);
+ }
+ }
+
+ private static class NonAsyncDeliverEntry extends OrderedQueueEntry<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
+ final ServerMessage message,
+ final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message)
+ {
+ super(queueEntryList, message);
+ }
+ }
+
+ private static class NonAsyncDeliverList extends OrderedQueueEntryList<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
{
- QueueEntryList _list;
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ private static final HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> HEAD_CREATOR =
+ new HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>()
+ {
+
+ @Override
+ public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list)
+ {
+ return new NonAsyncDeliverEntry(list);
+ }
+ };
+
+ public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+ @Override
+ protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message)
+ {
+ return new NonAsyncDeliverEntry(this,message);
+ }
+ }
+
+
+ private static class NonAsyncDeliverQueue extends SimpleAMQQueue<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+ public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
{
- _list = new SimpleQueueEntryList(queue);
- return _list;
+ super(UUIDGenerator.generateRandomUUID(),
+ "testQueue",
+ false,
+ "testOwner",
+ false,
+ false,
+ vhost,
+ factory,
+ null);
}
- public QueueEntryList getQueueEntryList()
+ @Override
+ public void deliverAsync(QueueConsumer sub)
{
- return _list;
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new consumers
}
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Sat Feb 8 17:52:05 2014
@@ -21,8 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,7 +36,20 @@ import static org.mockito.Mockito.when;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
{
- private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ private OrderedQueueEntryList queueEntryList;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ mockLogging();
+
+ StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+
+ queueEntryList = queue.getEntries();
+
+ super.setUp();
+ }
+
public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Sat Feb 8 17:52:05 2014
@@ -21,20 +21,26 @@ package org.apache.qpid.server.queue;
import java.util.Collections;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
+import java.util.UUID;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryListTest extends QueueEntryListTestBase
+public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>>
{
private static SelfValidatingSortedQueueEntryList _sqel;
+
public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
" 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
"195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45",
@@ -62,16 +68,30 @@ public class SortedQueueEntryListTest ex
private final static String keysSorted[] = keys.clone();
+ private SortedQueue _testQueue;
+
@Override
protected void setUp() throws Exception
{
+ mockLogging();
+
+ // Create test list
+ _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ {
+
+ @Override
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ {
+ return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+ }
+ });
+ _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries();
+
super.setUp();
// Create result array
Arrays.sort(keysSorted);
- // Create test list
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
// Build test list
long messageId = 0L;
@@ -83,14 +103,22 @@ public class SortedQueueEntryListTest ex
}
+ protected void mockLogging()
+ {
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.setDefault(logActor);
+ }
+
+
@Override
- public QueueEntryList getTestList()
+ public SortedQueueEntryList getTestList()
{
return getTestList(false);
}
@Override
- public QueueEntryList getTestList(boolean newList)
+ public SortedQueueEntryList getTestList(boolean newList)
{
if(newList)
{
@@ -117,6 +145,12 @@ public class SortedQueueEntryListTest ex
return generateTestMessage(1, "test value");
}
+ @Override
+ protected SortedQueue getTestQueue()
+ {
+ return _testQueue;
+ }
+
private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
{
final ServerMessage message = mock(ServerMessage.class);
@@ -138,7 +172,7 @@ public class SortedQueueEntryListTest ex
super.testIterator();
// Test sorted order of list
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count = 0;
while(iter.advance())
{
@@ -147,12 +181,12 @@ public class SortedQueueEntryListTest ex
}
}
- private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+ private Object getSortedKeyValue(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
}
- private Long getMessageId(QueueEntryIterator<?> iter)
+ private Long getMessageId(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageNumber();
}
@@ -169,7 +203,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -190,12 +224,13 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
- assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); }
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+ }
}
public void testAscendingSortKeys() throws Exception
@@ -211,7 +246,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -234,7 +269,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -251,7 +286,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "B");
@@ -271,7 +306,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "A");
@@ -290,7 +325,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "C");
@@ -322,7 +357,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "D");
@@ -362,7 +397,7 @@ public class SortedQueueEntryListTest ex
validateEntry(entry, "D", 2);
}
- private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+ private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
{
assertEquals("Sorted queue entry value is not as expected",
expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (from r1565732, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java&r1=1565732&r2=1566069&rev=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Sat Feb 8 17:52:05 2014
@@ -20,21 +20,41 @@
package org.apache.qpid.server.queue;
import java.util.Collections;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryImplTest extends QueueEntryImplTestBase
+public class SortedQueueEntryTest extends QueueEntryImplTestBase
{
public final static String keys[] = { "CCC", "AAA", "BBB" };
- private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
+ private SelfValidatingSortedQueueEntryList _queueEntryList;
+
+ /**
+ * Builds the fixture used by this test.
+ * <p>
+ * After mockLogging() has been called, a SortedQueue is constructed with a
+ * mocked VirtualHost, the string "KEY" and a QueueEntryListFactory that
+ * returns a SelfValidatingSortedQueueEntryList (also created with "KEY").
+ * The queue's entry list is stored in _queueEntryList and super.setUp() is
+ * invoked last.
+ * NOTE(review): "KEY" is presumably the name of the message header used as
+ * the sort key - confirm against SortedQueue.
+ */
+ @Override
+ public void setUp() throws Exception
+ {
+ mockLogging();
+ SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ {
+
+ @Override
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ {
+ return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+ }
+ });
+ // the cast relies on the factory above being the one that created the queue's entry list
+ _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries();
+ super.setUp();
+ }
public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
{
@@ -48,7 +68,7 @@ public class SortedQueueEntryImplTest ex
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
- return queueEntryList.add(message);
+ return _queueEntryList.add(message);
}
public void testCompareTo()
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (from r1565732, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java&r1=1565732&r2=1566069&rev=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Sat Feb 8 17:52:05 2014
@@ -23,16 +23,21 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SimpleQueueEntryListTest extends QueueEntryListTestBase
+public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList, QueueConsumer<?,StandardQueueEntry, StandardQueue, StandardQueueEntryList>>
{
- private SimpleQueueEntryList _sqel;
+
+ private StandardQueue _testQueue;
+ private StandardQueueEntryList _sqel;
private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
private String oldScavengeValue = null;
@@ -41,7 +46,10 @@ public class SimpleQueueEntryListTest ex
protected void setUp()
{
oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
- _sqel = new SimpleQueueEntryList(_testQueue);
+ _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap());
+
+ _sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
{
final ServerMessage message = mock(ServerMessage.class);
@@ -69,17 +77,21 @@ public class SimpleQueueEntryListTest ex
}
@Override
- public QueueEntryList getTestList()
+ public StandardQueueEntryList getTestList()
{
return getTestList(false);
}
@Override
- public QueueEntryList getTestList(boolean newList)
+ public StandardQueueEntryList getTestList(boolean newList)
{
if(newList)
{
- return new SimpleQueueEntryList(_testQueue);
+ StandardQueue queue =
+ new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
+ Collections.<String, Object>emptyMap());
+
+ return queue.getEntries();
}
else
{
@@ -107,9 +119,15 @@ public class SimpleQueueEntryListTest ex
return msg;
}
+ /**
+ * Returns the StandardQueue held in the _testQueue field.
+ */
+ @Override
+ protected StandardQueue getTestQueue()
+ {
+ return _testQueue;
+ }
+
public void testScavenge() throws Exception
{
- SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(null);
ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
@@ -126,7 +144,7 @@ public class SimpleQueueEntryListTest ex
entriesMap.put(i,bleh);
}
- SimpleQueueEntryImpl head = sqel.getHead();
+ OrderedQueueEntry head = sqel.getHead();
//We shall now delete some specific messages mid-queue that will lead to
//requiring a scavenge once the requested threshold of 9 deletes is passed
@@ -172,10 +190,10 @@ public class SimpleQueueEntryListTest ex
return entry.isDeleted() && !wasDeleted;
}
- private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
+ private void verifyDeletedButPresentBeforeScavenge(OrderedQueueEntry head, long messageId)
{
//Use the head to get the initial entry in the queue
- SimpleQueueEntryImpl entry = head.getNextNode();
+ OrderedQueueEntry entry = head.getNextNode();
for(long i = 1; i < messageId ; i++)
{
@@ -186,10 +204,10 @@ public class SimpleQueueEntryListTest ex
assertTrue("Entry should have been deleted", entry.isDeleted());
}
- private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
+ private void verifyAllDeletedMessagedNotPresent(OrderedQueueEntry head, Map<Integer,QueueEntry> remainingMessages)
{
//Use the head to get the initial entry in the queue
- SimpleQueueEntryImpl entry = head.getNextNode();
+ OrderedQueueEntry entry = head.getNextNode();
assertNotNull("Initial entry should not have been null", entry);
@@ -211,8 +229,8 @@ public class SimpleQueueEntryListTest ex
public void testGettingNextElement()
{
final int numberOfEntries = 5;
- final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries];
- final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ final OrderedQueueEntry[] entries = new OrderedQueueEntry[numberOfEntries];
+ final OrderedQueueEntryList queueEntryList = getTestList(true);
// create test entries
for(int i = 0; i < numberOfEntries; i++)
@@ -228,7 +246,7 @@ public class SimpleQueueEntryListTest ex
// test getNext for not acquired entries
for(int i = 0; i < numberOfEntries; i++)
{
- final SimpleQueueEntryImpl next = entries[i].getNextValidEntry();
+ final OrderedQueueEntry next = entries[i].getNextValidEntry();
if(i < numberOfEntries - 1)
{
@@ -248,7 +266,7 @@ public class SimpleQueueEntryListTest ex
entries[2].acquire();
entries[2].delete();
- SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
+ OrderedQueueEntry next = entries[2].getNextValidEntry();
assertEquals("expected forth entry", entries[3], next);
next = next.getNextValidEntry();
assertEquals("expected fifth entry", entries[4], next);
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1566069&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Sat Feb 8 17:52:05 2014
@@ -0,0 +1,363 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+{
+
+ /**
+  * Verifies that a StandardQueue cannot be constructed without a virtual host.
+  * <p>
+  * The constructor is called with {@code null} in the virtual host position and
+  * the test accepts an IllegalArgumentException whose message contains "Host".
+  * If no exception is thrown, the new queue is stored via setQueue and the
+  * assertNull on getQueue() is what flags the failure (assuming getQueue()
+  * returns the queue that was just set).
+  */
+ public void testCreationFailsWithNoVhost()
+ {
+     try
+     {
+         final StandardQueue hostlessQueue =
+                 new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(),
+                                   false, false, null, getArguments());
+         setQueue(hostlessQueue);
+         assertNull("Queue was created", getQueue());
+     }
+     catch (IllegalArgumentException expected)
+     {
+         final String reason = expected.getMessage();
+         assertTrue("Exception was not about missing vhost", reason.contains("Host"));
+     }
+ }
+
+
+ /**
+  * Checks that a queue marked delete-on-no-consumers is reported as deleted
+  * once its only consumer has been closed.
+  * <p>
+  * The current queue is stopped and replaced by a new StandardQueue, a single
+  * consumer is attached, one message is enqueued and the consumer is closed.
+  */
+ public void testAutoDeleteQueue() throws Exception
+ {
+     // swap the existing queue for a freshly built one
+     getQueue().stop();
+     final StandardQueue replacement =
+             new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false,
+                               getVirtualHost(), Collections.<String,Object>emptyMap());
+     setQueue(replacement);
+     getQueue().setDeleteOnNoConsumers(true);
+
+     final ServerMessage testMessage = createMessage(25L);
+     final EnumSet<Consumer.Option> consumerOptions =
+             EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES);
+     final QueueConsumer soleConsumer =
+             getQueue().addConsumer(getConsumerTarget(), null, testMessage.getClass(), "test", consumerOptions);
+
+     getQueue().enqueue(testMessage, null);
+     soleConsumer.close();
+
+     final boolean queueDeleted = getQueue().isDeleted();
+     assertTrue("Queue was not deleted when consumer was removed", queueDeleted);
+ }
+
+ /**
+ * Exercises getActiveConsumerCount() on a StandardQueue while two
+ * MockConsumers are added and their ConsumerTarget.State is switched between
+ * ACTIVE, SUSPENDED and CLOSED.
+ * <p>
+ * The steps are order dependent: every expected count builds on the state
+ * left behind by the previous step.
+ */
+ public void testActiveConsumerCount() throws Exception
+ {
+ final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
+ "testOwner", false, false, getVirtualHost(), null);
+
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of expected state changes:
+
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //move consumer1 active->suspended; the count is expected to drop to zero
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //move consumer1 suspended->active again so the active->closed step below starts from a count of one
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ }
+
+
+ /**
+  * Verifies that entries which report themselves as dequeued are not
+  * delivered to a consumer.
+  * <p>
+  * A DequeuedQueue is used, whose entries report themselves as deleted when
+  * their message number is even. Four messages are put on the queue and only
+  * the messages numbered 1 and 3 are expected to reach the consumer.
+  */
+ public void testEnqueueDequeuedEntry()
+ {
+     // queue in which every entry with an even message number counts as dequeued
+     final SimpleAMQQueue dequeuingQueue =
+             new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false, "testOwner", false, false,
+                               getVirtualHost(), null);
+     final MockConsumer receiver = new MockConsumer();
+     final EnumSet<Consumer.Option> options =
+             EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES);
+
+     // attach the consumer before any message is published
+     try
+     {
+         dequeuingQueue.addConsumer(receiver, null, createMessage(-1L).getClass(), "test", options);
+     }
+     catch (AMQException e)
+     {
+         fail("Failure to register consumer:" + e.getMessage());
+     }
+
+     // publish the test messages
+     putGivenNumberOfMessages(dequeuingQueue, 4);
+
+     // only the odd-numbered messages should have been delivered
+     final List<MessageInstance> delivered = receiver.getMessages();
+     assertEquals("Only 2 messages should be returned", 2, delivered.size());
+
+     final long firstId = delivered.get(0).getMessage().getMessageNumber();
+     assertEquals("ID of first message should be 1", 1L, firstId);
+
+     final long secondId = delivered.get(1).getMessage().getMessageNumber();
+     assertEquals("ID of second message should be 3", 3L, secondId);
+ }
+
+ /**
+ * Tests that an entry dequeued before a consumer subscribes is not sent to
+ * that consumer as a result of invoking
+ * {@link SimpleAMQQueue#processQueue(QueueRunner)}.
+ * <p>
+ * Four messages are enqueued, the one at index 1 is dequeued, and the
+ * consumer is expected to receive exactly the entries at index 0, 2 and 3.
+ */
+ public void testProcessQueueWithDequeuedEntry()
+ {
+ // total number of messages to send
+ int messageNumber = 4;
+ // index of the single message that is dequeued before the consumer is added
+ int dequeueMessageIndex = 1;
+
+ // create queue with overridden method deliverAsync
+ // (presumably so that delivery only happens through the explicit processQueue call below)
+ StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test",
+ false, "testOwner", false, false, getVirtualHost(), null)
+ {
+ @Override
+ public void deliverAsync(QueueConsumer sub)
+ {
+ // do nothing
+ }
+ };
+
+ // put messages
+ List<StandardQueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(testQueue, dequeueMessageIndex);
+
+ // latch to wait for message receipt; one message was dequeued, so one fewer delivery is awaited
+ final CountDownLatch latch = new CountDownLatch(messageNumber -1);
+
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
+ {
+ /**
+ * Delegates to MockConsumer's send and then counts the latch down once.
+ * @param entry the message instance being delivered
+ * @param batch passed through unchanged to the superclass
+ */
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+ super.send(entry, batch);
+ latch.countDown();
+ }
+ };
+
+ try
+ {
+ // subscribe
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ // process queue, handing in a QueueRunner whose run() is a no-op
+ testQueue.processQueue(new QueueRunner(testQueue)
+ {
+ public void run()
+ {
+ // do nothing
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to process queue:" + e.getMessage());
+ }
+ // wait up to 1 minute for message receipt; the boolean result of await is not checked here,
+ // the received messages are verified below instead
+ try
+ {
+ latch.await(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e1)
+ {
+ // restore the interrupt flag and carry on to the verification
+ Thread.currentThread().interrupt();
+ }
+ // every entry except the dequeued one (index 1) is expected, in enqueue order
+ List<MessageInstance> expected = Arrays.asList((MessageInstance) entries.get(0), entries.get(2), entries.get(3));
+ verifyReceivedMessages(expected, consumer.getMessages());
+ }
+
+
+ /**
+ * SimpleAMQQueue specialisation used by testEnqueueDequeuedEntry. It passes a
+ * DequeuedQueueEntryListFactory to the superclass constructor, so its
+ * entries are DequeuedQueueEntry instances.
+ */
+ private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+
+ /**
+ * Forwards every argument unchanged to the SimpleAMQQueue constructor,
+ * inserting a new DequeuedQueueEntryListFactory before the arguments map.
+ */
+ public DequeuedQueue(final UUID id,
+ final String queueName,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ final boolean exclusive,
+ final VirtualHost virtualHost,
+ final Map<String, Object> arguments)
+ {
+ super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments);
+ }
+ }
+ /**
+  * Factory that supplies the DequeuedQueueEntryList backing a DequeuedQueue.
+  * The list it creates produces DequeuedQueueEntry instances, which report
+  * themselves as dequeued when their message number is even.
+  */
+ private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+     /**
+      * Creates the entry list for the given queue.
+      *
+      * @param queue the queue the new entry list is created for
+      * @return a new DequeuedQueueEntryList constructed with {@code queue}
+      */
+     @Override
+     public DequeuedQueueEntryList createQueueEntryList(DequeuedQueue queue)
+     {
+         return new DequeuedQueueEntryList(queue);
+     }
+ }
+
+ /**
+ * OrderedQueueEntryList whose head and regular entries are all
+ * DequeuedQueueEntry instances.
+ */
+ private static class DequeuedQueueEntryList extends OrderedQueueEntryList<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+ // shared creator handed to the superclass constructor; it builds the head entry via the
+ // single-argument (message-less) DequeuedQueueEntry constructor
+ private static final HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList> HEAD_CREATOR =
+ new HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList>()
+ {
+
+ @Override
+ public DequeuedQueueEntry createHead(final DequeuedQueueEntryList list)
+ {
+ return new DequeuedQueueEntry(list);
+ }
+ };
+
+ /**
+ * @param queue the queue passed on to the OrderedQueueEntryList constructor
+ */
+ public DequeuedQueueEntryList(final DequeuedQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+ /**
+ * Wraps the message in a DequeuedQueueEntry belonging to this list.
+ * Such entries report themselves as dequeued when the message number
+ * is even.
+ */
+ protected DequeuedQueueEntry createQueueEntry(final ServerMessage message)
+ {
+ return new DequeuedQueueEntry(this, message);
+ }
+
+
+ }
+
+ /**
+  * Queue entry that presents itself as dequeued whenever the number of the
+  * message it carries is even: it then reports deleted, not available, and
+  * refuses to be acquired.
+  * <p>
+  * NOTE(review): the head entry is built with a null message, so the three
+  * predicates below would throw NullPointerException if they were ever
+  * invoked on the head - confirm that never happens.
+  */
+ private static class DequeuedQueueEntry extends OrderedQueueEntry<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList>
+ {
+
+     private final ServerMessage _message;
+
+     /** Creates an entry without a message; used by the list's head creator. */
+     private DequeuedQueueEntry(final DequeuedQueueEntryList queueEntryList)
+     {
+         super(queueEntryList);
+         _message = null;
+     }
+
+     /** Creates an entry in the given list carrying the given message. */
+     public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
+     {
+         super(list, message);
+         _message = message;
+     }
+
+     /** @return true when the message number is even */
+     public boolean isDeleted()
+     {
+         return hasEvenMessageNumber();
+     }
+
+     /** @return true when the message number is odd */
+     public boolean isAvailable()
+     {
+         return !hasEvenMessageNumber();
+     }
+
+     /**
+      * Rejects acquisition for even message numbers without consulting the
+      * superclass; otherwise defers to the inherited behaviour.
+      */
+     @Override
+     public boolean acquire(QueueConsumer sub)
+     {
+         return !hasEvenMessageNumber() && super.acquire(sub);
+     }
+
+     /** Single place for the "even message number means dequeued" rule. */
+     private boolean hasEvenMessageNumber()
+     {
+         return _message.getMessageNumber() % 2 == 0;
+     }
+ }
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Sat Feb 8 17:52:05 2014
@@ -24,9 +24,9 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -385,7 +385,7 @@ public class AutoCommitTransactionTest e
final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
- queueEntries.add(new MockQueueEntry()
+ queueEntries.add(new MockMessageInstance()
{
@Override
@@ -395,7 +395,7 @@ public class AutoCommitTransactionTest e
}
@Override
- public AMQQueue getQueue()
+ public TransactionLogResource getOwningResource()
{
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Sat Feb 8 17:52:05 2014
@@ -24,9 +24,9 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -609,7 +609,7 @@ public class LocalTransactionTest extend
final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
- queueEntries.add(new MockQueueEntry()
+ queueEntries.add(new MockMessageInstance()
{
@Override
@@ -619,7 +619,7 @@ public class LocalTransactionTest extend
}
@Override
- public AMQQueue getQueue()
+ public TransactionLogResource getOwningResource()
{
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Sat Feb 8 17:52:05 2014
@@ -44,8 +44,8 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -179,9 +179,9 @@ public class BrokerTestHelper
return factory.createExchange("amp.direct", "direct", false, false);
}
- public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
+ public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
{
- SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
+ AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
false, false, false, Collections.<String, Object>emptyMap());
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Feb 8 17:52:05 2014
@@ -928,10 +928,10 @@ public class ServerSession extends Sessi
return getId().compareTo(o.getId());
}
- private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Feb 8 17:52:05 2014
@@ -1189,14 +1189,14 @@ public class AMQChannel implements AMQSe
}
- private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
public ImmediateAction()
{
}
- public void performAction(MessageInstance<C> entry)
+ public void performAction(MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
@@ -1261,10 +1261,10 @@ public class AMQChannel implements AMQSe
}
}
- private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Sat Feb 8 17:52:05 2014
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -36,7 +36,7 @@ import java.util.List;
public class AcknowledgeTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -79,7 +79,7 @@ public class AcknowledgeTest extends Qpi
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Sat Feb 8 17:52:05 2014
@@ -26,10 +26,8 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -39,7 +37,7 @@ import java.util.List;
public class QueueBrowserUsesNoAckTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest e
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java Sat Feb 8 17:52:05 2014
@@ -19,9 +19,6 @@ package org.apache.qpid.server.managemen
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java Sat Feb 8 17:52:05 2014
@@ -64,7 +64,7 @@ public class FieldTableKeyEnumeratorTest
}
- public void testPropertEnu()
+ public void testPropertyEnum()
{
try
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Sat Feb 8 17:52:05 2014
@@ -47,11 +47,10 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.StandardQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -573,9 +572,9 @@ public class MessageStoreTest extends Qp
if (usePriority)
{
- assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass());
assertEquals("Priority Queue does not have set priorities",
- DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
+ DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities());
}
else if (lastValueQueue)
{
@@ -584,7 +583,7 @@ public class MessageStoreTest extends Qp
}
else
{
- assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
+ assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
}
assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
Modified: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Sat Feb 8 17:52:05 2014
@@ -26,11 +26,9 @@ import org.apache.qpid.configuration.Cli
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.NotificationCheckTest;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueueTest;
+import org.apache.qpid.server.queue.StandardQueue;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -661,7 +659,7 @@ public class QueueManagementTest extends
final Object messageGroupKey = "test";
final Map<String, Object> arguments = new HashMap<String, Object>(2);
arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey);
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueue.SHARED_MSG_GROUP_ARG_VALUE);
managedBroker.createNewQueue(queueName, null, true, arguments);
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org