You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/08 11:36:33 UTC

svn commit: r483916 - in /incubator/qpid/trunk/qpid/cpp: lib/broker/BrokerQueue.cpp lib/broker/InMemoryContent.cpp tests/ChannelTest.cpp

Author: gsim
Date: Fri Dec  8 02:36:32 2006
New Revision: 483916

URL: http://svn.apache.org/viewvc?view=rev&rev=483916
Log:
Some more unit tests.


Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
    incubator/qpid/trunk/qpid/cpp/tests/ChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=483916&r1=483915&r2=483916
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Fri Dec  8 02:36:32 2006
@@ -65,6 +65,8 @@
 void Queue::recover(Message::shared_ptr& msg){
     push(msg);
     if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
+        //content has not been loaded, need to ensure that lazy loading mode is set:
+        //TODO: find a nicer way to do this
         msg->releaseContent(store);
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=483916&r1=483915&r2=483916
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp Fri Dec  8 02:36:32 2006
@@ -33,8 +33,7 @@
 {
     int sum(0);
     for (content_iterator i = content.begin(); i != content.end(); i++) {
-        sum += (*i)->size() + 8;//8 extra bytes for the frame
-        //TODO: have to get rid of the frame stuff from encoded data
+        sum += (*i)->size();
     }
     return sum;
 }

Modified: incubator/qpid/trunk/qpid/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ChannelTest.cpp?view=diff&rev=483916&r1=483915&r2=483916
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ChannelTest.cpp Fri Dec  8 02:36:32 2006
@@ -20,6 +20,9 @@
  */
 #include <BrokerChannel.h>
 #include <BrokerMessage.h>
+#include <BrokerQueue.h>
+#include <FanOutExchange.h>
+#include <NullMessageStore.h>
 #include <qpid_test_plugin.h>
 #include <iostream>
 #include <memory>
@@ -28,6 +31,8 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
+using std::string;
+using std::queue;
 
 struct DummyHandler : OutputHandler{
     std::vector<AMQFrame*> frames; 
@@ -43,8 +48,86 @@
     CPPUNIT_TEST_SUITE(ChannelTest);
     CPPUNIT_TEST(testConsumerMgmt);
     CPPUNIT_TEST(testDeliveryNoAck);
+    CPPUNIT_TEST(testDeliveryAndRecovery);
+    CPPUNIT_TEST(testStaging);
+    CPPUNIT_TEST(testQueuePolicy);
     CPPUNIT_TEST_SUITE_END();
 
+    class MockMessageStore : public NullMessageStore
+    {
+        struct MethodCall
+        {
+            const string name;
+            Message* const msg;
+            const string data;//only needed for appendContent
+
+            void check(const MethodCall& other) const
+            {
+                CPPUNIT_ASSERT_EQUAL(name, other.name);
+                CPPUNIT_ASSERT_EQUAL(msg, other.msg);
+                CPPUNIT_ASSERT_EQUAL(data, other.data);
+            }
+        };
+
+        queue<MethodCall> expected;
+        bool expectMode;//true when setting up expected calls
+
+        void handle(const MethodCall& call)
+        {
+            if (expectMode) {
+                expected.push(call);
+            } else {
+                call.check(expected.front());
+                expected.pop();
+            }
+        }
+
+        void handle(const string& name, Message* msg, const string& data)
+        {
+            MethodCall call = {name, msg, data};
+            handle(call);
+        }
+
+    public:
+
+        MockMessageStore() : expectMode(false) {}
+
+        void stage(Message* const msg)
+        {
+            if(!expectMode) msg->setPersistenceId(1);
+            MethodCall call = {"stage", msg, ""};
+            handle(call);
+        }
+
+        void appendContent(Message* msg, const string& data)
+        {
+            MethodCall call = {"appendContent", msg, data};
+            handle(call);
+        }
+
+        void destroy(Message* msg)
+        {
+            MethodCall call = {"destroy", msg, ""};
+            handle(call);
+        }
+        
+        void expect()
+        {
+            expectMode = true;
+        }
+
+        void test()
+        {
+            expectMode = false;
+        }
+
+        void check()
+        {
+            CPPUNIT_ASSERT(expected.empty());
+        }
+    };
+
+
   public:
 
     void testConsumerMgmt(){
@@ -76,13 +159,10 @@
         DummyHandler handler;
         Channel channel(&handler, 7, 10000);
 
-        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
-        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-        header->setContentSize(14);
-        msg->setHeader(header);
-        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
-        msg->addContent(body);
+        const string data("abcdefghijklmn");
 
+        Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+        addContent(msg, data);
         Queue::shared_ptr queue(new Queue("my_queue"));
         ConnectionToken* owner(0);
         string tag("no_ack");
@@ -99,22 +179,19 @@
         CPPUNIT_ASSERT(deliver);
         CPPUNIT_ASSERT(contentHeader);
         CPPUNIT_ASSERT(contentBody);
-        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
+        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
     }
 
     void testDeliveryAndRecovery(){
         DummyHandler handler;
         Channel channel(&handler, 7, 10000);
+        const string data("abcdefghijklmn");
 
-        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
-        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-        header->setContentSize(14);
-        msg->setHeader(header);
-        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
-        msg->addContent(body);
+        Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+        addContent(msg, data);
 
         Queue::shared_ptr queue(new Queue("my_queue"));
-        ConnectionToken* owner;
+        ConnectionToken* owner(0);
         string tag("ack");
         channel.consume(tag, queue, true, false, owner);
 
@@ -129,7 +206,115 @@
         CPPUNIT_ASSERT(deliver);
         CPPUNIT_ASSERT(contentHeader);
         CPPUNIT_ASSERT(contentBody);
-        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
+        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+    }
+
+    void testStaging(){
+        MockMessageStore store;
+        DummyHandler handler;
+        Channel channel(&handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+        const string data[] = {"abcde", "fghij", "klmno"};
+
+        Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false);
+
+        store.expect();
+        store.stage(msg);
+        for (int i = 0; i < 3; i++) {
+            store.appendContent(msg, data[i]);
+        }
+        store.destroy(msg);
+        store.test();
+
+        Exchange::shared_ptr exchange(new FanOutExchange("my_exchange"));
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        exchange->bind(queue, "", 0);
+
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        u_int64_t contentSize(0);
+        for (int i = 0; i < 3; i++) {
+            contentSize += data[i].size();
+        }
+        header->setContentSize(contentSize);
+        channel.handlePublish(msg, exchange);
+        channel.handleHeader(header);
+
+        for (int i = 0; i < 3; i++) {
+            AMQContentBody::shared_ptr body(new AMQContentBody(data[i]));
+            channel.handleContent(body);
+        }
+        Message::shared_ptr msg2 = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
+        msg2.reset();//should trigger destroy call
+
+        store.check();
+    }
+
+
+    //NOTE: strictly speaking this should/could be part of QueueTest,
+    //but as it can usefully use the same utility classes as this
+    //class it is defined here for simpllicity
+    void testQueuePolicy()
+    {
+        MockMessageStore store;
+        {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content
+        const string data1("abcd");
+        const string data2("efghijk");
+        const string data3("lmnopqrstuvwxyz");
+        Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size()));
+        Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size()));
+        Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size()));
+        addContent(msg1, data1);
+        addContent(msg2, data2);
+        addContent(msg3, data3);
+
+        QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded
+        FieldTable settings;
+        policy.update(settings);
+        
+        store.expect();
+        store.stage(msg3.get());
+        store.destroy(msg3.get());
+        store.test();
+
+        Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
+        queue->configure(settings);//set policy
+        queue->deliver(msg1);
+        queue->deliver(msg2);
+        queue->deliver(msg3);
+        
+        Message::shared_ptr next = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1, next);
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) data1.size(), next->encodedContentSize());
+        next = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg2, next);
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) data2.size(), next->encodedContentSize());
+        next = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg3, next);
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, next->encodedContentSize());
+
+        next.reset();
+        msg1.reset();
+        msg2.reset();
+        msg3.reset();//must clear all references to messages to allow them to be destroyed
+
+        }
+        store.check();
+    }
+
+    Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
+    {
+        Message* msg = new Message(0, exchange, routingKey, false, false);
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(contentSize);        
+        msg->setHeader(header);
+        msg->getHeaderProperties()->setMessageId(messageId);
+        return msg;
+    }
+
+    void addContent(Message::shared_ptr msg, const string& data)
+    {
+        AMQContentBody::shared_ptr body(new AMQContentBody(data));
+        msg->addContent(body);
     }
 };