You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/10/10 16:41:09 UTC
svn commit: r703485 - in /incubator/qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/store/
Author: aidan
Date: Fri Oct 10 07:41:08 2008
New Revision: 703485
URL: http://svn.apache.org/viewvc?rev=703485&view=rev
Log:
QPID-1314: Make sure all messags that are enqueued are dequeued.
SimpleAMQQueue - dequeue messages if they are persistent, regardless of queue durability.
SimpleAMQQueueTest - make sure that all messages which are stored are removed properly.
TestableMemoryMessageStore - override enqueue/dequeue so it's possible to determine what is in the queue at any given point in time.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Oct 10 07:41:08 2008
@@ -587,7 +587,7 @@
try
{
AMQMessage msg = entry.getMessage();
- if (isDurable() && msg.isPersistent())
+ if (msg.isPersistent())
{
_virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Oct 10 07:41:08 2008
@@ -21,13 +21,16 @@
*/
+import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -39,6 +42,7 @@
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueueTest extends TestCase
@@ -46,7 +50,7 @@
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected MessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -328,6 +332,39 @@
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
+
+ public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
+ IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ msg.setContentHeaderBody(contentHeaderBody);
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent message
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_store, new MessageHandleFactory());
+
+ // Check that it is enqueued
+ AMQQueue data = _store.getMessages().get(1L);
+ _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
+ assertNotNull(data);
+
+ // Dequeue message
+ MockQueueEntry entry = new MockQueueEntry();
+ AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+
+ entry.setMessage(amqmsg);
+ _queue.dequeue(null, entry);
+
+ // Check that it is dequeued
+ data = _store.getMessages().get(1L);
+ assertNull(data);
+ }
// FIXME: move this to somewhere useful
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=703485&r1=703484&r2=703485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Oct 10 07:41:08 2008
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
import java.util.List;
/**
@@ -35,6 +38,7 @@
{
MemoryMessageStore _mms = null;
+ private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -70,4 +74,19 @@
return _contentBodyMap;
}
}
+
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ getMessages().put(messageId, queue);
+ }
+
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ getMessages().remove(messageId);
+ }
+
+ public HashMap<Long, AMQQueue> getMessages()
+ {
+ return _messages;
+ }
}