You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/02/08 11:23:06 UTC

svn commit: r1068315 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/store/ test/java/org/apache/qpid/server/subscription/

Author: robbie
Date: Tue Feb  8 10:23:05 2011
New Revision: 1068315

URL: http://svn.apache.org/viewvc?rev=1068315&view=rev
Log:
QPID-2900: Changed SimpleAMQQueue to avoid a race condition when updating the atomic QueueContext._releasedEntry. The race was between the thread (SubFlushRunner or QueueRunner) executing method SimpleAMQQueue.setLastSeenEntry and the thread processing the MessageRelease command executing method SimpleAMQQueue.updateSubRequeueEntry. Bolstered the unit tests surrounding the area of change to reduce the risk of regression. Overrode TestableMemoryMessageStore#close() to avoid an NPE during tearDown silently cluttering some unit test logs (including SimpleAMQQueueTest).

Applied patch from Keith Wall <ke...@gmail.com>

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1068315&r1=1068314&r2=1068315&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Feb  8 10:23:05 2011
@@ -54,7 +54,6 @@ import org.apache.qpid.server.virtualhos
 
 import javax.management.JMException;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -742,12 +741,12 @@ public class SimpleAMQQueue implements A
     private void deliverMessage(final Subscription sub, final QueueEntry entry)
             throws AMQException
     {
+        setLastSeenEntry(sub, entry);
+
         _deliveredMessages.incrementAndGet();
         incrementUnackedMsgCount();
 
         sub.send(entry);
-
-        setLastSeenEntry(sub,entry);
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java?rev=1068315&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java Tue Feb  8 10:23:05 2011
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ *
+ * Tests QueueEntry
+ *
+ */
+public class QueueEntryTest extends QpidTestCase
+{
+    private QueueEntryImpl _queueEntry1 = null;
+    private QueueEntryImpl _queueEntry2 = null;
+    private QueueEntryImpl _queueEntry3 = null;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        int i = 0;
+
+        SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null);
+        _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+        _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+        _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
+    }
+
+    public void testCompareTo()
+    {
+        assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0);
+        assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0);
+        assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0);
+    }
+
+    /**
+     * Tests that the getNext() can be used to traverse the list.
+     */
+    public void testTraverseWithNoDeletedEntries()
+    {
+        QueueEntryImpl current = _queueEntry1;
+
+        current = current.getNext();
+        assertSame("Unexpected current entry",_queueEntry2, current);
+
+        current = current.getNext();
+        assertSame("Unexpected current entry",_queueEntry3, current);
+
+        current = current.getNext();
+        assertNull(current);
+
+    }
+
+    /**
+     * Tests that the getNext() can be used to traverse the list but deleted
+     * entries are skipped and de-linked from the chain of entries.
+     */
+    public void testTraverseWithDeletedEntries()
+    {
+        // Delete 2nd queue entry
+        _queueEntry2.delete();
+        assertTrue(_queueEntry2.isDeleted());
+
+
+        QueueEntryImpl current = _queueEntry1;
+
+        current = current.getNext();
+        assertSame("Unexpected current entry",_queueEntry3, current);
+
+        current = current.getNext();
+        assertNull(current);
+
+        // Assert the side effects of getNext()
+        assertSame("Next node of entry 1 should now be entry 3",
+                _queueEntry3, _queueEntry1.nextNode());
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1068315&r1=1068314&r2=1068315&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Feb  8 10:23:05 2011
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.queue;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.server.queue;
  *
  */
 
+package org.apache.qpid.server.queue;
 
 import org.apache.commons.configuration.PropertiesConfiguration;
 
@@ -36,6 +36,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -170,7 +171,7 @@ public class SimpleAMQQueueTest extends 
 
     }
 
-    public void testSubscription() throws AMQException
+    public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
     {
         // Check adding a subscription adds it to the queue
         _queue.registerSubscription(_subscription, false);
@@ -185,6 +186,7 @@ public class SimpleAMQQueueTest extends 
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
 
         // Check removing the subscription removes it's information from the queue
         _queue.unregisterSubscription(_subscription);
@@ -199,13 +201,269 @@ public class SimpleAMQQueueTest extends 
 
     }
 
