You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC

svn commit: r1377715 [8/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/ cpp/src/qpid/asyncStore/ cpp/src/qpid...

Modified: qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp Mon Aug 27 15:40:33 2012
@@ -38,8 +38,8 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
 
 #include <iostream>
 #include <vector>
@@ -56,196 +56,47 @@ using namespace qpid::sys;
 
 namespace qpid {
 namespace tests {
-
 class TestConsumer : public virtual Consumer{
 public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
-    QueuedMessage last;
+    QueueCursor lastCursor;
+    Message lastMessage;
     bool received;
-    TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
+    TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
 
-    virtual bool deliver(QueuedMessage& msg){
-        last = msg;
+    virtual bool deliver(const QueueCursor& cursor, const Message& message){
+        lastCursor = cursor;
+        lastMessage = message;
         received = true;
         return true;
     };
     void notify() {}
     void cancel() {}
-    void acknowledged(const QueuedMessage&) {}
+    void acknowledged(const DeliveryRecord&) {}
     OwnershipToken* getSession() { return 0; }
 };
 
 class FailOnDeliver : public Deliverable
 {
-    boost::intrusive_ptr<Message> msg;
+    Message msg;
 public:
     FailOnDeliver() : msg(MessageUtils::createMessage()) {}
     void deliverTo(const boost::shared_ptr<Queue>& queue)
     {
         throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
     }
-    Message& getMessage() { return *(msg.get()); }
+    Message& getMessage() { return msg; }
 };
 
-intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
-    intrusive_ptr<Message> msg(new Message());
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
-    return msg;
-}
-
-intrusive_ptr<Message> contentMessage(string content) {
-    intrusive_ptr<Message> m(MessageUtils::createMessage());
-    MessageUtils::addContent(m, content);
-    return m;
-}
-
-string getContent(intrusive_ptr<Message> m) {
-    return m->getFrames().getContent();
-}
-
 QPID_AUTO_TEST_SUITE(QueueTestSuite)
 
-QPID_AUTO_TEST_CASE(testAsyncMessage) {
-    Queue::shared_ptr queue(new Queue("my_test_queue", true));
-    intrusive_ptr<Message> received;
-
-    TestConsumer::shared_ptr c1(new TestConsumer());
-    queue->consume(c1);
-
-
-    //Test basic delivery:
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
-    queue->process(msg1);
-    sleep(2);
-
-    BOOST_CHECK(!c1->received);
-    msg1->enqueueComplete();
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-}
-
-
-QPID_AUTO_TEST_CASE(testAsyncMessageCount){
-    Queue::shared_ptr queue(new Queue("my_test_queue", true));
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
-
-    queue->process(msg1);
-    sleep(2);
-    uint32_t compval=0;
-    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
-    msg1->enqueueComplete();
-    compval=1;
-    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
-    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
-}
-
-QPID_AUTO_TEST_CASE(testConsumers){
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-
-    //Test adding consumers:
-    TestConsumer::shared_ptr c1(new TestConsumer());
-    TestConsumer::shared_ptr c2(new TestConsumer());
-    queue->consume(c1);
-    queue->consume(c2);
-
-    BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
-    //Test basic delivery:
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    queue->deliver(msg1);
-    BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
-
-    queue->deliver(msg2);
-    BOOST_CHECK(queue->dispatch(c2));
-    BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
-
-    c1->received = false;
-    queue->deliver(msg3);
-    BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
-
-    //Test cancellation:
-    queue->cancel(c1);
-    BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
-    queue->cancel(c2);
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount());
-}
-
-QPID_AUTO_TEST_CASE(testRegistry){
-    //Test use of queues in registry:
-    QueueRegistry registry;
-    registry.declare("queue1", true, true);
-    registry.declare("queue2", true, true);
-    registry.declare("queue3", true, true);
-
-    BOOST_CHECK(registry.find("queue1"));
-    BOOST_CHECK(registry.find("queue2"));
-    BOOST_CHECK(registry.find("queue3"));
-
-    registry.destroy("queue1");
-    registry.destroy("queue2");
-    registry.destroy("queue3");
-
-    BOOST_CHECK(!registry.find("queue1"));
-    BOOST_CHECK(!registry.find("queue2"));
-    BOOST_CHECK(!registry.find("queue3"));
-}
-
-QPID_AUTO_TEST_CASE(testDequeue){
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> received;
-
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-    BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-    BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
-
-    TestConsumer::shared_ptr consumer(new TestConsumer());
-    queue->consume(consumer);
-    queue->dispatch(consumer);
-    if (!consumer->received)
-        sleep(2);
-
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK(!received);
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-}
-
 QPID_AUTO_TEST_CASE(testBound){
     //test the recording of bindings, and use of those to allow a queue to be unbound
     string key("my-key");
     FieldTable args;
 
-    Queue::shared_ptr queue(new Queue("my-queue", true));
+    Queue::shared_ptr queue(new Queue("my-queue"));
     ExchangeRegistry exchanges;
     //establish bindings from exchange->queue and notify the queue as it is bound:
     Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
@@ -273,423 +124,69 @@ QPID_AUTO_TEST_CASE(testBound){
     exchange3->route(deliverable);
 }
 
-QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
-    client::QueueOptions args;
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-
-    //change mode
-    queue->setLastNodeFailure();
-
-    //enqueue 1 message
-    queue->deliver(msg3);
-
-    //check all have persistent ids.
-    BOOST_CHECK(msg1->isPersistent());
-    BOOST_CHECK(msg2->isPersistent());
-    BOOST_CHECK(msg3->isPersistent());
-
-}
-
-
-QPID_AUTO_TEST_CASE(testSeek){
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
+QPID_AUTO_TEST_CASE(testLVQ){
 
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
-    SequenceNumber seq(2);
-    consumer->setPosition(seq);
-
-    QueuedMessage qm;
-    queue->dispatch(consumer);
-
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
-    queue->dispatch(consumer);
-    queue->dispatch(consumer); // make sure over-run is safe
-
-}
-
-QPID_AUTO_TEST_CASE(testSearch){
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    SequenceNumber seq(2);
-    QueuedMessage qm;
-    TestConsumer::shared_ptr c1(new TestConsumer());
-
-    BOOST_CHECK(queue->find(seq, qm));
-
-    BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
-    queue->acquire(qm, c1->getName());
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-    SequenceNumber seq1(3);
-    QueuedMessage qm1;
-    BOOST_CHECK(queue->find(seq1, qm1));
-    BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
-}
-const std::string nullxid = "";
-
-class SimpleDummyCtxt : public TransactionContext {};
+    QueueSettings settings;
+    string key="key";
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
-class DummyCtxt : public TPCTransactionContext
-{
-    const std::string xid;
-  public:
-    DummyCtxt(const std::string& _xid) : xid(_xid) {}
-    static std::string getXid(TransactionContext& ctxt)
-    {
-        DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
-        return c ? c->xid : nullxid;
+    const char* values[] = { "a", "b", "c", "a"};
+    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
-};
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
 
-class TestMessageStoreOC : public MessageStore
-{
-    std::set<std::string> prepared;
-    uint64_t nextPersistenceId;
-  public:
-
-    uint enqCnt;
-    uint deqCnt;
-    bool error;
-
-    TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
-    ~TestMessageStoreOC(){}
-
-    virtual void dequeue(TransactionContext*,
-                 const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
-                 const PersistableQueue& /*queue*/)
-    {
-        if (error) throw Exception("Dequeue error test");
-        deqCnt++;
-    }
+    TestConsumer::shared_ptr c(new TestConsumer("test", true));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());
 
-    virtual void enqueue(TransactionContext*,
-                 const boost::intrusive_ptr<PersistableMessage>& msg,
-                 const PersistableQueue& /* queue */)
-    {
-        if (error) throw Exception("Enqueue error test");
-        enqCnt++;
-        msg->enqueueComplete();
-    }
 
-    void createError()
-    {
-        error=true;
+    const char* values2[] = { "a", "b", "c"};
+    for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
     }
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
 
-    bool init(const Options*) { return true; }
-    void truncateInit(const bool) {}
-    void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
-    void destroy(PersistableQueue&) {}
-    void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
-    void destroy(const PersistableExchange&) {}
-    void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
-    void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
-    void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
-    void destroy(const PersistableConfig&) {}
-    void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
-    void destroy(PersistableMessage&) {}
-    void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
-    void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
-                    std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
-    void flush(const qpid::broker::PersistableQueue&) {}
-    uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
-
-    std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
-    std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
-    void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
-    void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
-    void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
-    void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
-
-    void recover(RecoveryManager&) {}
-};
-
-
-QPID_AUTO_TEST_CASE(testLVQOrdering){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> msg4 = createMessage("e", "D");
-    intrusive_ptr<Message> received;
-
-    //set deliever match for LVQ a,b,c,a
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"b");
-    msg3->insertCustomProperty(key,"c");
-    msg4->insertCustomProperty(key,"a");
-
-    //enqueue 4 message
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-    queue->deliver(msg4);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg3.get(), received.get());
-
-    intrusive_ptr<Message> msg5 = createMessage("e", "A");
-    intrusive_ptr<Message> msg6 = createMessage("e", "B");
-    intrusive_ptr<Message> msg7 = createMessage("e", "C");
-    msg5->insertCustomProperty(key,"a");
-    msg6->insertCustomProperty(key,"b");
-    msg7->insertCustomProperty(key,"c");
-    queue->deliver(msg5);
-    queue->deliver(msg6);
-    queue->deliver(msg7);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg5.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg6.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg7.get(), received.get());
-
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
 }
 
 QPID_AUTO_TEST_CASE(testLVQEmptyKey){
 
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQAcquire){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-    // disable flow control, as this test violates the enqueue/dequeue sequence.
-    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> msg4 = createMessage("e", "D");
-    intrusive_ptr<Message> msg5 = createMessage("e", "F");
-    intrusive_ptr<Message> msg6 = createMessage("e", "G");
-
-    //set deliever match for LVQ a,b,c,a
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"b");
-    msg3->insertCustomProperty(key,"c");
-    msg4->insertCustomProperty(key,"a");
-    msg5->insertCustomProperty(key,"b");
-    msg6->insertCustomProperty(key,"c");
-
-    //enqueue 4 message
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-    queue->deliver(msg4);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    framing::SequenceNumber sequence(1);
-    QueuedMessage qmsg(queue.get(), msg1, sequence);
-    QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
-    framing::SequenceNumber sequence1(10);
-    QueuedMessage qmsg3(queue.get(), 0, sequence1);
-    TestConsumer::shared_ptr dummy(new TestConsumer());
-
-    BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
-    BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
-    // Acquire the massage again to test failure case.
-    BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
-    BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-    queue->deliver(msg5);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    // set mode to no browse and check
-    args.setOrdering(client::LVQ_NO_BROWSE);
-    queue->configure(args);
-    TestConsumer::shared_ptr c1(new TestConsumer("test", false));
-
-    queue->dispatch(c1);
-    queue->dispatch(c1);
-    queue->dispatch(c1);
-
-    queue->deliver(msg6);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    intrusive_ptr<Message> received;
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQMultiQueue){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true ));
-    Queue::shared_ptr queue2(new Queue("my-queue", true ));
-    intrusive_ptr<Message> received;
-    queue1->configure(args);
-    queue2->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "A");
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"a");
-
-    queue1->deliver(msg1);
-    queue2->deliver(msg1);
-    queue1->deliver(msg2);
-
-    received = queue1->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
-    received = queue2->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-
-}
+    QueueSettings settings;
+    string key="key";
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
-QPID_AUTO_TEST_CASE(testLVQRecover){
 
-/* simulate this
-  1. start 2 nodes
-  2. create cluster durable lvq
-  3. send a transient message to the queue
-  4. kill one of the nodes (to trigger force persistent behaviour)...
-  5. then restart it (to turn off force persistent behaviour)
-  6. send another transient message with same lvq key as in 3
-  7. kill the second node again (retrigger force persistent)
-  8. stop and recover the first node
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->create(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "A");
-    // 2
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"a");
-    // 3
-    queue1->deliver(msg1);
-    // 4
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    // 5
-    queue1->clearLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    // 6
-    queue1->deliver(msg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-    BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+    qpid::types::Variant::Map properties;
+    properties["key"] = "a";
+    q->deliver(MessageUtils::createMessage(properties, "one"));
+    properties.clear();
+    q->deliver(MessageUtils::createMessage(properties, "two"));
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
 }
 
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
-        m->computeExpiration(new broker::ExpiryPolicy);
+        Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
+        m.computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }
@@ -706,7 +203,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
 QPID_AUTO_TEST_CASE(testQueueCleaner) {
     Timer timer;
     QueueRegistry queues;
-    Queue::shared_ptr queue = queues.declare("my-queue").first;
+    Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
@@ -717,44 +214,57 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
-
-
 namespace {
-    // helper for group tests
-    void verifyAcquire( Queue::shared_ptr queue,
-                        TestConsumer::shared_ptr c,
-                        std::deque<QueuedMessage>& results,
-                        const std::string& expectedGroup,
-                        const int expectedId )
-    {
-        BOOST_CHECK(queue->dispatch(c));
-        results.push_back(c->last);
-        std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-        int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+int getIntProperty(const Message& message, const std::string& key)
+{
+    qpid::types::Variant v = message.getProperty(key);
+    int i(0);
+    if (!v.isVoid()) i = v;
+    return i;
+}
+// helper for group tests
+void verifyAcquire( Queue::shared_ptr queue,
+                    TestConsumer::shared_ptr c,
+                    std::deque<QueueCursor>& results,
+                    const std::string& expectedGroup,
+                    const int expectedId )
+{
+    bool success = queue->dispatch(c);
+    BOOST_CHECK(success);
+    if (success) {
+        results.push_back(c->lastCursor);
+        std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
+        int id = getIntProperty(c->lastMessage, "MY-ID");
         BOOST_CHECK_EQUAL( group, expectedGroup );
         BOOST_CHECK_EQUAL( id, expectedId );
     }
 }
 
+Message createGroupMessage(int id, const std::string& group)
+{
+    qpid::types::Variant::Map properties;
+    properties["GROUP-ID"] = group;
+    properties["MY-ID"] = id;
+    return MessageUtils::createMessage(properties);
+}
+}
+
 QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
     //
     // Verify that consumers of grouped messages own the groups once a message is acquired,
     // and release the groups once all acquired messages have been dequeued or requeued
     //
-    FieldTable args;
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    args.setString("qpid.group_header_key", "GROUP-ID");
-    args.setInt("qpid.shared_msg_group", 1);
-    queue->configure(args);
+    QueueSettings settings;
+    settings.shareGroups = 1;
+    settings.groupKey = "GROUP-ID";
+    QueueFactory factory;
+    Queue::shared_ptr queue(factory.create("my_queue", settings));
 
     std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
                              std::string("b"), std::string("b"), std::string("b"),
                              std::string("c"), std::string("c"), std::string("c") };
     for (int i = 0; i < 9; ++i) {
-        intrusive_ptr<Message> msg = createMessage("e", "A");
-        msg->insertCustomProperty("GROUP-ID", groups[i]);
-        msg->insertCustomProperty("MY-ID", i);
-        queue->deliver(msg);
+        queue->deliver(createGroupMessage(i, groups[i]));
     }
 
     // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
@@ -768,8 +278,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->consume(c1);
     queue->consume(c2);
 
-    std::deque<QueuedMessage> dequeMeC1;
-    std::deque<QueuedMessage> dequeMeC2;
+    std::deque<QueueCursor> dequeMeC1;
+    std::deque<QueueCursor> dequeMeC2;
 
 
     verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
@@ -828,9 +338,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
 
     // what happens if C-2 "requeues" a-1 and a-2?
-    queue->requeue( dequeMeC2.front() );
+    queue->release( dequeMeC2.front() );
     dequeMeC2.pop_front();
-    queue->requeue( dequeMeC2.front() );
+    queue->release( dequeMeC2.front() );
     dequeMeC2.pop_front();  // now just has c-7 acquired
 
     // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
@@ -855,9 +365,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     gotOne = queue->dispatch(c2);
     BOOST_CHECK( !gotOne );
 
-    // requeue all of C1's acquired messages, then cancel C1
+    // release all of C1's acquired messages, then cancel C1
     while (!dequeMeC1.empty()) {
-        queue->requeue(dequeMeC1.front());
+        queue->release(dequeMeC1.front());
         dequeMeC1.pop_front();
     }
     queue->cancel(c1);
@@ -877,7 +387,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ---, ---, ---
 
     TestConsumer::shared_ptr c3(new TestConsumer("C3"));
-    std::deque<QueuedMessage> dequeMeC3;
+    std::deque<QueueCursor> dequeMeC3;
 
     verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
     verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
@@ -897,11 +407,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
 
     // Queue = a-2,
     // Owners= ^C3,
-
-    intrusive_ptr<Message> msg = createMessage("e", "A");
-    msg->insertCustomProperty("GROUP-ID", "a");
-    msg->insertCustomProperty("MY-ID", 9);
-    queue->deliver(msg);
+    queue->deliver(createGroupMessage(9, "a"));
 
     // Queue = a-2, a-9
     // Owners= ^C3, ^C3
@@ -909,10 +415,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     gotOne = queue->dispatch(c2);
     BOOST_CHECK( !gotOne );
 
-    msg = createMessage("e", "A");
-    msg->insertCustomProperty("GROUP-ID", "b");
-    msg->insertCustomProperty("MY-ID", 10);
-    queue->deliver(msg);
+    queue->deliver(createGroupMessage(10, "b"));
 
     // Queue = a-2, a-9, b-10
     // Owners= ^C3, ^C3, ----
@@ -933,17 +436,17 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Verify that the same default group name is automatically applied to messages that
     // do not specify a group name.
     //
-    FieldTable args;
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    args.setString("qpid.group_header_key", "GROUP-ID");
-    args.setInt("qpid.shared_msg_group", 1);
-    queue->configure(args);
+    QueueSettings settings;
+    settings.shareGroups = 1;
+    settings.groupKey = "GROUP-ID";
+    QueueFactory factory;
+    Queue::shared_ptr queue(factory.create("my_queue", settings));
 
     for (int i = 0; i < 3; ++i) {
-        intrusive_ptr<Message> msg = createMessage("e", "A");
+        qpid::types::Variant::Map properties;
         // no "GROUP-ID" header
-        msg->insertCustomProperty("MY-ID", i);
-        queue->deliver(msg);
+        properties["MY-ID"] = i;
+        queue->deliver(MessageUtils::createMessage(properties));
     }
 
     // Queue = 0, 1, 2
@@ -956,20 +459,20 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->consume(c1);
     queue->consume(c2);
 
-    std::deque<QueuedMessage> dequeMeC1;
-    std::deque<QueuedMessage> dequeMeC2;
+    std::deque<QueueCursor> dequeMeC1;
+    std::deque<QueueCursor> dequeMeC2;
 
     queue->dispatch(c1);    // c1 now owns default group (acquired 0)
-    dequeMeC1.push_back(c1->last);
-    int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    dequeMeC1.push_back(c1->lastCursor);
+    int id = getIntProperty(c1->lastMessage, "MY-ID");
     BOOST_CHECK_EQUAL( id, 0 );
 
     bool gotOne = queue->dispatch(c2);  // c2 should get nothing
     BOOST_CHECK( !gotOne );
 
     queue->dispatch(c1);    // c1 now acquires 1
-    dequeMeC1.push_back(c1->last);
-    id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    dequeMeC1.push_back(c1->lastCursor);
+    id = getIntProperty(c1->lastMessage, "MY-ID");
     BOOST_CHECK_EQUAL( id, 1 );
 
     gotOne = queue->dispatch(c2);  // c2 should still get nothing
@@ -982,7 +485,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
 
     // now default group should be available...
     queue->dispatch(c2);    // c2 now owns default group (acquired 2)
-    id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    id = c2->lastMessage.getProperty("MY-ID");
     BOOST_CHECK_EQUAL( id, 2 );
 
     gotOne = queue->dispatch(c1);  // c1 should get nothing
@@ -992,556 +495,128 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->cancel(c2);
 }
 
-QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
-
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
-    queue1->create(args);
-    Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
-    queue2->create(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-
-    queue1->deliver(msg1);
-    queue2->deliver(msg1);
-
-    //change mode
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
-    // check they don't get stored twice
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    queue1->deliver(msg2);
-    queue2->deliver(msg2);
-
-    queue1->clearLastNodeFailure();
-    queue2->clearLastNodeFailure();
-    // check only new messages get forced
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-
-    // check no failure messages are stored
-    queue1->clearLastNodeFailure();
-    queue2->clearLastNodeFailure();
-
-    intrusive_ptr<Message> msg3 = createMessage("e", "B");
-    queue1->deliver(msg3);
-    queue2->deliver(msg3);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
-
-    /**
-     * TODO: Fix or replace the following test which incorrectly requeues a
-     * message that was never on the queue in the first place. This relied on
-     * internal details not part of the queue abstraction.
-
-    // check requeue 1
-    intrusive_ptr<Message> msg4 = createMessage("e", "C");
-    intrusive_ptr<Message> msg5 = createMessage("e", "D");
-
-    framing::SequenceNumber sequence(1);
-    QueuedMessage qmsg1(queue1.get(), msg4, sequence);
-    QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
-
-    queue1->requeue(qmsg1);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-
-    // check requeue 2
-    queue2->clearLastNodeFailure();
-    queue2->requeue(qmsg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-
-    queue2->clearLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-    */
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
-/*
-simulate this:
-  1. start two nodes
-  2. create cluster durable queue and add some messages
-  3. kill one node (trigger force-persistent behaviour)
-  4. stop and recover remaining node
-  5. add another node
-  6. kill that new node again
-make sure that an attempt to re-enqueue a message does not happen which will
-result in the last man standing exiting with an error.
-
-we need to make sure that recover is safe, i.e. messages are
-not requeued to the store.
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->create(args);
-
-    // check requeue 1
-    intrusive_ptr<Message> msg1 = createMessage("e", "C");
-    intrusive_ptr<Message> msg2 = createMessage("e", "D");
-
-    queue1->recover(msg1);
-
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-    queue1->clearLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-    queue1->deliver(msg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeJournalError){
-/*
-simulate store exception going into last node standing
-
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->configure(args);
-
-    // check requeue 1
-    intrusive_ptr<Message> msg1 = createMessage("e", "C");
-
-    queue1->deliver(msg1);
-    testStore.createError();
-
-    ScopedSuppressLogging sl; // Suppress messages for expected errors.
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-}
-
-intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
-{
-    intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
-    if (content.size()) MessageUtils::addContent(msg, content);
-    msg->setStore(&store);
-    return msg;
-}
-
-QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
-
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args0; // No size policy
-    client::QueueOptions args1;
-    args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
-    client::QueueOptions args2;
-    args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
-
-    // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
-
-    FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
-    Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
-    tq1->configure(args1);
-    sbtFanout1.bind(tq1, "", 0);
-
-    intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg01(msg01);
-    sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
-    msg01->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg02(msg02);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
-    }
-    msg02->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg03(msg03);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
-    }
-    msg03->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
-    DeliverableMessage dmsg04(msg04);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
-    }
-    msg04->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
-    DeliverableMessage dmsg05(msg05);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
-    }
-    msg05->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
-
-    FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
-    Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
-    dq2->configure(args1);
-    sbdFanout2.bind(dq2, "", 0);
-
-    intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg06(msg06);
-    sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
-    msg06->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg07(msg07);
-    sbdFanout2.route(dmsg07);
-    msg07->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg08(msg08);
-    sbdFanout2.route(dmsg08);
-    msg08->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg09 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg09(msg09);
-    sbdFanout2.route(dmsg09);
-    msg09->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg10(msg10);
-    sbdFanout2.route(dmsg10);
-    msg10->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
-
-    // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
-
-    FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
-    Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
-    dq3->configure(args2);
-    mbdFanout3.bind(dq3, "", 0);
-    Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
-    dq4->configure(args1);
-    mbdFanout3.bind(dq4, "", 0);
-    Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
-    dq5->configure(args0);
-    mbdFanout3.bind(dq5, "", 0);
-
-    intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg11(msg11);
-    mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
-    msg11->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg12(msg12);
-    mbdFanout3.route(dmsg12);
-    msg12->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
-    BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg13(msg13);
-    mbdFanout3.route(dmsg13);
-    msg13->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg14 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg14(msg14);
-    mbdFanout3.route(dmsg14);
-    msg14->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
-    BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg15(msg15);
-    mbdFanout3.route(dmsg15);
-    msg15->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
-
-    // Bind a transient queue, this should block the release of any further messages.
-    // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
-    // is expected until a better overall multi-queue design is implemented. Similarly
-    // for the other tests in this section.
-
-    Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
-    tq6->configure(args0);
-    mbdFanout3.bind(tq6, "", 0);
-
-    intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg16(msg16);
-    mbdFanout3.route(dmsg16);
-    msg16->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg17(msg17);
-    mbdFanout3.route(dmsg17);
-    msg17->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg18 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg18(msg18);
-    mbdFanout3.route(dmsg18);
-    msg18->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg19(msg19);
-    mbdFanout3.route(dmsg19);
-    msg19->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
-
-
-    // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
-
-    FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
-    Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
-    dq7->configure(args0);
-    mbmFanout4.bind(dq7, "", 0);
-    Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
-    dq8->configure(args1);
-    mbmFanout4.bind(dq8, "", 0);
-    Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
-    tq9->configure(args0);
-    mbmFanout4.bind(tq9, "", 0);
-
-    intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg20(msg20);
-    mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
-    msg20->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg21(msg21);
-    mbmFanout4.route(dmsg21);
-    msg21->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg22(msg22);
-    mbmFanout4.route(dmsg22);
-    msg22->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg23 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg23(msg23);
-    mbmFanout4.route(dmsg23);
-    msg23->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg24(msg24);
-    mbmFanout4.route(dmsg24);
-    msg24->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
-}
-
 QPID_AUTO_TEST_CASE(testSetPositionFifo) {
     Queue::shared_ptr q(new Queue("my-queue", true));
     BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
     for (int i = 0; i < 10; ++i)
-        q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+        q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1)));
 
     // Verify the front of the queue
     TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
-    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
+    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
+
     // Verify the back of the queue
-    QueuedMessage qm;
     BOOST_CHECK_EQUAL(10u, q->getPosition());
-    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
     BOOST_CHECK_EQUAL(10u, q->getMessageCount());
 
     // Using setPosition to introduce a gap in sequence numbers.
     q->setPosition(15);
     BOOST_CHECK_EQUAL(10u, q->getMessageCount());
     BOOST_CHECK_EQUAL(15u, q->getPosition());
-    BOOST_CHECK(q->find(10, qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
-    q->deliver(contentMessage("16"));
-    c->setPosition(9);
+    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));
+
+    q->seek(*c, Queue::MessagePredicate(), 9);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(10u, c->last.position);
-    BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(16u, c->last.position);
-    BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());
 
     // Using setPosition to truncate the queue
     q->setPosition(5);
     BOOST_CHECK_EQUAL(5u, q->getMessageCount());
-    q->deliver(contentMessage("6a"));
-    c->setPosition(4);
+    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
+    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+    q->seek(*c, Queue::MessagePredicate(), 4);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(5u, c->last.position);
-    BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(6u, c->last.position);
-    BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
     BOOST_CHECK(!q->dispatch(c)); // No more messages.
 }
 
 QPID_AUTO_TEST_CASE(testSetPositionLvq) {
-    Queue::shared_ptr q(new Queue("my-queue", true));
+    QueueSettings settings;
     string key="key";
-    framing::FieldTable args;
-    args.setString("qpid.last_value_queue_key", "key");
-    q->configure(args);
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
     const char* values[] = { "a", "b", "c", "a", "b", "c" };
     for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
-        intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
-        m->insertCustomProperty(key, values[i]);
-        q->deliver(m);
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
     BOOST_CHECK_EQUAL(3u, q->getMessageCount());
     // Verify the front of the queue
     TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
-    BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
+    BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
     // Verify the back of the queue
-    QueuedMessage qm;
     BOOST_CHECK_EQUAL(6u, q->getPosition());
-    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("6", getContent(qm.payload));
 
     q->setPosition(5);
-    c->setPosition(4);
+
+    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+    q->seek(*c, Queue::MessagePredicate(), 4);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
     BOOST_CHECK(!q->dispatch(c));
 }
 
 QPID_AUTO_TEST_CASE(testSetPositionPriority) {
-    Queue::shared_ptr q(new Queue("my-queue", true));
-    framing::FieldTable args;
-    args.setInt("qpid.priorities", 10);
-    q->configure(args);
+    QueueSettings settings;
+    settings.priorities = 10;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
     const int priorities[] = { 1, 2, 3, 2, 1, 3 };
     for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
-        intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
-        m->getFrames().getHeaders()->get<DeliveryProperties>(true)
-            ->setPriority(priorities[i]);
-        q->deliver(m);
+        qpid::types::Variant::Map properties;
+        properties["priority"] = priorities[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
 
     // Truncation removes messages in fifo order, not priority order.
     q->setPosition(3);
-    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position);
+    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(2u, c->last.position);
+    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(3u, c->last.position);
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
     BOOST_CHECK(!q->dispatch(c));
 
-    intrusive_ptr<Message> m = contentMessage("4a");
-    m->getFrames().getHeaders()->get<DeliveryProperties>(true)
-        ->setPriority(4);
-    q->deliver(m);
+    qpid::types::Variant::Map properties;
+    properties["priority"] = 4;
+    q->deliver(MessageUtils::createMessage(properties, "4a"));
+
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position);
-    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
 
     // But consumers see priority order
     c.reset(new TestConsumer("test", true));
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position);
-    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(3u, c->last.position);
-    BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(2u, c->last.position);
-    BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position);
-    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/branches/asyncstore/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/TxMocks.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/TxMocks.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/TxMocks.h Mon Aug 27 15:40:33 2012
@@ -119,8 +119,6 @@ public:
         assertEqualVector(expected, actual);
     }
 
-    void accept(TxOpConstVisitor&) const {}
-
     ~MockTxOp(){}
 };
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org