You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/06/07 13:18:42 UTC
svn commit: r1132959 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/exchange/
test/java/org/apache/qpid/server/queue/
Author: robbie
Date: Tue Jun 7 11:18:41 2011
New Revision: 1132959
URL: http://svn.apache.org/viewvc?rev=1132959&view=rev
Log:
QPID-3219: update handling of QueueEntries to exclude use of entries in the intermediate 'dequeued' state, simplify logic in general.
Applied patch from Oleksandr Rudyy <or...@gmail.com>
Added:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Tue Jun 7 11:18:41 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
{
// check that all subscriptions are not in advance of the entry
SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
+ while(subIter.advance() && entry.isAvailable())
{
final Subscription subscription = subIter.getNode().getSubscription();
if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
{
QueueEntry subnode = context._lastSeenEntry;
QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Jun 7 11:18:41 2011
@@ -52,6 +52,17 @@ public interface QueueEntry extends Comp
}
public abstract State getState();
+
+ /**
+ * Returns true if state is either DEQUEUED or DELETED.
+ *
+ * @return true if state is either DEQUEUED or DELETED.
+ */
+ public boolean isDispensed()
+ {
+ State currentState = getState();
+ return currentState == State.DEQUEUED || currentState == State.DELETED;
+ }
}
@@ -207,4 +218,18 @@ public interface QueueEntry extends Comp
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
+
+ /**
+ * Returns true if entry is in DEQUEUED state, otherwise returns false.
+ *
+ * @return true if entry is in DEQUEUED state, otherwise returns false
+ */
+ boolean isDequeued();
+
+ /**
+ * Returns true if entry is in either DEQUEUED or DELETED state.
+ *
+ * @return true if entry is in either DEQUEUED or DELETED state
+ */
+ boolean isDispensed();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Jun 7 11:18:41 2011
@@ -499,7 +499,7 @@ public class QueueEntryImpl implements Q
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDeleted())
+ while(next != null && next.isDispensed() )
{
final QueueEntryImpl newNext = next.nextNode();
@@ -547,4 +547,16 @@ public class QueueEntryImpl implements Q
return _queueEntryList;
}
+ @Override
+ public boolean isDequeued()
+ {
+ return _state == DEQUEUED_STATE;
+ }
+
+ @Override
+ public boolean isDispensed()
+ {
+ return _state.isDispensed();
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jun 7 11:18:41 2011
@@ -629,7 +629,7 @@ public class SimpleAMQQueue implements A
// this catches the case where we *just* miss an update
int loops = 2;
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ while (entry.isAvailable() && loops != 0)
{
if (nextNode == null)
{
@@ -648,7 +648,7 @@ public class SimpleAMQQueue implements A
}
- if (!(entry.isAcquired() || entry.isDeleted()))
+ if (entry.isAvailable())
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -942,7 +942,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDeleted())
+ if (node != null && !node.isDispensed())
{
entryList.add(node);
}
@@ -1046,7 +1046,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
+ if (!node.isDispensed() && filter.accept(node))
{
entryList.add(node);
}
@@ -1240,7 +1240,6 @@ public class SimpleAMQQueue implements A
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
- && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1270,7 +1269,7 @@ public class SimpleAMQQueue implements A
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1300,7 +1299,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1654,7 +1653,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && !(node.isAcquired() || node.isDeleted()))
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node))
{
@@ -1715,7 +1714,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1884,8 +1883,8 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted
- if (!node.isDeleted())
+ // Only process nodes that are not currently deleted and not dequeued
+ if (!node.isDispensed())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Jun 7 11:18:41 2011
@@ -1,6 +1,5 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -156,7 +155,7 @@ public class SimpleQueueEntryList implem
if(!atTail())
{
QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDeleted() && nextNode.nextNode() != null)
+ while(nextNode.isDispensed() && nextNode.nextNode() != null)
{
nextNode = nextNode.nextNode();
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Jun 7 11:18:41 2011
@@ -482,6 +482,18 @@ public class AbstractHeadersExchangeTest
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
+ @Override
+ public boolean isDequeued()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDispensed()
+ {
+ return false;
+ }
};
if(action != null)
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Tue Jun 7 11:18:41 2011
@@ -231,4 +231,16 @@ public class MockQueueEntry implements Q
_message = msg;
}
+ @Override
+ public boolean isDequeued()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDispensed()
+ {
+ return false;
+ }
+
}
Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=1132959&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Tue Jun 7 11:18:41 2011
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import java.lang.reflect.Field;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry.EntryState;
+import org.apache.qpid.server.subscription.MockSubscription;
+
+/**
+ * Tests for {@link QueueEntryImpl}
+ *
+ */
+public class QueueEntryImplTest extends TestCase
+{
+ // tested entry
+ private QueueEntryImpl _queueEntry;
+
+ public void setUp() throws Exception
+ {
+ AMQMessage message = new MockAMQMessage(1);
+ SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ _queueEntry = new QueueEntryImpl(queueEntryList, message, 1);
+ }
+
+ public void testAquire()
+ {
+ assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
+ _queueEntry.isAvailable());
+ acquire();
+ }
+
+ public void testDequeue()
+ {
+ dequeue();
+ }
+
+ public void testDelete()
+ {
+ delete();
+ }
+
+ /**
+ * Tests release method for entry in acquired state.
+ * <p>
+ * Entry in state ACQUIRED should be released and its status should be
+ * changed to AVAILABLE.
+ */
+ public void testReleaseAquired()
+ {
+ acquire();
+ _queueEntry.release();
+ assertTrue("Queue entry should be in AVAILABLE state after invoking of release method",
+ _queueEntry.isAvailable());
+ }
+
+ /**
+ * Tests release method for entry in dequeued state.
+ * <p>
+ * Invoking release on dequeued entry should not have any effect on its
+ * state.
+ */
+ public void testReleaseDequeued()
+ {
+ dequeue();
+ _queueEntry.release();
+ EntryState state = getState();
+ assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect",
+ QueueEntry.DEQUEUED_STATE, state);
+ }
+
+ /**
+ * Tests release method for entry in deleted state.
+ * <p>
+ * Invoking release on deleted entry should not have any effect on its
+ * state.
+ */
+ public void testReleaseDeleted()
+ {
+ delete();
+ _queueEntry.release();
+ assertTrue("Invoking of release on entry in DELETED state should not have any effect",
+ _queueEntry.isDeleted());
+ }
+
+ /**
+ * Tests if entries in DEQUEUED or DELETED state are not returned by getNext method.
+ */
+ public void testGetNext()
+ {
+ int numberOfEntries = 5;
+ QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
+ SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+ // create test entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ AMQMessage message = null;;
+ try
+ {
+ message = new MockAMQMessage(i);
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to create a mock message:" + e.getMessage());
+ }
+ QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
+ entries[i] = entry;
+ }
+
+ // test getNext for not acquired entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ QueueEntryImpl queueEntry = entries[i];
+ QueueEntryImpl next = queueEntry.getNext();
+ if (i < numberOfEntries - 1)
+ {
+ assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+ }
+ else
+ {
+ assertNull("The next entry after the last should be null", next);
+ }
+ }
+
+ // delete second
+ entries[1].acquire();
+ entries[1].delete();
+
+ // dequeue third
+ entries[2].acquire();
+ entries[2].dequeue();
+
+ QueueEntryImpl next = entries[0].getNext();
+ assertEquals("expected forth entry",entries[3], next);
+ next = next.getNext();
+ assertEquals("expected fifth entry", entries[4], next);
+ next = next.getNext();
+ assertNull("The next entry after the last should be null", next);
+ }
+ /**
+ * A helper method to put tested object into deleted state and assert the state
+ */
+ private void delete()
+ {
+ _queueEntry.delete();
+ assertTrue("Queue entry should be in DELETED state after invoking of delete method",
+ _queueEntry.isDeleted());
+ }
+
+ /**
+ * A helper method to put tested entry into dequeued state and assert the state
+ */
+ private void dequeue()
+ {
+ acquire();
+ _queueEntry.dequeue();
+ EntryState state = getState();
+ assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method",
+ QueueEntry.DEQUEUED_STATE, state);
+ }
+
+ /**
+ * A helper method to put tested entry into acquired state and assert the state
+ */
+ private void acquire()
+ {
+ _queueEntry.acquire(new MockSubscription());
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+ }
+
+ /**
+ * A helper method to get entry state
+ *
+ * @return entry state
+ */
+ private EntryState getState()
+ {
+ EntryState state = null;
+ try
+ {
+ Field f = QueueEntryImpl.class.getDeclaredField("_state");
+ f.setAccessible(true);
+ state = (EntryState) f.get(_queueEntry);
+ }
+ catch (Exception e)
+ {
+ fail("Failure to get a state field: " + e.getMessage());
+ }
+ return state;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Jun 7 11:18:41 2011
@@ -36,13 +36,16 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
+import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -51,6 +54,8 @@ import org.apache.qpid.server.virtualhos
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -735,6 +740,533 @@ public class SimpleAMQQueueTest extends
verifyRecievedMessages(msgListSub3, sub3.getMessages());
}
+ /**
+ * Tests that dequeued message is not present in the list returned from
+ * {@link SimpleAMQQueue#getMessagesOnTheQueue()}
+ */
+ public void testGetMessagesOnTheQueueWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // send test messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // get messages on the queue
+ List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+ // assert queue entries
+ assertEquals(messageNumber - 1, entries.size());
+ int expectedId = 0;
+ for (int i = 0; i < messageNumber - 1; i++)
+ {
+ Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
+ if (i == dequeueMessageIndex)
+ {
+ assertFalse("Message with id " + dequeueMessageIndex
+ + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
+ new Long(expectedId).equals(id));
+ expectedId++;
+ }
+ assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+ new Long(expectedId), id);
+ expectedId++;
+ }
+ }
+
+ /**
+ * Tests that dequeued message is not present in the list returned from
+ * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)}
+ */
+ public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // send test messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // get messages on the queue with filter accepting all available messages
+ List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ @Override
+ public boolean accept(QueueEntry entry)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ // assert entries on the queue
+ assertEquals(messageNumber - 1, entries.size());
+ int expectedId = 0;
+ for (int i = 0; i < messageNumber - 1; i++)
+ {
+ Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
+ if (i == dequeueMessageIndex)
+ {
+ assertFalse("Message with id " + dequeueMessageIndex
+ + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
+ new Long(expectedId).equals(id));
+ expectedId++;
+ }
+ assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+ new Long(expectedId), id);
+ expectedId++;
+ }
+ }
+
+ /**
+ * Tests that dequeued message is not copied as part of invocation of
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+ */
+ public void testCopyMessagesWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+ String anotherQueueName = "testQueue2";
+
+ // put test messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // create another queue
+ SimpleAMQQueue queue = createQueue(anotherQueueName);
+
+ // create transaction
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+ // copy messages into another queue
+ _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
+
+ // commit transaction
+ txn.commit();
+
+ // get messages on another queue
+ List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+
+ // assert another queue entries
+ assertEquals(messageNumber - 1, entries.size());
+ int expectedId = 0;
+ for (int i = 0; i < messageNumber - 1; i++)
+ {
+ Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
+ if (i == dequeueMessageIndex)
+ {
+ assertFalse("Message with id " + dequeueMessageIndex
+ + " was dequeued and should not been copied into another queue!",
+ new Long(expectedId).equals(id));
+ expectedId++;
+ }
+ assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+ new Long(expectedId), id);
+ expectedId++;
+ }
+ }
+
+ /**
+ * Tests that dequeued message is not moved as part of invocation of
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+ */
+ public void testMovedMessagesWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+ String anotherQueueName = "testQueue2";
+
+ // put messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // create another queue
+ SimpleAMQQueue queue = createQueue(anotherQueueName);
+
+ // create transaction
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+ // move messages into another queue
+ _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
+
+ // commit transaction
+ txn.commit();
+
+ // get messages on another queue
+ List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+
+ // assert another queue entries
+ assertEquals(messageNumber - 1, entries.size());
+ int expectedId = 0;
+ for (int i = 0; i < messageNumber - 1; i++)
+ {
+ Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
+ if (i == dequeueMessageIndex)
+ {
+ assertFalse("Message with id " + dequeueMessageIndex
+ + " was dequeued and should not been copied into another queue!",
+ new Long(expectedId).equals(id));
+ expectedId++;
+ }
+ assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+ new Long(expectedId), id);
+ expectedId++;
+ }
+ }
+
+ /**
+ * Tests that messages in given range including dequeued one are deleted
+ * from the queue on invocation of
+ * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+ */
+ public void testRemoveMessagesFromQueueWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // put messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // remove messages
+ _queue.removeMessagesFromQueue(0, messageNumber);
+
+ // get queue entries
+ List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+ // assert queue entries
+ assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
+ assertEquals("Queue should be empty", 0, entries.size());
+ }
+
+ /**
+ * Tests that dequeued message on the top is not accounted and next message
+ * is deleted from the queue on invocation of
+ * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+ */
+ public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 0;
+
+ // put messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message on top
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ //delete message from top
+ _queue.deleteMessageFromTop();
+
+ //get queue entries
+ List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+ // assert queue entries
+ assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
+ assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(),
+ messageNumber - 2, entries.size());
+ assertEquals("Expected first entry with id 2", new Long(2),
+ ((AMQMessage) entries.get(0).getMessage()).getMessageId());
+ }
+
+ /**
+ * Tests that all messages including dequeued one are deleted from the queue
+ * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+ */
+ public void testClearQueueWithDequeuedEntry()
+ {
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // put messages into a test queue
+ enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+ // dequeue message on a test queue
+ dequeueMessage(_queue, dequeueMessageIndex);
+
+ // clean queue
+ try
+ {
+ _queue.clearQueue();
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to clear queue:" + e.getMessage());
+ }
+
+ // get queue entries
+ List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+ // assert queue entries
+ assertNotNull(entries);
+ assertEquals(0, entries.size());
+ }
+
+ /**
+ * Tests whether dequeued entry is sent to subscriber in result of
+ * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)}
+ */
+ public void testProcessQueueWithDequeuedEntry()
+ {
+ // total number of messages to send
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // create queue with overridden method deliverAsync
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false,
+ new AMQShortString("testOwner"), false, false, _virtualHost, null)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing
+ }
+ };
+
+ // put messages
+ List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(testQueue, dequeueMessageIndex);
+
+ // latch to wait for message receipt
+ final CountDownLatch latch = new CountDownLatch(messageNumber -1);
+
+ // create a subscription
+ MockSubscription subscription = new MockSubscription()
+ {
+ /**
+ * Send a message and decrement latch
+ */
+ public void send(QueueEntry msg) throws AMQException
+ {
+ super.send(msg);
+ latch.countDown();
+ }
+ };
+
+ try
+ {
+ // subscribe
+ testQueue.registerSubscription(subscription, false);
+
+ // process queue
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ public void run()
+ {
+ // do nothing
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to process queue:" + e.getMessage());
+ }
+ // wait up to 1 minute for message receipt
+ try
+ {
+ latch.await(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e1)
+ {
+ Thread.currentThread().interrupt();
+ }
+ List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3));
+ verifyRecievedMessages(expected, subscription.getMessages());
+ }
+
+ /**
+ * Tests that entries in dequeued state are not enqueued and not delivered to subscription
+ */
+ public void testEqueueDequeuedEntry()
+ {
+ // create a queue where each even entry is considered a dequeued
+ SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"),
+ false, false, _virtualHost, new QueueEntryListFactory()
+ {
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ /**
+ * Override SimpleQueueEntryList to create a dequeued
+ * entries for messages with even id
+ */
+ return new SimpleQueueEntryList(queue)
+ {
+ /**
+ * Entries with even message id are considered
+ * dequeued!
+ */
+ protected QueueEntryImpl createQueueEntry(final ServerMessage message)
+ {
+ return new QueueEntryImpl(this, message)
+ {
+ public boolean isDequeued()
+ {
+ return (((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+ }
+
+ public boolean isDispensed()
+ {
+ return (((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+ }
+
+ public boolean isAvailable()
+ {
+ return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+ }
+ };
+ }
+ };
+ }
+ }, null);
+ // create a subscription
+ MockSubscription subscription = new MockSubscription();
+
+ // register subscription
+ try
+ {
+ queue.registerSubscription(subscription, false);
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to register subscription:" + e.getMessage());
+ }
+
+ // put test messages into a queue
+ putGivenNumberOfMessages(queue, 4);
+
+ // assert received messages
+ List<QueueEntry> messages = subscription.getMessages();
+ assertEquals("Only 2 messages should be returned", 2, messages.size());
+ assertEquals("ID of first message should be 1", new Long(1),
+ ((AMQMessage) messages.get(0).getMessage()).getMessageId());
+ assertEquals("ID of second message should be 3", new Long(3),
+ ((AMQMessage) messages.get(1).getMessage()).getMessageId());
+ }
+
+ /**
+ * A helper method to create a queue with given name
+ *
+ * @param name
+ * queue name
+ * @return queue
+ */
+ private SimpleAMQQueue createQueue(String name)
+ {
+ SimpleAMQQueue queue = null;
+ try
+ {
+ AMQShortString queueName = new AMQShortString(name);
+ AMQShortString ownerName = new AMQShortString(name + "Owner");
+ queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false,
+ _virtualHost, _arguments);
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to create a queue:" + e.getMessage());
+ }
+ assertNotNull("Queue was not created", queue);
+ return queue;
+ }
+
+ /**
+ * A helper method to put given number of messages into queue
+ * <p>
+ * All messages are asserted that they are present on queue
+ *
+ * @param queue
+ * queue to put messages into
+ * @param messageNumber
+ * number of messages to put into queue
+ */
+ private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ {
+ putGivenNumberOfMessages(queue, messageNumber);
+
+ // make sure that all enqueued messages are on the queue
+ List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+ assertEquals(messageNumber, entries.size());
+ for (int i = 0; i < messageNumber; i++)
+ {
+ assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId());
+ }
+ return entries;
+ }
+
+ /**
+ * A helper method to put given number of messages into queue
+ * <p>
+ * Queue is not checked if messages are added into queue
+ *
+ * @param queue
+ * queue to put messages into
+ * @param messageNumber
+ * number of messages to put into queue
+ * @param queue
+ * @param messageNumber
+ */
+ private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ {
+ for (int i = 0; i < messageNumber; i++)
+ {
+ // Create message
+ Long messageId = new Long(i);
+ AMQMessage message = null;
+ try
+ {
+ message = createMessage(messageId);
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to create a test message:" + e.getMessage());
+ }
+ // Put message on queue
+ try
+ {
+ queue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to put message on queue:" + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * A helper method to dequeue an entry on queue with given index
+ *
+ * @param queue
+ * queue to dequeue message on
+ * @param dequeueMessageIndex
+ * entry index to dequeue.
+ */
+ private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
+ {
+ List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+ QueueEntry entry = entries.get(dequeueMessageIndex);
+ entry.acquire();
+ entry.dequeue();
+ assertTrue(entry.isDequeued());
+ return entry;
+ }
+
private List<QueueEntry> createEntriesList(QueueEntry... entries)
{
ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Tue Jun 7 11:18:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import junit.framework.TestCase;
@@ -155,5 +156,55 @@ public class SimpleQueueEntryListTest ex
assertEquals("Count should have been equal",count,remainingMessages.size());
}
-
+ /**
+ * Checks that entries which have been acquired and dequeued are not returned
+ * when iterating over a {@code SimpleQueueEntryList}.
+ * <p>
+ * {@code numberOfMessages} mock messages are added to the list, every entry
+ * whose message id is even is dequeued, and the iterator is then expected to
+ * return ids 1, 3, 5, ... in order, {@code numberOfMessages / 2} entries in
+ * total.
+ */
+ public void testDequedMessagedNotPresentInIterator()
+ {
+ int numberOfMessages = 10;
+ SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ QueueEntry[] entries = new QueueEntry[numberOfMessages]; // keeps the entry returned by each add() so it can be dequeued below
+
+ for(int i = 0; i < numberOfMessages ; i++)
+ {
+ AMQMessage message = null;; // NOTE(review): the second ';' is a stray empty statement
+ try
+ {
+ message = new MockAMQMessage(i); // presumably i becomes the message id read back via getMessageId() below - confirm
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to create a mock message:" + e.getMessage());
+ }
+ QueueEntry entry = entryList.add(message);
+ assertNotNull("QE should not be null", entry);
+ entries[i]= entry;
+ }
+
+ // dequeue every entry whose message id is even
+ for (QueueEntry queueEntry : entries)
+ {
+ long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue();
+ if (i%2 == 0)
+ {
+ queueEntry.acquire();
+ queueEntry.dequeue();
+ }
+ }
+
+ // iterate and check that dequeued messages are not returned by iterator: only odd ids are expected, in ascending order
+ QueueEntryIterator it = entryList.iterator();
+ int counter = 0; // number of entries the iterator returned
+ int i = 1; // id expected from the next entry; starts at the first odd id
+ while (it.advance())
+ {
+ QueueEntry entry = it.getNode();
+ Long id = ((AMQMessage)entry.getMessage()).getMessageId();
+ assertEquals("Expected message with id " + i + " but got message with id "
+ + id, new Long(i), id);
+ counter++;
+ i += 2; // skip the even id that was dequeued
+ }
+ int expectedNumber = numberOfMessages / 2; // half of the entries (the even ids) were dequeued
+ assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter,
+ expectedNumber, counter);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org