You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/10/10 16:41:09 UTC

svn commit: r703485 - in /incubator/qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/store/

Author: aidan
Date: Fri Oct 10 07:41:08 2008
New Revision: 703485

URL: http://svn.apache.org/viewvc?rev=703485&view=rev
Log:
QPID-1314: Make sure all messags that are enqueued are dequeued.

SimpleAMQQueue - dequeue messages if they are persistent, regardless of queue durability.
SimpleAMQQueueTest - make sure that all messages which are stored are removed properly.
TestableMemoryMessageStore - override enqueue/dequeue so it's possible to determine what is in the queue at any given point in time.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Oct 10 07:41:08 2008
@@ -587,7 +587,7 @@
         try
         {
             AMQMessage msg = entry.getMessage();
-            if (isDurable() && msg.isPersistent())
+            if (msg.isPersistent())
             {
                 _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Oct 10 07:41:08 2008
@@ -21,13 +21,16 @@
  */
 
 
+import java.util.ArrayList;
 import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.DirectExchange;
@@ -39,6 +42,7 @@
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class SimpleAMQQueueTest extends TestCase
@@ -46,7 +50,7 @@
 
     protected SimpleAMQQueue _queue;
     protected VirtualHost _virtualHost;
-    protected MessageStore _store = new TestableMemoryMessageStore();
+    protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
     protected AMQShortString _qname = new AMQShortString("qname");
     protected AMQShortString _owner = new AMQShortString("owner");
     protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -328,6 +332,39 @@
             assertEquals("Message ID was wrong", messageId, msgids.get(i));
         }
     }
+  
+    public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
+    {
+        // Create IncomingMessage and nondurable queue
+        NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
+        IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+        msg.setContentHeaderBody(contentHeaderBody);
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        
+        // Send persistent message
+        qs.add(_queue);
+        msg.enqueue(qs);
+        msg.routingComplete(_store, new MessageHandleFactory());
+        
+        // Check that it is enqueued
+        AMQQueue data = _store.getMessages().get(1L);
+        _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
+        assertNotNull(data);
+        
+        // Dequeue message
+        MockQueueEntry entry = new MockQueueEntry();
+        AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+        
+        entry.setMessage(amqmsg);
+        _queue.dequeue(null, entry);
+        
+        // Check that it is dequeued
+        data = _store.getMessages().get(1L);
+        assertNull(data);
+    }
 
 
     // FIXME: move this to somewhere useful

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Oct 10 07:41:08 2008
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -35,6 +38,7 @@
 {
 
     MemoryMessageStore _mms = null;
+    private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
 
     public TestableMemoryMessageStore(MemoryMessageStore mms)
     {
@@ -70,4 +74,19 @@
             return _contentBodyMap;
         }
     }
+    
+    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    {
+        getMessages().put(messageId, queue);
+    }
+
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    {
+        getMessages().remove(messageId);
+    }
+
+    public HashMap<Long, AMQQueue> getMessages()
+    {
+        return _messages;
+    }
 }