-    public void testQueueNoSubscriber() throws AMQException, InterruptedException
+    public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
     {
         AMQMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+    }
+
+    /**
+     * Tests enqueuing two messages.
+     */
+    public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+    {
+        AMQMessage messageA = createMessage(new Long(24));
+        AMQMessage messageB = createMessage(new Long(25));
+        _queue.enqueue(messageA);
+        _queue.enqueue(messageB);
+        _queue.registerSubscription(_subscription, false);
+        Thread.sleep(150);
+        assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+    }
+
+    /**
+     * Tests that a re-queued message is resent to the subscriber.  Verifies also that the
+     * QueueContext._releasedEntry is reset to null after the entry has been reset.
+     */
+    public void testRequeuedMessageIsResentToSubscriber() throws Exception
+    {
+        _queue.registerSubscription(_subscription, false);
+
+        final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        {
+            public void onEnqueue(QueueEntry entry)
+            {
+                queueEntries.add(entry);
+            }
+        };
+
+        AMQMessage messageA = createMessage(new Long(24));
+        AMQMessage messageB = createMessage(new Long(25));
+        AMQMessage messageC = createMessage(new Long(26));
+
+        /* Enqueue three messages */
+
+        _queue.enqueue(messageA, postEnqueueAction);
+        _queue.enqueue(messageB, postEnqueueAction);
+        _queue.enqueue(messageC, postEnqueueAction);
+
+        Thread.sleep(150);  // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
+
+        /* Now requeue the first message only */
+
+        queueEntries.get(0).release();
+        _queue.requeue(queueEntries.get(0));
+
+        Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
+        assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
+        assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
+        assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+    }
+
+    /**
+     * Tests that a re-queued message that becomes expired is not resent to the subscriber.
+     * This test ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries.
+     * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset.
+     */
+    public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception
+    {
+        _queue.registerSubscription(_subscription, false);
+
+        final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        {
+            public void onEnqueue(QueueEntry entry)
+            {
+                queueEntries.add(entry);
+            }
+        };
+
+        /* Enqueue one message with expiration set for a short time in the future */
+
+        AMQMessage messageA = createMessage(new Long(24));
+        int messageExpirationOffset = 200;
+        messageA.setExpiration(System.currentTimeMillis() + messageExpirationOffset);
+
+        _queue.enqueue(messageA, postEnqueueAction);
+
+        int subFlushWaitTime = 150;
+        Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+
+        /* Wait a little more to be sure that message will have expired, then requeue it */
+        Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
+        queueEntries.get(0).release();
+        _queue.requeue(queueEntries.get(0));
+
+        Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+
+        assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
+        assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+
+    }
+
+    /**
+     * Tests that if a client requeues messages 'out of order' (the order
+     * used by QueueEntryImpl.compareTo) that messages are still resent
+     * successfully.  Specifically this test ensures the {@link SimpleAMQQueue#requeue()}
+     * can correctly move the _releasedEntry to an earlier position in the QueueEntry list.
+     */
+    public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception
+    {
+        _queue.registerSubscription(_subscription, false);
+
+        final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        {
+            public void onEnqueue(QueueEntry entry)
+            {
+                queueEntries.add(entry);
+            }
+        };
+
+        AMQMessage messageA = createMessage(new Long(24));
+        AMQMessage messageB = createMessage(new Long(25));
+        AMQMessage messageC = createMessage(new Long(26));
+
+        /* Enqueue three messages */
+
+        _queue.enqueue(messageA, postEnqueueAction);
+        _queue.enqueue(messageB, postEnqueueAction);
+        _queue.enqueue(messageC, postEnqueueAction);
+
+        Thread.sleep(150);  // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
+        assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
+
+        /* Now requeue the third and first message only */
+
+        queueEntries.get(2).release();
+        queueEntries.get(0).release();
+        _queue.requeue(queueEntries.get(2));
+        _queue.requeue(queueEntries.get(0));
+
+        Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
+        assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
+        assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
+        assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+    }
+
+
+    /**
+     * Tests a requeue for a queue with multiple subscriptions.  Verifies that a
+     * requeue resends a message to a <i>single</i> subscriber.
+     */
+    public void testRequeueForQueueWithMultipleSubscriptions() throws Exception
+    {
+        MockSubscription subscription1 = new MockSubscription();
+        MockSubscription subscription2 = new MockSubscription();
+
+        _queue.registerSubscription(subscription1, false);
+        _queue.registerSubscription(subscription2, false);
+
+        final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        {
+            public void onEnqueue(QueueEntry entry)
+            {
+                queueEntries.add(entry);
+            }
+        };
+
+        AMQMessage messageA = createMessage(new Long(24));
+        AMQMessage messageB = createMessage(new Long(25));
+
+        /* Enqueue two messages */
+
+        _queue.enqueue(messageA, postEnqueueAction);
+        _queue.enqueue(messageB, postEnqueueAction);
+
+        Thread.sleep(150);  // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
+
+        /* Now requeue a message (for any subscription) */
+
+        queueEntries.get(0).release();
+        _queue.requeue((QueueEntryImpl)queueEntries.get(0));
+
+        Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
+    }
+
+    /**
+     * Tests a requeue for a queue with multiple subscriptions.  Verifies that a
+     * subscriber specific requeue resends the message to <i>that</i> subscriber.
+     */
+    public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception
+    {
+        MockSubscription subscription1 = new MockSubscription();
+        MockSubscription subscription2 = new MockSubscription();
+
+        _queue.registerSubscription(subscription1, false);
+        _queue.registerSubscription(subscription2, false);
+
+        final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        {
+            public void onEnqueue(QueueEntry entry)
+            {
+                queueEntries.add(entry);
+            }
+        };
+
+        AMQMessage messageA = createMessage(new Long(24));
+        AMQMessage messageB = createMessage(new Long(25));
+
+        /* Enqueue two messages */
+
+        _queue.enqueue(messageA, postEnqueueAction);
+        _queue.enqueue(messageB, postEnqueueAction);
+
+        Thread.sleep(150);  // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
+
+        /* Now requeue a message (for first subscription) */
+
+        queueEntries.get(0).release();
+        _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1);
+
+        Thread.sleep(150); // Work done by SubFlushRunner Thread
+
+        assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
     }
 
     public void testExclusiveConsumer() throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1068315&r1=1068314&r2=1068315&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Tue Feb  8 10:23:05 2011
