You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/06/07 13:18:42 UTC

svn commit: r1132959 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/exchange/ test/java/org/apache/qpid/server/queue/

Author: robbie
Date: Tue Jun  7 11:18:41 2011
New Revision: 1132959

URL: http://svn.apache.org/viewvc?rev=1132959&view=rev
Log:
QPID-3219: update handling of QueueEntries to exclude use of entries in the intermediate 'dequeued' state, simplify logic in general.

Applied patch from Oleksandr Rudyy <or...@gmail.com>

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Tue Jun  7 11:18:41 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
     {
         // check that all subscriptions are not in advance of the entry
         SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
-        while(subIter.advance() && !entry.isAcquired())
+        while(subIter.advance() && entry.isAvailable())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
             if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
                 {
                     QueueEntry subnode = context._lastSeenEntry;
                     QueueEntry released = context._releasedEntry;
-                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+                    while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
                         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Jun  7 11:18:41 2011
@@ -52,6 +52,17 @@ public interface QueueEntry extends Comp
         }
 
         public abstract State getState();
+
+        /**
+         * Returns true if state is either DEQUEUED or DELETED.
+         *
+         * @return true if state is either DEQUEUED or DELETED.
+         */
+        public boolean isDispensed()
+        {
+            State currentState = getState();
+            return currentState == State.DEQUEUED || currentState == State.DELETED;
+        }
     }
 
 
@@ -207,4 +218,18 @@ public interface QueueEntry extends Comp
 
     void addStateChangeListener(StateChangeListener listener);
     boolean removeStateChangeListener(StateChangeListener listener);
