You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/09/11 15:33:44 UTC

svn commit: r813825 [2/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/sys/ qpid/xml/ tests/

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=813825&r1=813824&r2=813825&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Sep 11 13:33:42 2009
@@ -22,6 +22,8 @@
 #include "test_tools.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/ExchangeRegistry.h"
@@ -30,12 +32,14 @@
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/client/QueueOptions.h"
+#include "qpid/framing/reply_exceptions.h"
 #include <iostream>
 #include "boost/format.hpp"
 
 using boost::intrusive_ptr;
 using namespace qpid;
 using namespace qpid::broker;
+using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
@@ -273,14 +277,35 @@
 
 }
 
-class TestMessageStoreOC : public NullMessageStore
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
 {
+    const std::string xid;
+public:
+    DummyCtxt(const std::string& _xid) : xid(_xid) {}
+    static std::string getXid(TransactionContext& ctxt)
+    {
+        DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+        return c ? c->xid : nullxid;
+    }
+};
+
+class TestMessageStoreOC : public MessageStore
+{
+    std::set<std::string> prepared;
+    uint64_t nextPersistenceId;
   public:
 
     uint enqCnt;
     uint deqCnt;
     bool error;
 
+    TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+    ~TestMessageStoreOC(){}
+
     virtual void dequeue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
                  const PersistableQueue& /*queue*/)
@@ -302,8 +327,32 @@
         error=true;
     }
 
-    TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
-    ~TestMessageStoreOC(){}
+    bool init(const Options*) { return true; }
+    void truncateInit(const bool) {}
+    void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+    void destroy(PersistableQueue&) {}
+    void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+    void destroy(const PersistableExchange&) {}
+    void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+    void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+    void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+    void destroy(const PersistableConfig&) {}
+    void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+    void destroy(PersistableMessage&) {}
+    void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+    void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+                    std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+    void flush(const qpid::broker::PersistableQueue&) {}
+    uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+    std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+    std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+    void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+    void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+    void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+    void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+    void recover(RecoveryManager&) {}
 };
 
 
@@ -703,7 +752,7 @@
 
 QPID_AUTO_TEST_CASE(testLastNodeJournalError){
 /*
-simulate store excption going into last node standing
+simulate store exception going into last node standing
 
 */
     TestMessageStoreOC  testStore;
@@ -727,6 +776,83 @@
 
 }
 
+intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
+    intrusive_ptr<Message> msg(new Message());
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+    AMQFrame header((AMQHeaderBody()));
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+    msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    return msg;
+}
+
+QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){
+
+    TestMessageStoreOC  testStore;
+    client::QueueOptions args;
+    args.setSizePolicy(FLOW_TO_DISK, 0, 1);
+
+    intrusive_ptr<Message> msg1 = mkMsg("e", "A");
+    intrusive_ptr<Message> msg2 = mkMsg("e", "B");
+    intrusive_ptr<Message> msg3 = mkMsg("e", "C");
+    intrusive_ptr<Message> msg4 = mkMsg("e", "D");
+    intrusive_ptr<Message> msg5 = mkMsg("e", "E");
+    intrusive_ptr<Message> msg6 = mkMsg("e", "F");
+    intrusive_ptr<Message> msg7 = mkMsg("e", "G");
+    msg4->forcePersistent();
+    msg5->forcePersistent();
+    msg7->forcePersistent();
+
+    DeliverableMessage dmsg1(msg1);
+    DeliverableMessage dmsg2(msg2);
+    DeliverableMessage dmsg3(msg3);
+    DeliverableMessage dmsg4(msg4);
+    DeliverableMessage dmsg5(msg5);
+    DeliverableMessage dmsg6(msg6);
+    DeliverableMessage dmsg7(msg7);
+
+    FanOutExchange fanout1("fanout1", false, args);
+    FanOutExchange fanout2("fanout2", false, args);
+
+    Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+    queue1->configure(args);
+    Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+    queue2->configure(args);
+    Queue::shared_ptr queue3(new Queue("queue3", true));
+    fanout1.bind(queue1, "", 0);
+    fanout1.bind(queue2, "", 0);
+    fanout1.route(dmsg1, "", 0);
+    msg1->releaseContent();
+    fanout1.route(dmsg2, "", 0);
+    msg2->releaseContent();
+    fanout1.route(dmsg3, "", 0);
+    msg3->releaseContent();
+
+    BOOST_CHECK_EQUAL(3, queue1->getMessageCount());
+    BOOST_CHECK_EQUAL(3, queue2->getMessageCount());
+    BOOST_CHECK_EQUAL(msg1->isContentReleased(), false);
+    BOOST_CHECK_EQUAL(msg2->isContentReleased(), true);
+    BOOST_CHECK_EQUAL(msg3->isContentReleased(), true);
+
+    fanout1.bind(queue3, "", 0);
+    fanout1.route(dmsg4, "", 0);
+    msg4->releaseContent();
+    BOOST_CHECK_EQUAL(msg4->isContentReleased(), false);
+    fanout1.route(dmsg5, "", 0);
+    msg5->releaseContent();
+    BOOST_CHECK_EQUAL(msg5->isContentReleased(), false);
+
+    fanout2.bind(queue3, "", 0);
+    fanout2.route(dmsg6, "", 0);
+    fanout2.route(dmsg7, "", 0);
+    msg6->releaseContent();
+    BOOST_CHECK_EQUAL(msg6->isContentReleased(), false);
+    msg7->releaseContent();
+    BOOST_CHECK_EQUAL(msg7->isContentReleased(), false);
+
+}
+
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org