You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/09/11 15:33:44 UTC
svn commit: r813825 [2/2] - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/sys/ qpid/xml/ tests/
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=813825&r1=813824&r2=813825&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Sep 11 13:33:42 2009
@@ -22,6 +22,8 @@
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -30,12 +32,14 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/reply_exceptions.h"
#include <iostream>
#include "boost/format.hpp"
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -273,14 +277,35 @@
}
-class TestMessageStoreOC : public NullMessageStore
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
{
+ const std::string xid;
+public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+class TestMessageStoreOC : public MessageStore
+{
+ std::set<std::string> prepared;
+ uint64_t nextPersistenceId;
public:
uint enqCnt;
uint deqCnt;
bool error;
+ TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+ ~TestMessageStoreOC(){}
+
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
@@ -302,8 +327,32 @@
error=true;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
+ bool init(const Options*) { return true; }
+ void truncateInit(const bool) {}
+ void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+ void destroy(PersistableQueue&) {}
+ void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableExchange&) {}
+ void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableConfig&) {}
+ void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+ void destroy(PersistableMessage&) {}
+ void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+ void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+ std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+ void flush(const qpid::broker::PersistableQueue&) {}
+ uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+ std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+ void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+ void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+ void recover(RecoveryManager&) {}
};
@@ -703,7 +752,7 @@
QPID_AUTO_TEST_CASE(testLastNodeJournalError){
/*
-simulate store excption going into last node standing
+simulate store exception going into last node standing
*/
TestMessageStoreOC testStore;
@@ -727,6 +776,83 @@
}
+intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ args.setSizePolicy(FLOW_TO_DISK, 0, 1);
+
+ intrusive_ptr<Message> msg1 = mkMsg("e", "A");
+ intrusive_ptr<Message> msg2 = mkMsg("e", "B");
+ intrusive_ptr<Message> msg3 = mkMsg("e", "C");
+ intrusive_ptr<Message> msg4 = mkMsg("e", "D");
+ intrusive_ptr<Message> msg5 = mkMsg("e", "E");
+ intrusive_ptr<Message> msg6 = mkMsg("e", "F");
+ intrusive_ptr<Message> msg7 = mkMsg("e", "G");
+ msg4->forcePersistent();
+ msg5->forcePersistent();
+ msg7->forcePersistent();
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+ DeliverableMessage dmsg7(msg7);
+
+ FanOutExchange fanout1("fanout1", false, args);
+ FanOutExchange fanout2("fanout2", false, args);
+
+ Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+ queue1->configure(args);
+ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+ queue2->configure(args);
+ Queue::shared_ptr queue3(new Queue("queue3", true));
+ fanout1.bind(queue1, "", 0);
+ fanout1.bind(queue2, "", 0);
+ fanout1.route(dmsg1, "", 0);
+ msg1->releaseContent();
+ fanout1.route(dmsg2, "", 0);
+ msg2->releaseContent();
+ fanout1.route(dmsg3, "", 0);
+ msg3->releaseContent();
+
+ BOOST_CHECK_EQUAL(3, queue1->getMessageCount());
+ BOOST_CHECK_EQUAL(3, queue2->getMessageCount());
+ BOOST_CHECK_EQUAL(msg1->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(msg2->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(msg3->isContentReleased(), true);
+
+ fanout1.bind(queue3, "", 0);
+ fanout1.route(dmsg4, "", 0);
+ msg4->releaseContent();
+ BOOST_CHECK_EQUAL(msg4->isContentReleased(), false);
+ fanout1.route(dmsg5, "", 0);
+ msg5->releaseContent();
+ BOOST_CHECK_EQUAL(msg5->isContentReleased(), false);
+
+ fanout2.bind(queue3, "", 0);
+ fanout2.route(dmsg6, "", 0);
+ fanout2.route(dmsg7, "", 0);
+ msg6->releaseContent();
+ BOOST_CHECK_EQUAL(msg6->isContentReleased(), false);
+ msg7->releaseContent();
+ BOOST_CHECK_EQUAL(msg7->isContentReleased(), false);
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org