+
+    /**
+     * Returns true if entry is in DEQUEUED state, otherwise returns false.
+     *
+     * @return true if entry is in DEQUEUED state, otherwise returns false
+     */
+    boolean isDequeued();
+
+    /**
+     * Returns true if entry is either DEQUED or DELETED state.
+     *
+     * @return true if entry is either DEQUED or DELETED state
+     */
+    boolean isDispensed();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Jun  7 11:18:41 2011
@@ -499,7 +499,7 @@ public class QueueEntryImpl implements Q
     {
 
         QueueEntryImpl next = nextNode();
-        while(next != null && next.isDeleted())
+        while(next != null && next.isDispensed() )
         {
 
             final QueueEntryImpl newNext = next.nextNode();
@@ -547,4 +547,16 @@ public class QueueEntryImpl implements Q
         return _queueEntryList;
     }
 
+    @Override
+    public boolean isDequeued()
+    {
+        return _state == DEQUEUED_STATE;
+    }
+
+    @Override
+    public boolean isDispensed()
+    {
+        return _state.isDispensed();
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jun  7 11:18:41 2011
@@ -629,7 +629,7 @@ public class SimpleAMQQueue implements A
             // this catches the case where we *just* miss an update
             int loops = 2;
 
-            while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+            while (entry.isAvailable() && loops != 0)
             {
                 if (nextNode == null)
                 {
@@ -648,7 +648,7 @@ public class SimpleAMQQueue implements A
         }
 
 
-        if (!(entry.isAcquired() || entry.isDeleted()))
+        if (entry.isAvailable())
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
@@ -942,7 +942,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node != null && !node.isDeleted())
+            if (node != null && !node.isDispensed())
             {
                 entryList.add(node);
             }
@@ -1046,7 +1046,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && filter.accept(node))
+            if (!node.isDispensed() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -1240,7 +1240,6 @@ public class SimpleAMQQueue implements A
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
-                    && !node.isDeleted()
                     && node.acquire())
                 {
                     dequeueEntry(node);
@@ -1270,7 +1269,7 @@ public class SimpleAMQQueue implements A
         while (noDeletes && queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node);
                 noDeletes = false;
@@ -1300,7 +1299,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node, txn);
                 if(++count == request)
@@ -1654,7 +1653,7 @@ public class SimpleAMQQueue implements A
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && !(node.isAcquired() || node.isDeleted()))
+            if (node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node))
                 {
@@ -1715,7 +1714,7 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
                 if (expired)
                 {
@@ -1884,8 +1883,8 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            // Only process nodes that are not currently deleted
-            if (!node.isDeleted())
+            // Only process nodes that are not currently deleted and not dequeued
+            if (!node.isDispensed())
             {
                 // If the node has exired then aquire it
                 if (node.expired() && node.acquire())

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Jun  7 11:18:41 2011
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -156,7 +155,7 @@ public class SimpleQueueEntryList implem
             if(!atTail())
             {
                 QueueEntryImpl nextNode = _lastNode.nextNode();
-                while(nextNode.isDeleted() && nextNode.nextNode() != null)
+                while(nextNode.isDispensed() && nextNode.nextNode() != null)
                 {
                     nextNode = nextNode.nextNode();
                 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Jun  7 11:18:41 2011
@@ -482,6 +482,18 @@ public class AbstractHeadersExchangeTest
                 {
                     return 0;  //To change body of implemented methods use File | Settings | File Templates.
                 }
+
+                @Override
+                public boolean isDequeued()
+                {
+                    return false;
+                }
+
+                @Override
+                public boolean isDispensed()
+                {
+                    return false;
+                }
             };
 
             if(action != null)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Tue Jun  7 11:18:41 2011
@@ -231,4 +231,16 @@ public class MockQueueEntry implements Q
         _message = msg;
     }
 
+    @Override
+    public boolean isDequeued()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isDispensed()
+    {
+        return false;
+    }
+
 }

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=1132959&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Tue Jun  7 11:18:41 2011
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import java.lang.reflect.Field;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry.EntryState;
+import org.apache.qpid.server.subscription.MockSubscription;
+
+/**
+ * Tests for {@link QueueEntryImpl}
+ *
+ */
+public class QueueEntryImplTest extends TestCase
+{
+    // tested entry
+    private QueueEntryImpl _queueEntry;
+
+    public void setUp() throws Exception
+    {
+        AMQMessage message = new MockAMQMessage(1);
+        SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+        _queueEntry = new QueueEntryImpl(queueEntryList, message, 1);
+    }
+
+    public void testAquire()
+    {
+        assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
+                _queueEntry.isAvailable());
+        acquire();
+    }
+
+    public void testDequeue()
+    {
+        dequeue();
+    }
+
+    public void testDelete()
+    {
+        delete();
+    }
+
+    /**
+     * Tests release method for entry in acquired state.
+     * <p>
+     * Entry in state ACQUIRED should be released and its status should be
+     * changed to AVAILABLE.
+     */
+    public void testReleaseAquired()
+    {
+        acquire();
+        _queueEntry.release();
+        assertTrue("Queue entry should be in AVAILABLE state after invoking of release method",
+                _queueEntry.isAvailable());
+    }
+
+    /**
+     * Tests release method for entry in dequeued state.
+     * <p>
+     * Invoking release on dequeued entry should not have any effect on its
+     * state.
+     */
+    public void testReleaseDequeued()
+    {
+        dequeue();
+        _queueEntry.release();
+        EntryState state = getState();
+        assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect",
+                QueueEntry.DEQUEUED_STATE, state);
+    }
+
+    /**
+     * Tests release method for entry in deleted state.
+     * <p>
+     * Invoking release on deleted entry should not have any effect on its
+     * state.
+     */
+    public void testReleaseDeleted()
+    {
+        delete();
+        _queueEntry.release();
+        assertTrue("Invoking of release on entry in DELETED state should not have any effect",
+                _queueEntry.isDeleted());
+    }
+
+    /**
+     * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method.
+     */
+    public void testGetNext()
+    {
+        int numberOfEntries = 5;
+        QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
+        SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+        // create test entries
+        for(int i = 0; i < numberOfEntries ; i++)
+        {
+            AMQMessage message = null;;
+            try
+            {
+                message = new MockAMQMessage(i);
+            }
+            catch (AMQException e)
+            {
+                fail("Failure to create a mock message:" + e.getMessage());
+            }
+            QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
+            entries[i] = entry;
+        }
+
+        // test getNext for not acquired entries
+        for(int i = 0; i < numberOfEntries ; i++)
+        {
+            QueueEntryImpl queueEntry = entries[i];
+            QueueEntryImpl next = queueEntry.getNext();
+            if (i < numberOfEntries - 1)
+            {
+                assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+            }
+            else
+            {
+                assertNull("The next entry after the last should be null", next);
+            }
+        }
+
+        // delete second
+        entries[1].acquire();
+        entries[1].delete();
+
+        // dequeue third
+        entries[2].acquire();
+        entries[2].dequeue();
+
+        QueueEntryImpl next = entries[0].getNext();
+        assertEquals("expected forth entry",entries[3], next);
+        next = next.getNext();
+        assertEquals("expected fifth entry", entries[4], next);
+        next = next.getNext();
+        assertNull("The next entry after the last should be null", next);
+    }
+    /**
+     * A helper method to put tested object into deleted state and assert the state
+     */
+    private void delete()
+    {
+        _queueEntry.delete();
+        assertTrue("Queue entry should be in DELETED state after invoking of delete method",
+                _queueEntry.isDeleted());
+    }
+
+    /**
+     * A helper method to put tested entry into dequeue state and assert the sate
+     */
+    private void dequeue()
+    {
+        acquire();
+        _queueEntry.dequeue();
+        EntryState state = getState();
+        assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method",
+                QueueEntry.DEQUEUED_STATE, state);
+    }
+
+    /**
+     * A helper method to put tested entry into acquired state and assert the sate
+     */
+    private void acquire()
+    {
+        _queueEntry.acquire(new MockSubscription());
+        assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+                _queueEntry.isAcquired());
+    }
+
+    /**
+     * A helper method to get entry state
+     *
+     * @return entry state
+     */
+    private EntryState getState()
+    {
+        EntryState state = null;
+        try
+        {
+            Field f = QueueEntryImpl.class.getDeclaredField("_state");
+            f.setAccessible(true);
+            state = (EntryState) f.get(_queueEntry);
+        }
+        catch (Exception e)
+        {
+            fail("Failure to get a state field: " + e.getMessage());
+        }
+        return state;
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Jun  7 11:18:41 2011
@@ -36,13 +36,16 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
+import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.InternalBrokerBaseCase;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -51,6 +54,8 @@ import org.apache.qpid.server.virtualhos
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class SimpleAMQQueueTest extends InternalBrokerBaseCase
 {
@@ -735,6 +740,533 @@ public class SimpleAMQQueueTest extends 
         verifyRecievedMessages(msgListSub3, sub3.getMessages());
     }
 
+    /**
+     * Tests that dequeued message is not present in the list returned form
+     * {@link SimpleAMQQueue#getMessagesOnTheQueue()}
+     */
+    public void testGetMessagesOnTheQueueWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+
+        // send test messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // get messages on the queue
+        List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+        // assert queue entries
+        assertEquals(messageNumber - 1, entries.size());
+        int expectedId = 0;
+        for (int i = 0; i < messageNumber - 1; i++)
+        {
+            Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
+            if (i == dequeueMessageIndex)
+            {
+                assertFalse("Message with id " + dequeueMessageIndex
+                        + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
+                        new Long(expectedId).equals(id));
+                expectedId++;
+            }
+            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+                    new Long(expectedId), id);
+            expectedId++;
+        }
+    }
+
+    /**
+     * Tests that dequeued message is not present in the list returned form
+     * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)}
+     */
+    public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+
+        // send test messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // get messages on the queue with filter accepting all available messages
+        List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter()
+        {
+
+            @Override
+            public boolean accept(QueueEntry entry)
+            {
+                return true;
+            }
+
+            @Override
+            public boolean filterComplete()
+            {
+                return false;
+            }
+        });
+
+        // assert entries on the queue
+        assertEquals(messageNumber - 1, entries.size());
+        int expectedId = 0;
+        for (int i = 0; i < messageNumber - 1; i++)
+        {
+            Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId();
+            if (i == dequeueMessageIndex)
+            {
+                assertFalse("Message with id " + dequeueMessageIndex
+                        + " was dequeued and should not be returned by method getMessagesOnTheQueue!",
+                        new Long(expectedId).equals(id));
+                expectedId++;
+            }
+            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+                    new Long(expectedId), id);
+            expectedId++;
+        }
+    }
+
+    /**
+     * Tests that dequeued message is not copied as part of invocation of
+     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)}
+     */
+    public void testCopyMessagesWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+        String anotherQueueName = "testQueue2";
+
+        // put test messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // create another queue
+        SimpleAMQQueue queue = createQueue(anotherQueueName);
+
+        // create transaction
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+        // copy messages into another queue
+        _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
+
+        // commit transaction
+        txn.commit();
+
+        // get messages on another queue
+        List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+
+        // assert another queue entries
+        assertEquals(messageNumber - 1, entries.size());
+        int expectedId = 0;
+        for (int i = 0; i < messageNumber - 1; i++)
+        {
+            Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
+            if (i == dequeueMessageIndex)
+            {
+                assertFalse("Message with id " + dequeueMessageIndex
+                        + " was dequeued and should not been copied into another queue!",
+                        new Long(expectedId).equals(id));
+                expectedId++;
+            }
+            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+                    new Long(expectedId), id);
+            expectedId++;
+        }
+    }
+
+    /**
+     * Tests that dequeued message is not moved as part of invocation of
+     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)}
+     */
+    public void testMovedMessagesWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+        String anotherQueueName = "testQueue2";
+
+        // put messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // create another queue
+        SimpleAMQQueue queue = createQueue(anotherQueueName);
+
+        // create transaction
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+        // move messages into another queue
+        _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
+
+        // commit transaction
+        txn.commit();
+
+        // get messages on another queue
+        List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+
+        // assert another queue entries
+        assertEquals(messageNumber - 1, entries.size());
+        int expectedId = 0;
+        for (int i = 0; i < messageNumber - 1; i++)
+        {
+            Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
+            if (i == dequeueMessageIndex)
+            {
+                assertFalse("Message with id " + dequeueMessageIndex
+                        + " was dequeued and should not been copied into another queue!",
+                        new Long(expectedId).equals(id));
+                expectedId++;
+            }
+            assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
+                    new Long(expectedId), id);
+            expectedId++;
+        }
+    }
+
+    /**
+     * Tests that messages in given range including dequeued one are deleted
+     * from the queue on invocation of
+     * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)}
+     */
+    public void testRemoveMessagesFromQueueWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+
+        // put messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // remove messages
+        _queue.removeMessagesFromQueue(0, messageNumber);
+
+        // get queue entries
+        List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+        // assert queue entries
+        assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
+        assertEquals("Queue should be empty", 0, entries.size());
+    }
+
+    /**
+     * Tests that dequeued message on the top is not accounted and next message
+     * is deleted from the queue on invocation of
+     * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)}
+     */
+    public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 0;
+
+        // put messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message on top
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        //delete message from top
+        _queue.deleteMessageFromTop();
+
+        //get queue netries
+        List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+        // assert queue entries
+        assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
+        assertEquals("Expected " + (messageNumber - 2) + " number of messages  but recieved " + entries.size(),
+                messageNumber - 2, entries.size());
+        assertEquals("Expected first entry with id 2", new Long(2),
+                ((AMQMessage) entries.get(0).getMessage()).getMessageId());
+    }
+
+    /**
+     * Tests that all messages including dequeued one are deleted from the queue
+     * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)}
+     */
+    public void testClearQueueWithDequeuedEntry()
+    {
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+
+        // put messages into a test queue
+        enqueueGivenNumberOfMessages(_queue, messageNumber);
+
+        // dequeue message on a test queue
+        dequeueMessage(_queue, dequeueMessageIndex);
+
+        // clean queue
+        try
+        {
+            _queue.clearQueue();
+        }
+        catch (AMQException e)
+        {
+            fail("Failure to clear queue:" + e.getMessage());
+        }
+
+        // get queue entries
+        List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+
+        // assert queue entries
+        assertNotNull(entries);
+        assertEquals(0, entries.size());
+    }
+
+    /**
+     * Tests whether dequeued entry is sent to subscriber in result of
+     * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)}
+     */
+    public void testProcessQueueWithDequeuedEntry()
+    {
+        // total number of messages to send
+        int messageNumber = 4;
+        int dequeueMessageIndex = 1;
+
+        // create queue with overridden method deliverAsync
+        SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false,
+                new AMQShortString("testOwner"), false, false, _virtualHost, null)
+        {
+            @Override
+            public void deliverAsync(Subscription sub)
+            {
+                // do nothing
+            }
+        };
+
+        // put messages
+        List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
+
+        // dequeue message
+        dequeueMessage(testQueue, dequeueMessageIndex);
+
+        // latch to wait for message receipt
+        final CountDownLatch latch = new CountDownLatch(messageNumber -1);
+
+        // create a subscription
+        MockSubscription subscription = new MockSubscription()
+        {
+            /**
+             * Send a message and decrement latch
+             */
+            public void send(QueueEntry msg) throws AMQException
+            {
+                super.send(msg);
+                latch.countDown();
+            }
+        };
+
+        try
+        {
+            // subscribe
+            testQueue.registerSubscription(subscription, false);
+
+            // process queue
+            testQueue.processQueue(new QueueRunner(testQueue, 1)
+            {
+                public void run()
+                {
+                    // do nothing
+                }
+            });
+        }
+        catch (AMQException e)
+        {
+            fail("Failure to process queue:" + e.getMessage());
+        }
+        // wait up to 1 minute for message receipt
+        try
+        {
+            latch.await(1, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e1)
+        {
+            Thread.currentThread().interrupt();
+        }
+        List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3));
+        verifyRecievedMessages(expected, subscription.getMessages());
+    }
+
+    /**
+     * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+     */
+    public void testEqueueDequeuedEntry()
+    {
+        // create a queue where each even entry is considered a dequeued
+        SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"),
+                false, false, _virtualHost, new QueueEntryListFactory()
+                {
+                    public QueueEntryList createQueueEntryList(AMQQueue queue)
+                    {
+                        /**
+                         * Override SimpleQueueEntryList to create a dequeued
+                         * entries for messages with even id
+                         */
+                        return new SimpleQueueEntryList(queue)
+                        {
+                            /**
+                             * Entries with even message id are considered
+                             * dequeued!
+                             */
+                            protected QueueEntryImpl createQueueEntry(final ServerMessage message)
+                            {
+                                return new QueueEntryImpl(this, message)
+                                {
+                                    public boolean isDequeued()
+                                    {
+                                        return (((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+                                    }
+
+                                    public boolean isDispensed()
+                                    {
+                                        return (((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+                                    }
+
+                                    public boolean isAvailable()
+                                    {
+                                        return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0);
+                                    }
+                                };
+                            }
+                        };
+                    }
+                }, null);
+        // create a subscription
+        MockSubscription subscription = new MockSubscription();
+
+        // register subscription
+        try
+        {
+            queue.registerSubscription(subscription, false);
+        }
+        catch (AMQException e)
+        {
+            fail("Failure to register subscription:" + e.getMessage());
+        }
+
+        // put test messages into a queue
+        putGivenNumberOfMessages(queue, 4);
+
+        // assert received messages
+        List<QueueEntry> messages = subscription.getMessages();
+        assertEquals("Only 2 messages should be returned", 2, messages.size());
+        assertEquals("ID of first message should be 1", new Long(1),
+                ((AMQMessage) messages.get(0).getMessage()).getMessageId());
+        assertEquals("ID of second message should be 3", new Long(3),
+                ((AMQMessage) messages.get(1).getMessage()).getMessageId());
+    }
+
+    /**
+     * A helper method to create a queue with given name
+     *
+     * @param name
+     *            queue name
+     * @return queue
+     */
+    private SimpleAMQQueue createQueue(String name)
+    {
+        SimpleAMQQueue queue = null;
+        try
+        {
+            AMQShortString queueName = new AMQShortString(name);
+            AMQShortString ownerName = new AMQShortString(name + "Owner");
+            queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false,
+                    _virtualHost, _arguments);
+        }
+        catch (AMQException e)
+        {
+            fail("Failure to create a queue:" + e.getMessage());
+        }
+        assertNotNull("Queue was not created", queue);
+        return queue;
+    }
+
+    /**
+     * A helper method to put given number of messages into queue
+     * <p>
+     * All messages are asserted that they are present on queue
+     *
+     * @param queue
+     *            queue to put messages into
+     * @param messageNumber
+     *            number of messages to put into queue
+     */
+    private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+    {
+        putGivenNumberOfMessages(queue, messageNumber);
+
+        // make sure that all enqueued messages are on the queue
+        List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+        assertEquals(messageNumber, entries.size());
+        for (int i = 0; i < messageNumber; i++)
+        {
+            assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId());
+        }
+        return entries;
+    }
+
+    /**
+     * A helper method to put given number of messages into queue
+     * <p>
+     * Queue is not checked if messages are added into queue
+     *
+     * @param queue
+     *            queue to put messages into
+     * @param messageNumber
+     *            number of messages to put into queue
+     * @param queue
+     * @param messageNumber
+     */
+    private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+    {
+        for (int i = 0; i < messageNumber; i++)
+        {
+            // Create message
+            Long messageId = new Long(i);
+            AMQMessage message = null;
+            try
+            {
+                message = createMessage(messageId);
+            }
+            catch (AMQException e)
+            {
+                fail("Failure to create a test message:" + e.getMessage());
+            }
+            // Put message on queue
+            try
+            {
+                queue.enqueue(message);
+            }
+            catch (AMQException e)
+            {
+                fail("Failure to put message on queue:" + e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * A helper method to dequeue an entry on queue with given index
+     *
+     * @param queue
+     *            queue to dequeue message on
+     * @param dequeueMessageIndex
+     *            entry index to dequeue.
+     */
+    private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
+    {
+        List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+        QueueEntry entry = entries.get(dequeueMessageIndex);
+        entry.acquire();
+        entry.dequeue();
+        assertTrue(entry.isDequeued());
+        return entry;
+    }
+
     private List<QueueEntry> createEntriesList(QueueEntry... entries)
     {
         ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1132959&r1=1132958&r2=1132959&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Tue Jun  7 11:18:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.AMQMessage;
 
 import junit.framework.TestCase;
@@ -155,5 +156,55 @@ public class SimpleQueueEntryListTest ex
         
         assertEquals("Count should have been equal",count,remainingMessages.size());
     }
-    
+
+    public void testDequedMessagedNotPresentInIterator()
+    {
+        int numberOfMessages = 10;
+        SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+        QueueEntry[] entries = new QueueEntry[numberOfMessages];
+
+        for(int i = 0; i < numberOfMessages ; i++)
+        {
+            AMQMessage message = null;;
+            try
+            {
+                message = new MockAMQMessage(i);
+            }
+            catch (AMQException e)
+            {
+                fail("Failure to create a mock message:" + e.getMessage());
+            }
+            QueueEntry entry = entryList.add(message);
+            assertNotNull("QE should not be null", entry);
+            entries[i]= entry;
+        }
+
+        // dequeue all even messages
+        for (QueueEntry queueEntry : entries)
+        {
+            long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue();
+            if (i%2 == 0)
+            {
+                queueEntry.acquire();
+                queueEntry.dequeue();
+            }
+        }
+
+        // iterate and check that dequeued messages are not returned by iterator
+        QueueEntryIterator it = entryList.iterator();
+        int counter = 0;
+        int i = 1;
+        while (it.advance())
+        {
+            QueueEntry entry = it.getNode();
+            Long id = ((AMQMessage)entry.getMessage()).getMessageId();
+            assertEquals("Expected message with id " + i + " but got message with id "
+                    + id, new Long(i), id);
+            counter++;
+            i += 2;
+        }
+        int expectedNumber = numberOfMessages / 2;
+        assertEquals("Expected  " + expectedNumber + " number of entries in iterator but got " + counter,
+                expectedNumber, counter);
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org