You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [8/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Aug 10 12:04:27 2012
@@ -38,8 +38,8 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
 
 #include <iostream>
 #include <vector>
@@ -56,196 +56,47 @@ using namespace qpid::sys;
 
 namespace qpid {
 namespace tests {
-
 class TestConsumer : public virtual Consumer{
 public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
-    QueuedMessage last;
+    QueueCursor lastCursor;
+    Message lastMessage;
     bool received;
-    TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
+    TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
 
-    virtual bool deliver(QueuedMessage& msg){
-        last = msg;
+    virtual bool deliver(const QueueCursor& cursor, const Message& message){
+        lastCursor = cursor;
+        lastMessage = message;
         received = true;
         return true;
     };
     void notify() {}
     void cancel() {}
-    void acknowledged(const QueuedMessage&) {}
+    void acknowledged(const DeliveryRecord&) {}
     OwnershipToken* getSession() { return 0; }
 };
 
 class FailOnDeliver : public Deliverable
 {
-    boost::intrusive_ptr<Message> msg;
+    Message msg;
 public:
     FailOnDeliver() : msg(MessageUtils::createMessage()) {}
     void deliverTo(const boost::shared_ptr<Queue>& queue)
     {
         throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
     }
-    Message& getMessage() { return *(msg.get()); }
+    Message& getMessage() { return msg; }
 };
 
-intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
-    intrusive_ptr<Message> msg(new Message());
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
-    return msg;
-}
-
-intrusive_ptr<Message> contentMessage(string content) {
-    intrusive_ptr<Message> m(MessageUtils::createMessage());
-    MessageUtils::addContent(m, content);
-    return m;
-}
-
-string getContent(intrusive_ptr<Message> m) {
-    return m->getFrames().getContent();
-}
-
 QPID_AUTO_TEST_SUITE(QueueTestSuite)
 
-QPID_AUTO_TEST_CASE(testAsyncMessage) {
-    Queue::shared_ptr queue(new Queue("my_test_queue", true));
-    intrusive_ptr<Message> received;
-
-    TestConsumer::shared_ptr c1(new TestConsumer());
-    queue->consume(c1);
-
-
-    //Test basic delivery:
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
-    queue->process(msg1);
-    sleep(2);
-
-    BOOST_CHECK(!c1->received);
-    msg1->enqueueComplete();
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-}
-
-
-QPID_AUTO_TEST_CASE(testAsyncMessageCount){
-    Queue::shared_ptr queue(new Queue("my_test_queue", true));
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
-
-    queue->process(msg1);
-    sleep(2);
-    uint32_t compval=0;
-    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
-    msg1->enqueueComplete();
-    compval=1;
-    BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
-    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
-}
-
-QPID_AUTO_TEST_CASE(testConsumers){
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-
-    //Test adding consumers:
-    TestConsumer::shared_ptr c1(new TestConsumer());
-    TestConsumer::shared_ptr c2(new TestConsumer());
-    queue->consume(c1);
-    queue->consume(c2);
-
-    BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
-    //Test basic delivery:
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    queue->deliver(msg1);
-    BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
-
-    queue->deliver(msg2);
-    BOOST_CHECK(queue->dispatch(c2));
-    BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
-
-    c1->received = false;
-    queue->deliver(msg3);
-    BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
-
-    //Test cancellation:
-    queue->cancel(c1);
-    BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
-    queue->cancel(c2);
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount());
-}
-
-QPID_AUTO_TEST_CASE(testRegistry){
-    //Test use of queues in registry:
-    QueueRegistry registry;
-    registry.declare("queue1", true, true);
-    registry.declare("queue2", true, true);
-    registry.declare("queue3", true, true);
-
-    BOOST_CHECK(registry.find("queue1"));
-    BOOST_CHECK(registry.find("queue2"));
-    BOOST_CHECK(registry.find("queue3"));
-
-    registry.destroy("queue1");
-    registry.destroy("queue2");
-    registry.destroy("queue3");
-
-    BOOST_CHECK(!registry.find("queue1"));
-    BOOST_CHECK(!registry.find("queue2"));
-    BOOST_CHECK(!registry.find("queue3"));
-}
-
-QPID_AUTO_TEST_CASE(testDequeue){
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> received;
-
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-    BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-    BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
-
-    TestConsumer::shared_ptr consumer(new TestConsumer());
-    queue->consume(consumer);
-    queue->dispatch(consumer);
-    if (!consumer->received)
-        sleep(2);
-
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-    received = queue->get().payload;
-    BOOST_CHECK(!received);
-    BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-}
-
 QPID_AUTO_TEST_CASE(testBound){
     //test the recording of bindings, and use of those to allow a queue to be unbound
     string key("my-key");
     FieldTable args;
 
-    Queue::shared_ptr queue(new Queue("my-queue", true));
+    Queue::shared_ptr queue(new Queue("my-queue"));
     ExchangeRegistry exchanges;
     //establish bindings from exchange->queue and notify the queue as it is bound:
     Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
@@ -273,423 +124,69 @@ QPID_AUTO_TEST_CASE(testBound){
     exchange3->route(deliverable);
 }
 
-QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
-    client::QueueOptions args;
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-
-    //change mode
-    queue->setLastNodeFailure();
-
-    //enqueue 1 message
-    queue->deliver(msg3);
-
-    //check all have persistent ids.
-    BOOST_CHECK(msg1->isPersistent());
-    BOOST_CHECK(msg2->isPersistent());
-    BOOST_CHECK(msg3->isPersistent());
-
-}
-
-
-QPID_AUTO_TEST_CASE(testSeek){
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
+QPID_AUTO_TEST_CASE(testLVQ){
 
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
-    SequenceNumber seq(2);
-    consumer->setPosition(seq);
-
-    QueuedMessage qm;
-    queue->dispatch(consumer);
-
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
-    queue->dispatch(consumer);
-    queue->dispatch(consumer); // make sure over-run is safe
-
-}
-
-QPID_AUTO_TEST_CASE(testSearch){
-
-    Queue::shared_ptr queue(new Queue("my-queue", true));
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
-    //enqueue 2 messages
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-
-    SequenceNumber seq(2);
-    QueuedMessage qm;
-    TestConsumer::shared_ptr c1(new TestConsumer());
-
-    BOOST_CHECK(queue->find(seq, qm));
-
-    BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
-    queue->acquire(qm, c1->getName());
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-    SequenceNumber seq1(3);
-    QueuedMessage qm1;
-    BOOST_CHECK(queue->find(seq1, qm1));
-    BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
-}
-const std::string nullxid = "";
-
-class SimpleDummyCtxt : public TransactionContext {};
+    QueueSettings settings;
+    string key="key";
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
-class DummyCtxt : public TPCTransactionContext
-{
-    const std::string xid;
-  public:
-    DummyCtxt(const std::string& _xid) : xid(_xid) {}
-    static std::string getXid(TransactionContext& ctxt)
-    {
-        DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
-        return c ? c->xid : nullxid;
+    const char* values[] = { "a", "b", "c", "a"};
+    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
-};
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
 
-class TestMessageStoreOC : public MessageStore
-{
-    std::set<std::string> prepared;
-    uint64_t nextPersistenceId;
-  public:
-
-    uint enqCnt;
-    uint deqCnt;
-    bool error;
-
-    TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
-    ~TestMessageStoreOC(){}
-
-    virtual void dequeue(TransactionContext*,
-                 const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
-                 const PersistableQueue& /*queue*/)
-    {
-        if (error) throw Exception("Dequeue error test");
-        deqCnt++;
-    }
+    TestConsumer::shared_ptr c(new TestConsumer("test", true));
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());
 
-    virtual void enqueue(TransactionContext*,
-                 const boost::intrusive_ptr<PersistableMessage>& msg,
-                 const PersistableQueue& /* queue */)
-    {
-        if (error) throw Exception("Enqueue error test");
-        enqCnt++;
-        msg->enqueueComplete();
-    }
 
-    void createError()
-    {
-        error=true;
+    const char* values2[] = { "a", "b", "c"};
+    for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
     }
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
 
-    bool init(const Options*) { return true; }
-    void truncateInit(const bool) {}
-    void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
-    void destroy(PersistableQueue&) {}
-    void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
-    void destroy(const PersistableExchange&) {}
-    void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
-    void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
-    void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
-    void destroy(const PersistableConfig&) {}
-    void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
-    void destroy(PersistableMessage&) {}
-    void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
-    void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
-                    std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
-    void flush(const qpid::broker::PersistableQueue&) {}
-    uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
-
-    std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
-    std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
-    void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
-    void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
-    void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
-    void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
-
-    void recover(RecoveryManager&) {}
-};
-
-
-QPID_AUTO_TEST_CASE(testLVQOrdering){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> msg4 = createMessage("e", "D");
-    intrusive_ptr<Message> received;
-
-    //set deliever match for LVQ a,b,c,a
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"b");
-    msg3->insertCustomProperty(key,"c");
-    msg4->insertCustomProperty(key,"a");
-
-    //enqueue 4 message
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-    queue->deliver(msg4);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg3.get(), received.get());
-
-    intrusive_ptr<Message> msg5 = createMessage("e", "A");
-    intrusive_ptr<Message> msg6 = createMessage("e", "B");
-    intrusive_ptr<Message> msg7 = createMessage("e", "C");
-    msg5->insertCustomProperty(key,"a");
-    msg6->insertCustomProperty(key,"b");
-    msg7->insertCustomProperty(key,"c");
-    queue->deliver(msg5);
-    queue->deliver(msg6);
-    queue->deliver(msg7);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg5.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg6.get(), received.get());
-
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg7.get(), received.get());
-
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
 }
 
 QPID_AUTO_TEST_CASE(testLVQEmptyKey){
 
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQAcquire){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-    // disable flow control, as this test violates the enqueue/dequeue sequence.
-    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-
-    Queue::shared_ptr queue(new Queue("my-queue", true ));
-    queue->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    intrusive_ptr<Message> msg3 = createMessage("e", "C");
-    intrusive_ptr<Message> msg4 = createMessage("e", "D");
-    intrusive_ptr<Message> msg5 = createMessage("e", "F");
-    intrusive_ptr<Message> msg6 = createMessage("e", "G");
-
-    //set deliever match for LVQ a,b,c,a
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"b");
-    msg3->insertCustomProperty(key,"c");
-    msg4->insertCustomProperty(key,"a");
-    msg5->insertCustomProperty(key,"b");
-    msg6->insertCustomProperty(key,"c");
-
-    //enqueue 4 message
-    queue->deliver(msg1);
-    queue->deliver(msg2);
-    queue->deliver(msg3);
-    queue->deliver(msg4);
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    framing::SequenceNumber sequence(1);
-    QueuedMessage qmsg(queue.get(), msg1, sequence);
-    QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
-    framing::SequenceNumber sequence1(10);
-    QueuedMessage qmsg3(queue.get(), 0, sequence1);
-    TestConsumer::shared_ptr dummy(new TestConsumer());
-
-    BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
-    BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
-    // Acquire the massage again to test failure case.
-    BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
-    BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
-
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-    queue->deliver(msg5);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    // set mode to no browse and check
-    args.setOrdering(client::LVQ_NO_BROWSE);
-    queue->configure(args);
-    TestConsumer::shared_ptr c1(new TestConsumer("test", false));
-
-    queue->dispatch(c1);
-    queue->dispatch(c1);
-    queue->dispatch(c1);
-
-    queue->deliver(msg6);
-    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
-    intrusive_ptr<Message> received;
-    received = queue->get().payload;
-    BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQMultiQueue){
-
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true ));
-    Queue::shared_ptr queue2(new Queue("my-queue", true ));
-    intrusive_ptr<Message> received;
-    queue1->configure(args);
-    queue2->configure(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "A");
-
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"a");
-
-    queue1->deliver(msg1);
-    queue2->deliver(msg1);
-    queue1->deliver(msg2);
-
-    received = queue1->get().payload;
-    BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
-    received = queue2->get().payload;
-    BOOST_CHECK_EQUAL(msg1.get(), received.get());
-
-}
+    QueueSettings settings;
+    string key="key";
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
-QPID_AUTO_TEST_CASE(testLVQRecover){
 
-/* simulate this
-  1. start 2 nodes
-  2. create cluster durable lvq
-  3. send a transient message to the queue
-  4. kill one of the nodes (to trigger force persistent behaviour)...
-  5. then restart it (to turn off force persistent behaviour)
-  6. send another transient message with same lvq key as in 3
-  7. kill the second node again (retrigger force persistent)
-  8. stop and recover the first node
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setOrdering(client::LVQ);
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->create(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-    intrusive_ptr<Message> msg2 = createMessage("e", "A");
-    // 2
-    string key;
-    args.getLVQKey(key);
-    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-    msg1->insertCustomProperty(key,"a");
-    msg2->insertCustomProperty(key,"a");
-    // 3
-    queue1->deliver(msg1);
-    // 4
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    // 5
-    queue1->clearLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    // 6
-    queue1->deliver(msg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-    BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+    qpid::types::Variant::Map properties;
+    properties["key"] = "a";
+    q->deliver(MessageUtils::createMessage(properties, "one"));
+    properties.clear();
+    q->deliver(MessageUtils::createMessage(properties, "two"));
+    BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
 }
 
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
-        m->computeExpiration(new broker::ExpiryPolicy);
+        Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
+        m.computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }
@@ -706,7 +203,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
 QPID_AUTO_TEST_CASE(testQueueCleaner) {
     Timer timer;
     QueueRegistry queues;
-    Queue::shared_ptr queue = queues.declare("my-queue").first;
+    Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
@@ -717,44 +214,57 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
-
-
 namespace {
-    // helper for group tests
-    void verifyAcquire( Queue::shared_ptr queue,
-                        TestConsumer::shared_ptr c,
-                        std::deque<QueuedMessage>& results,
-                        const std::string& expectedGroup,
-                        const int expectedId )
-    {
-        BOOST_CHECK(queue->dispatch(c));
-        results.push_back(c->last);
-        std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-        int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+int getIntProperty(const Message& message, const std::string& key)
+{
+    qpid::types::Variant v = message.getProperty(key);
+    int i(0);
+    if (!v.isVoid()) i = v;
+    return i;
+}
+// helper for group tests
+void verifyAcquire( Queue::shared_ptr queue,
+                    TestConsumer::shared_ptr c,
+                    std::deque<QueueCursor>& results,
+                    const std::string& expectedGroup,
+                    const int expectedId )
+{
+    bool success = queue->dispatch(c);
+    BOOST_CHECK(success);
+    if (success) {
+        results.push_back(c->lastCursor);
+        std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
+        int id = getIntProperty(c->lastMessage, "MY-ID");
         BOOST_CHECK_EQUAL( group, expectedGroup );
         BOOST_CHECK_EQUAL( id, expectedId );
     }
 }
 
+Message createGroupMessage(int id, const std::string& group)
+{
+    qpid::types::Variant::Map properties;
+    properties["GROUP-ID"] = group;
+    properties["MY-ID"] = id;
+    return MessageUtils::createMessage(properties);
+}
+}
+
 QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
     //
     // Verify that consumers of grouped messages own the groups once a message is acquired,
     // and release the groups once all acquired messages have been dequeued or requeued
     //
-    FieldTable args;
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    args.setString("qpid.group_header_key", "GROUP-ID");
-    args.setInt("qpid.shared_msg_group", 1);
-    queue->configure(args);
+    QueueSettings settings;
+    settings.shareGroups = 1;
+    settings.groupKey = "GROUP-ID";
+    QueueFactory factory;
+    Queue::shared_ptr queue(factory.create("my_queue", settings));
 
     std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
                              std::string("b"), std::string("b"), std::string("b"),
                              std::string("c"), std::string("c"), std::string("c") };
     for (int i = 0; i < 9; ++i) {
-        intrusive_ptr<Message> msg = createMessage("e", "A");
-        msg->insertCustomProperty("GROUP-ID", groups[i]);
-        msg->insertCustomProperty("MY-ID", i);
-        queue->deliver(msg);
+        queue->deliver(createGroupMessage(i, groups[i]));
     }
 
     // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
@@ -768,8 +278,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->consume(c1);
     queue->consume(c2);
 
-    std::deque<QueuedMessage> dequeMeC1;
-    std::deque<QueuedMessage> dequeMeC2;
+    std::deque<QueueCursor> dequeMeC1;
+    std::deque<QueueCursor> dequeMeC2;
 
 
     verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
@@ -828,9 +338,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
 
     // what happens if C-2 "requeues" a-1 and a-2?
-    queue->requeue( dequeMeC2.front() );
+    queue->release( dequeMeC2.front() );
     dequeMeC2.pop_front();
-    queue->requeue( dequeMeC2.front() );
+    queue->release( dequeMeC2.front() );
     dequeMeC2.pop_front();  // now just has c-7 acquired
 
     // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
@@ -855,9 +365,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     gotOne = queue->dispatch(c2);
     BOOST_CHECK( !gotOne );
 
-    // requeue all of C1's acquired messages, then cancel C1
+    // release all of C1's acquired messages, then cancel C1
     while (!dequeMeC1.empty()) {
-        queue->requeue(dequeMeC1.front());
+        queue->release(dequeMeC1.front());
         dequeMeC1.pop_front();
     }
     queue->cancel(c1);
@@ -877,7 +387,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ---, ---, ---
 
     TestConsumer::shared_ptr c3(new TestConsumer("C3"));
-    std::deque<QueuedMessage> dequeMeC3;
+    std::deque<QueueCursor> dequeMeC3;
 
     verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
     verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
@@ -897,11 +407,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
 
     // Queue = a-2,
     // Owners= ^C3,
-
-    intrusive_ptr<Message> msg = createMessage("e", "A");
-    msg->insertCustomProperty("GROUP-ID", "a");
-    msg->insertCustomProperty("MY-ID", 9);
-    queue->deliver(msg);
+    queue->deliver(createGroupMessage(9, "a"));
 
     // Queue = a-2, a-9
     // Owners= ^C3, ^C3
@@ -909,10 +415,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     gotOne = queue->dispatch(c2);
     BOOST_CHECK( !gotOne );
 
-    msg = createMessage("e", "A");
-    msg->insertCustomProperty("GROUP-ID", "b");
-    msg->insertCustomProperty("MY-ID", 10);
-    queue->deliver(msg);
+    queue->deliver(createGroupMessage(10, "b"));
 
     // Queue = a-2, a-9, b-10
     // Owners= ^C3, ^C3, ----
@@ -933,17 +436,17 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Verify that the same default group name is automatically applied to messages that
     // do not specify a group name.
     //
-    FieldTable args;
-    Queue::shared_ptr queue(new Queue("my_queue", true));
-    args.setString("qpid.group_header_key", "GROUP-ID");
-    args.setInt("qpid.shared_msg_group", 1);
-    queue->configure(args);
+    QueueSettings settings;
+    settings.shareGroups = 1;
+    settings.groupKey = "GROUP-ID";
+    QueueFactory factory;
+    Queue::shared_ptr queue(factory.create("my_queue", settings));
 
     for (int i = 0; i < 3; ++i) {
-        intrusive_ptr<Message> msg = createMessage("e", "A");
+        qpid::types::Variant::Map properties;
         // no "GROUP-ID" header
-        msg->insertCustomProperty("MY-ID", i);
-        queue->deliver(msg);
+        properties["MY-ID"] = i;
+        queue->deliver(MessageUtils::createMessage(properties));
     }
 
     // Queue = 0, 1, 2
@@ -956,20 +459,20 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->consume(c1);
     queue->consume(c2);
 
-    std::deque<QueuedMessage> dequeMeC1;
-    std::deque<QueuedMessage> dequeMeC2;
+    std::deque<QueueCursor> dequeMeC1;
+    std::deque<QueueCursor> dequeMeC2;
 
     queue->dispatch(c1);    // c1 now owns default group (acquired 0)
-    dequeMeC1.push_back(c1->last);
-    int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    dequeMeC1.push_back(c1->lastCursor);
+    int id = getIntProperty(c1->lastMessage, "MY-ID");
     BOOST_CHECK_EQUAL( id, 0 );
 
     bool gotOne = queue->dispatch(c2);  // c2 should get nothing
     BOOST_CHECK( !gotOne );
 
     queue->dispatch(c1);    // c1 now acquires 1
-    dequeMeC1.push_back(c1->last);
-    id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    dequeMeC1.push_back(c1->lastCursor);
+    id = getIntProperty(c1->lastMessage, "MY-ID");
     BOOST_CHECK_EQUAL( id, 1 );
 
     gotOne = queue->dispatch(c2);  // c2 should still get nothing
@@ -982,7 +485,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
 
     // now default group should be available...
     queue->dispatch(c2);    // c2 now owns default group (acquired 2)
-    id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    id = c2->lastMessage.getProperty("MY-ID");
     BOOST_CHECK_EQUAL( id, 2 );
 
     gotOne = queue->dispatch(c1);  // c1 should get nothing
@@ -992,556 +495,128 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->cancel(c2);
 }
 
-QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
-
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
-    queue1->create(args);
-    Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
-    queue2->create(args);
-
-    intrusive_ptr<Message> msg1 = createMessage("e", "A");
-
-    queue1->deliver(msg1);
-    queue2->deliver(msg1);
-
-    //change mode
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
-    // check they don't get stored twice
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
-    intrusive_ptr<Message> msg2 = createMessage("e", "B");
-    queue1->deliver(msg2);
-    queue2->deliver(msg2);
-
-    queue1->clearLastNodeFailure();
-    queue2->clearLastNodeFailure();
-    // check only new messages get forced
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-
-    // check no failure messages are stored
-    queue1->clearLastNodeFailure();
-    queue2->clearLastNodeFailure();
-
-    intrusive_ptr<Message> msg3 = createMessage("e", "B");
-    queue1->deliver(msg3);
-    queue2->deliver(msg3);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-    queue1->setLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
-
-    /**
-     * TODO: Fix or replace the following test which incorrectly requeues a
-     * message that was never on the queue in the first place. This relied on
-     * internal details not part of the queue abstraction.
-
-    // check requeue 1
-    intrusive_ptr<Message> msg4 = createMessage("e", "C");
-    intrusive_ptr<Message> msg5 = createMessage("e", "D");
-
-    framing::SequenceNumber sequence(1);
-    QueuedMessage qmsg1(queue1.get(), msg4, sequence);
-    QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
-
-    queue1->requeue(qmsg1);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-
-    // check requeue 2
-    queue2->clearLastNodeFailure();
-    queue2->requeue(qmsg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-
-    queue2->clearLastNodeFailure();
-    queue2->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-    */
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
-/*
-simulate this:
-  1. start two nodes
-  2. create cluster durable queue and add some messages
-  3. kill one node (trigger force-persistent behaviour)
-  4. stop and recover remaining node
-  5. add another node
-  6. kill that new node again
-make sure that an attempt to re-enqueue a message does not happen which will
-result in the last man standing exiting with an error.
-
-we need to make sure that recover is safe, i.e. messages are
-not requeued to the store.
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->create(args);
-
-    // check requeue 1
-    intrusive_ptr<Message> msg1 = createMessage("e", "C");
-    intrusive_ptr<Message> msg2 = createMessage("e", "D");
-
-    queue1->recover(msg1);
-
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-    queue1->clearLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-    queue1->deliver(msg2);
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeJournalError){
-/*
-simulate store exception going into last node standing
-
-*/
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args;
-    // set queue mode
-    args.setPersistLastNode();
-
-    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
-    intrusive_ptr<Message> received;
-    queue1->configure(args);
-
-    // check requeue 1
-    intrusive_ptr<Message> msg1 = createMessage("e", "C");
-
-    queue1->deliver(msg1);
-    testStore.createError();
-
-    ScopedSuppressLogging sl; // Suppress messages for expected errors.
-    queue1->setLastNodeFailure();
-    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-}
-
-intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
-{
-    intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
-    if (content.size()) MessageUtils::addContent(msg, content);
-    msg->setStore(&store);
-    return msg;
-}
-
-QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
-
-    TestMessageStoreOC  testStore;
-    client::QueueOptions args0; // No size policy
-    client::QueueOptions args1;
-    args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
-    client::QueueOptions args2;
-    args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
-
-    // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
-
-    FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
-    Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
-    tq1->configure(args1);
-    sbtFanout1.bind(tq1, "", 0);
-
-    intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg01(msg01);
-    sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
-    msg01->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg02(msg02);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
-    }
-    msg02->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg03(msg03);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
-    }
-    msg03->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
-    DeliverableMessage dmsg04(msg04);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
-    }
-    msg04->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
-    DeliverableMessage dmsg05(msg05);
-    {
-        ScopedSuppressLogging sl; // suppress expected error messages.
-        BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
-    }
-    msg05->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
-    // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
-
-    FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
-    Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
-    dq2->configure(args1);
-    sbdFanout2.bind(dq2, "", 0);
-
-    intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg06(msg06);
-    sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
-    msg06->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg07(msg07);
-    sbdFanout2.route(dmsg07);
-    msg07->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg08(msg08);
-    sbdFanout2.route(dmsg08);
-    msg08->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg09 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg09(msg09);
-    sbdFanout2.route(dmsg09);
-    msg09->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
-
-    intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg10(msg10);
-    sbdFanout2.route(dmsg10);
-    msg10->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
-
-    // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
-
-    FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
-    Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
-    dq3->configure(args2);
-    mbdFanout3.bind(dq3, "", 0);
-    Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
-    dq4->configure(args1);
-    mbdFanout3.bind(dq4, "", 0);
-    Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
-    dq5->configure(args0);
-    mbdFanout3.bind(dq5, "", 0);
-
-    intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg11(msg11);
-    mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
-    msg11->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg12(msg12);
-    mbdFanout3.route(dmsg12);
-    msg12->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
-    BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg13(msg13);
-    mbdFanout3.route(dmsg13);
-    msg13->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg14 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg14(msg14);
-    mbdFanout3.route(dmsg14);
-    msg14->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
-    BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg15(msg15);
-    mbdFanout3.route(dmsg15);
-    msg15->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
-    BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
-
-    // Bind a transient queue, this should block the release of any further messages.
-    // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
-    // is expected until a better overall multi-queue design is implemented. Similarly
-    // for the other tests in this section.
-
-    Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
-    tq6->configure(args0);
-    mbdFanout3.bind(tq6, "", 0);
-
-    intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg16(msg16);
-    mbdFanout3.route(dmsg16);
-    msg16->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg17(msg17);
-    mbdFanout3.route(dmsg17);
-    msg17->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg18 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg18(msg18);
-    mbdFanout3.route(dmsg18);
-    msg18->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
-
-    intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg19(msg19);
-    mbdFanout3.route(dmsg19);
-    msg19->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
-    BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
-    BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
-
-
-    // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
-
-    FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
-    Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
-    dq7->configure(args0);
-    mbmFanout4.bind(dq7, "", 0);
-    Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
-    dq8->configure(args1);
-    mbmFanout4.bind(dq8, "", 0);
-    Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
-    tq9->configure(args0);
-    mbmFanout4.bind(tq9, "", 0);
-
-    intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg20(msg20);
-    mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
-    msg20->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
-    BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
-    DeliverableMessage dmsg21(msg21);
-    mbmFanout4.route(dmsg21);
-    msg21->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
-    BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
-    DeliverableMessage dmsg22(msg22);
-    mbmFanout4.route(dmsg22);
-    msg22->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg23 = mkMsg(testStore);  // transient no content
-    DeliverableMessage dmsg23(msg23);
-    mbmFanout4.route(dmsg23);
-    msg23->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
-
-    intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true);  // durable no content
-    DeliverableMessage dmsg24(msg24);
-    mbmFanout4.route(dmsg24);
-    msg24->tryReleaseContent();
-    BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
-    BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
-    BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
-}
-
 QPID_AUTO_TEST_CASE(testSetPositionFifo) {
     Queue::shared_ptr q(new Queue("my-queue", true));
     BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
     for (int i = 0; i < 10; ++i)
-        q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+        q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1)));
 
     // Verify the front of the queue
     TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