@@ -51,6 +51,21 @@ public class SimpleQueueEntryListTest ex
         }
     }
     
+    /**
+     * Tests the behavior of the next(QueueEntry) method.
+     */
+    public void testNext() throws Exception
+    {
+        SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
+        int i = 0;
+
+        QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++));
+        QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++));
+
+        assertSame(queueEntry2, sqel.next(queueEntry1));
+        assertNull(sqel.next(queueEntry2));
+    }
+
     public void testScavenge() throws Exception
     {
         SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=1068315&r1=1068314&r2=1068315&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Tue Feb  8 10:23:05 2011
@@ -20,17 +20,12 @@
  */
 package org.apache.qpid.server.store;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.nio.ByteBuffer;
 
 /**
  * Adds some extra methods to the memory message store for testing purposes.
@@ -52,8 +47,11 @@ public class TestableMemoryMessageStore 
 
     }
 
-
-
+    @Override
+    public void close() throws Exception
+    {
+        // Not required to do anything
+    }
 
     @Override
     public StoredMessage addMessage(StorableMessageMetaData metaData)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1068315&r1=1068314&r2=1068315&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Feb  8 10:23:05 2011
@@ -21,20 +21,19 @@ package org.apache.qpid.server.subscript
 *
 */
 
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
 
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 public class MockSubscription implements Subscription
 {
 
@@ -137,12 +136,11 @@ public class MockSubscription implements
 
     public void set(String key, Object value)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public Object get(String key)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public boolean isAutoClose()
@@ -194,12 +192,15 @@ public class MockSubscription implements
 
     public void restoreCredit(QueueEntry queueEntry)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void send(QueueEntry msg) throws AMQException
+    public void send(QueueEntry entry) throws AMQException
     {
-        messages.add(msg);
+        if (messages.contains(entry))
+        {
+            entry.setRedelivered();
+        }
+        messages.add(entry);
     }
 
     public void setQueueContext(AMQQueue.Context queueContext)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org