-    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
+    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
+
     // Verify the back of the queue
-    QueuedMessage qm;
     BOOST_CHECK_EQUAL(10u, q->getPosition());
-    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
     BOOST_CHECK_EQUAL(10u, q->getMessageCount());
 
     // Using setPosition to introduce a gap in sequence numbers.
     q->setPosition(15);
     BOOST_CHECK_EQUAL(10u, q->getMessageCount());
     BOOST_CHECK_EQUAL(15u, q->getPosition());
-    BOOST_CHECK(q->find(10, qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("10", getContent(qm.payload));
-    q->deliver(contentMessage("16"));
-    c->setPosition(9);
+    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));
+
+    q->seek(*c, Queue::MessagePredicate(), 9);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(10u, c->last.position);
-    BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(16u, c->last.position);
-    BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());
 
     // Using setPosition to trunkcate the queue
     q->setPosition(5);
     BOOST_CHECK_EQUAL(5u, q->getMessageCount());
-    q->deliver(contentMessage("6a"));
-    c->setPosition(4);
+    q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
+    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+    q->seek(*c, Queue::MessagePredicate(), 4);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(5u, c->last.position);
-    BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(6u, c->last.position);
-    BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
     BOOST_CHECK(!q->dispatch(c)); // No more messages.
 }
 
 QPID_AUTO_TEST_CASE(testSetPositionLvq) {
-    Queue::shared_ptr q(new Queue("my-queue", true));
+    QueueSettings settings;
     string key="key";
-    framing::FieldTable args;
-    args.setString("qpid.last_value_queue_key", "key");
-    q->configure(args);
+    settings.lvqKey = key;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
     const char* values[] = { "a", "b", "c", "a", "b", "c" };
     for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
-        intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
-        m->insertCustomProperty(key, values[i]);
-        q->deliver(m);
+        qpid::types::Variant::Map properties;
+        properties[key] = values[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
     BOOST_CHECK_EQUAL(3u, q->getMessageCount());
     // Verify the front of the queue
     TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
-    BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
+    BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
     // Verify the back of the queue
-    QueuedMessage qm;
     BOOST_CHECK_EQUAL(6u, q->getPosition());
-    BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
-    BOOST_CHECK_EQUAL("6", getContent(qm.payload));
 
     q->setPosition(5);
-    c->setPosition(4);
+
+    c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+    q->seek(*c, Queue::MessagePredicate(), 4);
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+    BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
     BOOST_CHECK(!q->dispatch(c));
 }
 
 QPID_AUTO_TEST_CASE(testSetPositionPriority) {
-    Queue::shared_ptr q(new Queue("my-queue", true));
-    framing::FieldTable args;
-    args.setInt("qpid.priorities", 10);
-    q->configure(args);
+    QueueSettings settings;
+    settings.priorities = 10;
+    QueueFactory factory;
+    Queue::shared_ptr q(factory.create("my-queue", settings));
 
     const int priorities[] = { 1, 2, 3, 2, 1, 3 };
     for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
-        intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
-        m->getFrames().getHeaders()->get<DeliveryProperties>(true)
-            ->setPriority(priorities[i]);
-        q->deliver(m);
+        qpid::types::Variant::Map properties;
+        properties["priority"] = priorities[i];
+        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
     }
 
     // Truncation removes messages in fifo order, not priority order.
     q->setPosition(3);
-    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+    TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position);
+    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(2u, c->last.position);
+    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(3u, c->last.position);
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
     BOOST_CHECK(!q->dispatch(c));
 
-    intrusive_ptr<Message> m = contentMessage("4a");
-    m->getFrames().getHeaders()->get<DeliveryProperties>(true)
-        ->setPriority(4);
-    q->deliver(m);
+    qpid::types::Variant::Map properties;
+    properties["priority"] = 4;
+    q->deliver(MessageUtils::createMessage(properties, "4a"));
+
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position);
-    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
 
     // But consumers see priority order
     c.reset(new TestConsumer("test", true));
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(4u, c->last.position);
-    BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(3u, c->last.position);
-    BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(2u, c->last.position);
-    BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
     BOOST_CHECK(q->dispatch(c));
-    BOOST_CHECK_EQUAL(1u, c->last.position);
-    BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+    BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
+    BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxMocks.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxMocks.h Fri Aug 10 12:04:27 2012
@@ -119,8 +119,6 @@ public:
         assertEqualVector(expected, actual);
     }
 
-    void accept(TxOpConstVisitor&) const {}
-
     ~MockTxOp(){}
 };
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug 10 12:04:27 2012
@@ -583,15 +583,7 @@ class ReplicationTests(BrokerTest):
         s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
-
-        # FIXME aconway 2012-02-22: there is a bug in priority ring
-        # queues that allows a low priority message to displace a high
-        # one. The following commented-out assert_browse is for the
-        # correct result, the uncommented one is for the actualy buggy
-        # result.  See https://issues.apache.org/jira/browse/QPID-3866
-        #
-        # expect = sorted(priorities,reverse=True)[0:5]
-        expect = [9,9,9,9,2]
+        expect = sorted(priorities,reverse=True)[0:5]
         primary.assert_browse("q", expect, transform=lambda m: m.priority)
         backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
 

Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Aug 10 12:04:27 2012
@@ -34,6 +34,7 @@
 
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Plugin.h"
@@ -95,7 +96,7 @@ class TestStore : public NullMessageStor
                  const boost::intrusive_ptr<PersistableMessage>& pmsg,
                  const PersistableQueue& )
     {
-        Message* msg = dynamic_cast<Message*>(pmsg.get());
+        qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
         assert(msg);
 
         // Dump the message if there is a dump file.

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Fri Aug 10 12:04:27 2012
@@ -308,8 +308,6 @@ class AlternateExchangeTests(TestBase010
 
         #create a queue using the intermediary as its alternate exchange:
         session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
-        #bind that queue to the dlq as well:
-        session.exchange_bind(exchange="dlq", queue="delivery-queue")
         #send it some messages:
         dp=self.session.delivery_properties(routing_key="delivery-queue")
         for m in ["One", "Two", "Three"]:
@@ -349,5 +347,5 @@ class AlternateExchangeTests(TestBase010
     def assertEmpty(self, queue):
         try:
             msg = queue.get(timeout=1) 
-            self.fail("Queue not empty: " + msg)
+            self.fail("Queue not empty: " + str(msg))
         except Empty: None

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Aug 10 12:04:27 2012
@@ -498,7 +498,7 @@ class ManagementTest (TestBase010):
         session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
 
     def test_immediate_method(self):
-        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
         conn = qpid.messaging.Connection(url)
         conn.open()
         sess = conn.session()
@@ -659,7 +659,7 @@ class ManagementTest (TestBase010):
         self.assertEqual(rc.receive, True)
 
         # setup a connection & session to the broker
-        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
         conn = qpid.messaging.Connection(url)
         conn.open()
         sess = conn.session()

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Fri Aug 10 12:04:27 2012
@@ -787,7 +787,7 @@ class MultiConsumerMsgGroupTests(Base):
         except Empty:
             pass
         assert count == 3   # non-A's
-        assert a_count == 2 # pending acquired message included in browse results
+        assert a_count == 1 # assumes the acquired message was not the one purged and regular browsers don't get acquired messages
         s1.acknowledge()    # ack the consumed A-0
         self.qmf_session.delBroker(self.qmf_broker)
 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Fri Aug 10 12:04:27 2012
@@ -106,10 +106,10 @@ class GeneralTests(Base):
 
         self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange")
 
-        # Close sess1; This will cause the queue to be deleted
+        # Close sess1; This will cause the queue to be deleted and all its messages (including those acquired) to be reouted to the alternate exchange
         sess1.close()
         sleep(1)
-        self.assertEqual(rx_alt.available(), 2, "2 of the messages should have been routed to the alt_exchange")
+        self.assertEqual(rx_alt.available(), 5, "All the messages should have been routed to the alt_exchange")
 
         # Close sess2; This will cause the acquired messages to be requeued and routed to the alternate
         sess2.close()

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py Fri Aug 10 12:04:27 2012
@@ -112,11 +112,11 @@ class PriorityTests (Base):
         #check all messages on the queue were received by the browser; don't relay on any specific ordering at present
         assert set([m.content for m in msgs]) == set([m.content for m in received])
 
-    def ring_queue_check(self, msgs):
+    def ring_queue_check(self, msgs, count=10):
         """
         Ensure that a ring queue removes lowest priority messages first.
         """
-        snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"),
+        snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':%s" % count),
                               durable=self.durable())
         for m in msgs: snd.send(m)
 
@@ -126,23 +126,31 @@ class PriorityTests (Base):
             while True: received.append(rcv.fetch(0))
         except Empty: None
 
-        expected = []
-        for m in msgs:
-            while len(expected) > 9:
-                expected=sorted_(expected, key=lambda x: priority_level(x.priority,10))
-                expected.pop(0)
-            expected.append(m)
-        #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])        
+        expected = sorted_(msgs, key=lambda x: priority_level(x.priority,10))[len(msgs)-count:]
+        expected = sorted_(expected, key=lambda x: priority_level(x.priority,10), reverse=True)
+        #print "sent %s; expected %s; got %s" % ([m.priority for m in msgs], [m.priority for m in expected], [m.priority for m in received])
+        #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])
         assert [m.content for m in expected] == [m.content for m in received]
 
     def test_ring_queue_1(self):
         priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3]
-        seq = content("msg")    
+        seq = content("msg")
         self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
 
     def test_ring_queue_2(self):
-        priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9]
-        seq = content("msg")    
+        priorities = [9,0,2,3,6,3,4,2,9,2,9,9,1,9,4,7,1,1,3,9,7,3,9,3,9,1,5,1,9,7,2,3,0,9]
+        seq = content("msg")
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+    def test_ring_queue_3(self):
+        #test case given for QPID-3866
+        priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+        seq = content("msg")
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities], 5)
+
+    def test_ring_queue_4(self):
+        priorities = [9,0,2,3,6,3,4,2,9,2,9,3,1,9,4,7,1,1,3,2,7,3,9,3,6,1,5,1,9,7,2,3,0,2]
+        seq = content("msg")
         self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
 
     def test_requeue(self):
@@ -169,6 +177,7 @@ class PriorityTests (Base):
         for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True):
             msg = rcv.fetch(0)
             #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            #print "expected content %s got %s" % (expected.content, msg.content)
             assert msg.content == expected.content
             self.ssn.acknowledge(msg)
 
@@ -231,7 +240,7 @@ def sorted_(msgs, key=None, reverse=Fals
     Workaround lack of sorted builtin function in python 2.3 and lack
     of keyword arguments to list.sort()
     """
-    temp = msgs
+    temp = [m for m in msgs]
     temp.sort(key_to_cmp(key, reverse=reverse))
     return temp
 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py Fri Aug 10 12:04:27 2012
@@ -41,7 +41,7 @@ class ThresholdTests (Base):
             snd.send(m)
             count = count + 1
             size = size + len(m.content)
-        event = rcv.fetch()
+        event = rcv.fetch(timeout=1)
         schema = event.content[0]["_schema_id"]
         assert schema["_class_name"] == "queueThresholdExceeded"
         values = event.content[0]["_values"]



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org