You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [16/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp Fri Oct 21 14:42:12 2011
@@ -36,6 +36,9 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+
 #include <iostream>
 #include "boost/format.hpp"
 
@@ -53,12 +56,12 @@ class TestConsumer : public virtual Cons
 public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
-    intrusive_ptr<Message> last;
+    QueuedMessage last;
     bool received;
-    TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+    TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
 
     virtual bool deliver(QueuedMessage& msg){
-        last = msg.payload;
+        last = msg;
         received = true;
         return true;
     };
@@ -78,13 +81,14 @@ public:
     Message& getMessage() { return *(msg.get()); }
 };
 
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
     intrusive_ptr<Message> msg(new Message());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
     return msg;
 }
 
@@ -145,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
 
     queue->deliver(msg1);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+    BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
 
     queue->deliver(msg2);
     BOOST_CHECK(queue->dispatch(c2));
-    BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+    BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
 
     c1->received = false;
     queue->deliver(msg3);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
 
     //Test cancellation:
     queue->cancel(c1);
@@ -210,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
     if (!consumer->received)
         sleep(2);
 
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
     BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
 
     received = queue->get().payload;
@@ -244,7 +248,7 @@ QPID_AUTO_TEST_CASE(testBound){
     exchange2.reset();
 
     //unbind the queue from all exchanges it knows it has been bound to:
-    queue->unbind(exchanges, queue);
+    queue->unbind(exchanges);
 
     //ensure the remaining exchanges don't still have the queue bound to them:
     FailOnDeliver deliverable;
@@ -254,26 +258,26 @@ QPID_AUTO_TEST_CASE(testBound){
 
 QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
     client::QueueOptions args;
-	args.setPersistLastNode();
+    args.setPersistLastNode();
 
-	Queue::shared_ptr queue(new Queue("my-queue", true));
+    Queue::shared_ptr queue(new Queue("my-queue", true));
     queue->configure(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "B");
     intrusive_ptr<Message> msg3 = create_message("e", "C");
 
-	//enqueue 2 messages
+    //enqueue 2 messages
     queue->deliver(msg1);
     queue->deliver(msg2);
 
-	//change mode
-	queue->setLastNodeFailure();
+    //change mode
+    queue->setLastNodeFailure();
 
-	//enqueue 1 message
+    //enqueue 1 message
     queue->deliver(msg3);
 
-	//check all have persistent ids.
+    //check all have persistent ids.
     BOOST_CHECK(msg1->isPersistent());
     BOOST_CHECK(msg2->isPersistent());
     BOOST_CHECK(msg3->isPersistent());
@@ -283,54 +287,58 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeS
 
 QPID_AUTO_TEST_CASE(testSeek){
 
-	Queue::shared_ptr queue(new Queue("my-queue", true));
+    Queue::shared_ptr queue(new Queue("my-queue", true));
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "B");
     intrusive_ptr<Message> msg3 = create_message("e", "C");
 
-	//enqueue 2 messages
+    //enqueue 3 messages
     queue->deliver(msg1);
     queue->deliver(msg2);
     queue->deliver(msg3);
 
-    TestConsumer::shared_ptr consumer(new TestConsumer(false));
+    TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
     SequenceNumber seq(2);
     consumer->position = seq;
 
     QueuedMessage qm;
     queue->dispatch(consumer);
-    
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+
+    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
     queue->dispatch(consumer);
     queue->dispatch(consumer); // make sure over-run is safe
- 
+
 }
 
 QPID_AUTO_TEST_CASE(testSearch){
 
-	Queue::shared_ptr queue(new Queue("my-queue", true));
+    Queue::shared_ptr queue(new Queue("my-queue", true));
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "B");
     intrusive_ptr<Message> msg3 = create_message("e", "C");
 
-	//enqueue 2 messages
+    //enqueue 3 messages
     queue->deliver(msg1);
     queue->deliver(msg2);
     queue->deliver(msg3);
 
     SequenceNumber seq(2);
-    QueuedMessage qm = queue->find(seq);
-    
+    QueuedMessage qm;
+    TestConsumer::shared_ptr c1(new TestConsumer());
+
+    BOOST_CHECK(queue->find(seq, qm));
+
     BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-    
-    queue->acquire(qm);
+
+    queue->acquire(qm, c1->getName());
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
     SequenceNumber seq1(3);
-    QueuedMessage qm1 = queue->find(seq1);
+    QueuedMessage qm1;
+    BOOST_CHECK(queue->find(seq1, qm1));
     BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-    
+
 }
 const std::string nullxid = "";
 
@@ -416,10 +424,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
 
     client::QueueOptions args;
     // set queue mode
-	args.setOrdering(client::LVQ);
+    args.setOrdering(client::LVQ);
 
-	Queue::shared_ptr queue(new Queue("my-queue", true ));
-	queue->configure(args);
+    Queue::shared_ptr queue(new Queue("my-queue", true ));
+    queue->configure(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "B");
@@ -430,16 +438,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     //set deliever match for LVQ a,b,c,a
 
     string key;
-	args.getLVQKey(key);
+    args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-	msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-	msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-	msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-	msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"b");
+    msg3->insertCustomProperty(key,"c");
+    msg4->insertCustomProperty(key,"a");
 
-	//enqueue 4 message
+    //enqueue 4 messages
     queue->deliver(msg1);
     queue->deliver(msg2);
     queue->deliver(msg3);
@@ -459,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     intrusive_ptr<Message> msg5 = create_message("e", "A");
     intrusive_ptr<Message> msg6 = create_message("e", "B");
     intrusive_ptr<Message> msg7 = create_message("e", "C");
-	msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-	msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-	msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    msg5->insertCustomProperty(key,"a");
+    msg6->insertCustomProperty(key,"b");
+    msg7->insertCustomProperty(key,"c");
     queue->deliver(msg5);
     queue->deliver(msg6);
     queue->deliver(msg7);
@@ -496,7 +504,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
     queue->deliver(msg1);
     queue->deliver(msg2);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -508,6 +516,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     client::QueueOptions args;
     // set queue mode
     args.setOrdering(client::LVQ);
+    // disable flow control, as this test violates the enqueue/dequeue sequence.
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);
@@ -526,12 +536,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-    msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"b");
+    msg3->insertCustomProperty(key,"c");
+    msg4->insertCustomProperty(key,"a");
+    msg5->insertCustomProperty(key,"b");
+    msg6->insertCustomProperty(key,"c");
 
     //enqueue 4 message
     queue->deliver(msg1);
@@ -546,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
     framing::SequenceNumber sequence1(10);
     QueuedMessage qmsg3(queue.get(), 0, sequence1);
+    TestConsumer::shared_ptr dummy(new TestConsumer());
 
-    BOOST_CHECK(!queue->acquire(qmsg));
-    BOOST_CHECK(queue->acquire(qmsg2));
+    BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
+    BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
     // Acquire the massage again to test failure case.
-    BOOST_CHECK(!queue->acquire(qmsg2));
-    BOOST_CHECK(!queue->acquire(qmsg3));
+    BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
+    BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
 
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
 
@@ -561,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     // set mode to no browse and check
     args.setOrdering(client::LVQ_NO_BROWSE);
     queue->configure(args);
-    TestConsumer::shared_ptr c1(new TestConsumer(false));
+    TestConsumer::shared_ptr c1(new TestConsumer("test", false));
 
     queue->dispatch(c1);
     queue->dispatch(c1);
@@ -595,8 +606,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
 
     queue1->deliver(msg1);
     queue2->deliver(msg1);
@@ -630,7 +641,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 
     Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
     intrusive_ptr<Message> received;
-    queue1->configure(args);
+    queue1->create(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
     intrusive_ptr<Message> msg2 = create_message("e", "A");
@@ -639,9 +650,9 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-	// 3
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
+    // 3
     queue1->deliver(msg1);
     // 4
     queue1->setLastNodeFailure();
@@ -660,13 +671,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = create_message("exchange", "key");
-        if (i % 2) {
-            if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
-        } else {
-            if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
-        }
-        m->setTimestamp(new broker::ExpiryPolicy);
+        intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
+        m->computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }
@@ -676,7 +682,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
     addMessagesToQueue(10, queue);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
     ::usleep(300*1000);
-    queue.purgeExpired();
+    queue.purgeExpired(0);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
 }
 
@@ -687,7 +693,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
-    QueueCleaner cleaner(queues, timer);
+    QueueCleaner cleaner(queues, &timer);
     cleaner.start(100 * qpid::sys::TIME_MSEC);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
@@ -695,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
 
+
+namespace {
+    // helper for group tests
+    void verifyAcquire( Queue::shared_ptr queue,
+                        TestConsumer::shared_ptr c,
+                        std::deque<QueuedMessage>& results,
+                        const std::string& expectedGroup,
+                        const int expectedId )
+    {
+        queue->dispatch(c);
+        results.push_back(c->last);
+        std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+        int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+        BOOST_CHECK_EQUAL( group, expectedGroup );
+        BOOST_CHECK_EQUAL( id, expectedId );
+    }
+}
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+    //
+    // Verify that consumers of grouped messages own the groups once a message is acquired,
+    // and release the groups once all acquired messages have been dequeued or requeued
+    //
+    FieldTable args;
+    Queue::shared_ptr queue(new Queue("my_queue", true));
+    args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
+    queue->configure(args);
+
+    std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+                             std::string("b"), std::string("b"), std::string("b"),
+                             std::string("c"), std::string("c"), std::string("c") };
+    for (int i = 0; i < 9; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        msg->insertCustomProperty("GROUP-ID", groups[i]);
+        msg->insertCustomProperty("MY-ID", i);
+        queue->deliver(msg);
+    }
+
+    // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+    BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+    queue->consume(c1);
+    queue->consume(c2);
+
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+
+    verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
+    verifyAcquire(queue, c2, dequeMeC2, "b", 3 );  // c2 should now own group "b" (acquire b-3)
+
+    // now let c1 complete the 'a-0' message - this should free the 'a' group
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // now c2 should pick up the next 'a-1', since it is oldest free
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // c1 should only be able to snarf up the first "c" message now...
+    verifyAcquire(queue, c1, dequeMeC1, "c", 6 );    // should skip to the first "c"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+    // hmmm... what if c2 now dequeues "b-3"?  (now only has a-1 acquired)
+    queue->dequeue( 0, dequeMeC2.front() );
+    dequeMeC2.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+    // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+    verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+    // c2 can now only grab a-2, and that's all
+    verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+    // now C2 can't get any more, since C1 owns the "b" and "c" groups...
+    bool gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // hmmm... what if c1 now dequeues "c-6"?  (now only owns b-4)
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+    // c2 can now grab c-7
+    verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+    // what happens if C-2 "requeues" a-1 and a-2?
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();  // now just has c-7 acquired
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+    // now c1 will grab a-1 and a-2...
+    verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+    verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+    // c2 can now acquire c-8 only
+    verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+    // and c1 can get b-5
+    verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+    // should be no more acquire-able for anyone now:
+    gotOne = queue->dispatch(c1);
+    BOOST_CHECK( !gotOne );
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // requeue all of C1's acquired messages, then cancel C1
+    while (!dequeMeC1.empty()) {
+        queue->requeue(dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+    queue->cancel(c1);
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+    // b-4, a-1, a-2, b-5 all should be available, right?
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
+    }
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ---, ---, ---
+
+    TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+    std::deque<QueuedMessage> dequeMeC3;
+
+    verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+    verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ^C3, ^C2, ^C2
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
+    }
+
+    // Queue = a-2,
+    // Owners= ^C3,
+
+    intrusive_ptr<Message> msg = create_message("e", "A");
+    msg->insertCustomProperty("GROUP-ID", "a");
+    msg->insertCustomProperty("MY-ID", 9);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9
+    // Owners= ^C3, ^C3
+
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    msg = create_message("e", "A");
+    msg->insertCustomProperty("GROUP-ID", "b");
+    msg->insertCustomProperty("MY-ID", 10);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9, b-10
+    // Owners= ^C3, ^C3, ----
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+    verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    queue->cancel(c2);
+    queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+    //
+    // Verify that the same default group name is automatically applied to messages that
+    // do not specify a group name.
+    //
+    FieldTable args;
+    Queue::shared_ptr queue(new Queue("my_queue", true));
+    args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
+    queue->configure(args);
+
+    for (int i = 0; i < 3; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        // no "GROUP-ID" header
+        msg->insertCustomProperty("MY-ID", i);
+        queue->deliver(msg);
+    }
+
+    // Queue = 0, 1, 2
+
+    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+    queue->consume(c1);
+    queue->consume(c2);
+
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+    queue->dispatch(c1);    // c1 now owns default group (acquired 0)
+    dequeMeC1.push_back(c1->last);
+    int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 0 );
+
+    bool gotOne = queue->dispatch(c2);  // c2 should get nothing
+    BOOST_CHECK( !gotOne );
+
+    queue->dispatch(c1);    // c1 now acquires 1
+    dequeMeC1.push_back(c1->last);
+    id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 1 );
+
+    gotOne = queue->dispatch(c2);  // c2 should still get nothing
+    BOOST_CHECK( !gotOne );
+
+    while (!dequeMeC1.empty()) {
+        queue->dequeue(0, dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+
+    // now default group should be available...
+    queue->dispatch(c2);    // c2 now owns default group (acquired 2)
+    id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 2 );
+
+    gotOne = queue->dispatch(c1);  // c1 should get nothing
+    BOOST_CHECK( !gotOne );
+
+    queue->cancel(c1);
+    queue->cancel(c2);
+}
+
 QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
 
     TestMessageStoreOC  testStore;
@@ -702,9 +982,9 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
     args.setPersistLastNode();
 
     Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
-    queue1->configure(args);
+    queue1->create(args);
     Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
-    queue2->configure(args);
+    queue2->create(args);
 
     intrusive_ptr<Message> msg1 = create_message("e", "A");
 
@@ -790,7 +1070,7 @@ not requeued to the store.
 
     Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
     intrusive_ptr<Message> received;
-    queue1->configure(args);
+    queue1->create(args);
 
     // check requeue 1
     intrusive_ptr<Message> msg1 = create_message("e", "C");
@@ -870,28 +1150,40 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
 
     intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X'));  // transient w/ content
     DeliverableMessage dmsg02(msg02);
-    BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+    {
+        ScopedSuppressLogging sl; // suppress expected error messages.
+        BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+    }
     msg02->tryReleaseContent();
     BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
     BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
 
     intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true);  // durable w/ content
     DeliverableMessage dmsg03(msg03);
-    BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+    {
+        ScopedSuppressLogging sl; // suppress expected error messages.
+        BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+    }
     msg03->tryReleaseContent();
     BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
     BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
 
     intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
     DeliverableMessage dmsg04(msg04);
-    BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+    {
+        ScopedSuppressLogging sl; // suppress expected error messages.
+        BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+    }
     msg04->tryReleaseContent();
     BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
     BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
 
     intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
     DeliverableMessage dmsg05(msg05);
-    BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+    {
+        ScopedSuppressLogging sl; // suppress expected error messages.
+        BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+    }
     msg05->tryReleaseContent();
     BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
     BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());

Modified: qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp Fri Oct 21 14:42:12 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testReplicationExcha
 {
     qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd")
                                                            ("--replication-exchange-name=qpid.replication")));
-    ProxySessionFixture f(brokerOpts);
+    SessionFixture f(brokerOpts);
 
 
     std::string dataQ("queue-1");

Modified: qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp Fri Oct 21 14:42:12 2011
@@ -43,7 +43,7 @@ using namespace qpid::framing;
 // Apply f to [begin, end) and accumulate the result
 template <class Iter, class T, class F>
 T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
-    return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2)));
+    return std::accumulate(begin, end, seed, boost::bind(std::plus<T>(), _1, boost::bind(f, _2)));
 }
 
 // Create a frame with a one-char string.
@@ -105,8 +105,8 @@ size_t transferN(qpid::SessionState& s, 
         char last = content[content.size()-1];
         content.resize(content.size()-1);
         size += applyAccumulate(content.begin(), content.end(), 0,
-                                bind(&send, ref(s),
-                                     bind(contentFrameChar, _1, false)));
+                                boost::bind(&send, boost::ref(s),
+                                     boost::bind(contentFrameChar, _1, false)));
         size += send(s, contentFrameChar(last, true));
     }
     return size;
@@ -115,7 +115,7 @@ size_t transferN(qpid::SessionState& s, 
 // Send multiple transfers with single-byte content.
 size_t transfers(qpid::SessionState& s, string content) {
     return applyAccumulate(content.begin(), content.end(), 0,
-                           bind(transfer1Char, ref(s), _1));
+                           boost::bind(transfer1Char, boost::ref(s), _1));
 }
 
 size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; }

Modified: qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp Fri Oct 21 14:42:12 2011
@@ -77,8 +77,10 @@ class TestTask : public TimerTask
         BOOST_CHECK(fired);
         BOOST_CHECK_EQUAL(expected_position, position);
         Duration actual(start, end);
-#ifdef _WIN32
+#ifdef _MSC_VER
         uint64_t difference = _abs64(expected - actual);
+#elif defined(_WIN32)
+        uint64_t difference = labs(expected - actual);
 #else
         uint64_t difference = abs(expected - actual);
 #endif

Modified: qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp Fri Oct 21 14:42:12 2011
@@ -50,10 +50,9 @@ struct TxPublishTest
     TxPublishTest() :
         queue1(new Queue("queue1", false, &store, 0)),
         queue2(new Queue("queue2", false, &store, 0)),
-        msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
+        msg(MessageUtils::createMessage("exchange", "routing_key", true)),
         op(msg)
     {
-        msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
         op.deliverTo(queue1);
         op.deliverTo(queue2);
     }
@@ -74,7 +73,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
     BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
-    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
 }
 
 QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +86,7 @@ QPID_AUTO_TEST_CASE(testCommit)
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
     intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
 
-    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
     BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
 
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());

Modified: qpid/branches/QPID-2519/cpp/src/tests/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Url.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Url.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Url.cpp Fri Oct 21 14:42:12 2011
@@ -60,6 +60,32 @@ QPID_AUTO_TEST_CASE(TestParseXyz) {
     BOOST_CHECK_EQUAL(Url("xyz:host").str(), "amqp:xyz:host:5672");
 }
 
+QPID_AUTO_TEST_CASE(TestParseTricky) {
+    BOOST_CHECK_EQUAL(Url("amqp").str(), "amqp:tcp:amqp:5672");
+    BOOST_CHECK_EQUAL(Url("amqp:tcp").str(), "amqp:tcp:tcp:5672");
+    // These are ambiguous parses and arguably not the best result
+    BOOST_CHECK_EQUAL(Url("amqp:876").str(), "amqp:tcp:876:5672");
+    BOOST_CHECK_EQUAL(Url("tcp:567").str(), "amqp:tcp:567:5672");
+}
+
+QPID_AUTO_TEST_CASE(TestParseIPv6) {
+    Url u1("[::]");
+    BOOST_CHECK_EQUAL(u1[0].host, "::");
+    BOOST_CHECK_EQUAL(u1[0].port, 5672);
+    Url u2("[::1]");
+    BOOST_CHECK_EQUAL(u2[0].host, "::1");
+    BOOST_CHECK_EQUAL(u2[0].port, 5672);
+    Url u3("[::127.0.0.1]");
+    BOOST_CHECK_EQUAL(u3[0].host, "::127.0.0.1");
+    BOOST_CHECK_EQUAL(u3[0].port, 5672);
+    Url u4("[2002::222:68ff:fe0b:e61a]");
+    BOOST_CHECK_EQUAL(u4[0].host, "2002::222:68ff:fe0b:e61a");
+    BOOST_CHECK_EQUAL(u4[0].port, 5672);
+    Url u5("[2002::222:68ff:fe0b:e61a]:123");
+    BOOST_CHECK_EQUAL(u5[0].host, "2002::222:68ff:fe0b:e61a");
+    BOOST_CHECK_EQUAL(u5[0].port, 123);
+}
+
 QPID_AUTO_TEST_CASE(TestParseMultiAddress) {
     Url::addProtocol("xyz");
     URL_CHECK_STR("amqp:tcp:host:0,xyz:foo:123,tcp:foo:0,xyz:bar:1");

Modified: qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp Fri Oct 21 14:42:12 2011
@@ -86,6 +86,64 @@ QPID_AUTO_TEST_CASE(testConversions)
     BOOST_CHECK_THROW(value.asBool(), InvalidConversion);
 }
 
+QPID_AUTO_TEST_CASE(testConversionsFromString)
+{
+    Variant value;
+    value = "5";
+    BOOST_CHECK_EQUAL(5, value.asInt16());
+    BOOST_CHECK_EQUAL(5u, value.asUint16());
+
+    value = "-5";
+    BOOST_CHECK_EQUAL(-5, value.asInt16());
+    BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+
+    value = "18446744073709551615";
+    BOOST_CHECK_EQUAL(18446744073709551615ull, value.asUint64());
+    BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+    value = "9223372036854775808";
+    BOOST_CHECK_EQUAL(9223372036854775808ull, value.asUint64());
+    BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+    value = "-9223372036854775809";
+    BOOST_CHECK_THROW(value.asUint64(), InvalidConversion);
+    BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+    value = "2147483648";
+    BOOST_CHECK_EQUAL(2147483648ul, value.asUint32());
+    BOOST_CHECK_THROW(value.asInt32(), InvalidConversion);
+
+    value = "-2147483649";
+    BOOST_CHECK_THROW(value.asUint32(), InvalidConversion);
+    BOOST_CHECK_THROW(value.asInt32(), InvalidConversion);
+
+    value = "32768";
+    BOOST_CHECK_EQUAL(32768u, value.asUint16());
+    BOOST_CHECK_THROW(value.asInt16(), InvalidConversion);
+
+    value = "-32769";
+    BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+    BOOST_CHECK_THROW(value.asInt16(), InvalidConversion);
+
+    value = "-2.5";
+    BOOST_CHECK_EQUAL(-2.5, value.asFloat());
+
+    value = "-0.875432e10";
+    BOOST_CHECK_EQUAL(-0.875432e10, value.asDouble());
+
+    value = "-0";
+    BOOST_CHECK_EQUAL(0, value.asInt16());
+    BOOST_CHECK_EQUAL(0u, value.asUint16());
+
+    value = "-000";
+    BOOST_CHECK_EQUAL(0, value.asInt16());
+    BOOST_CHECK_EQUAL(0u, value.asUint16());
+
+    value = "-0010";
+    BOOST_CHECK_EQUAL(-10, value.asInt16());
+    BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+}
+
 QPID_AUTO_TEST_CASE(testSizeConversionsUint)
 {
     Variant value;

Modified: qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp Fri Oct 21 14:42:12 2011
@@ -90,7 +90,7 @@ struct SimpleListener : public MessageLi
     }
 };
 
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
 {
     void declareSubscribe(const string& q="odd_blue",
                           const string& dest="xml")

Modified: qpid/branches/QPID-2519/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/acl.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/acl.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/acl.py Fri Oct 21 14:42:12 2011
@@ -26,10 +26,11 @@ from qpid.datatypes import uuid4
 from qpid.testlib import TestBase010
 from qmf.console import Session
 from qpid.datatypes import Message
+import qpid.messaging
 
 class ACLFile:
-    def __init__(self):
-        self.f = open('data_dir/policy.acl','w');
+    def __init__(self, policy='data_dir/policy.acl'):
+        self.f = open(policy,'w')
    
     def write(self,line):
         self.f.write(line)
@@ -50,14 +51,24 @@ class ACLTests(TestBase010):
         acl = self.qmf.getObjects(_class="acl")[0]    
         return acl.reloadACLFile()
 
+    def get_acl_file(self):
+        return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl"))
+
     def setUp(self):
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow all all\n')
         aclf.close()
         TestBase010.setUp(self)
         self.startQmf()
         self.reload_acl()
-        
+
+    def tearDown(self):
+        aclf = self.get_acl_file()
+        aclf.write('acl allow all all\n')
+        aclf.close()
+        self.reload_acl()
+        TestBase010.tearDown(self)
+
    #=====================================
    # ACL general tests
    #=====================================     
@@ -66,7 +77,7 @@ class ACLTests(TestBase010):
         """
         Test the deny all mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow anonymous all all\n')
         aclf.write('acl allow bob@QPID create queue\n')
         aclf.write('acl deny all all')
@@ -94,7 +105,7 @@ class ACLTests(TestBase010):
         """
         Test the allow all mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID bind exchange\n')
         aclf.write('acl allow all all')
         aclf.close()        
@@ -126,7 +137,7 @@ class ACLTests(TestBase010):
         """
         Test empty groups
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl group\n')
         aclf.write('acl group admins bob@QPID joe@QPID\n')
         aclf.write('acl allow all all')
@@ -140,7 +151,7 @@ class ACLTests(TestBase010):
         """
         Test illegal acl formats
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl group admins bob@QPID joe@QPID\n')
         aclf.write('acl allow all all')
         aclf.close()
@@ -154,7 +165,7 @@ class ACLTests(TestBase010):
         Test illegal extension lines
         """
          
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('group admins bob@QPID \n')
         aclf.write('          \ \n')
         aclf.write('joe@QPID \n')
@@ -172,7 +183,7 @@ class ACLTests(TestBase010):
         """
         Test proper extention lines
         """
-        aclf = ACLFile()        
+        aclf = self.get_acl_file()
         aclf.write('group test1 joe@EXAMPLE.com \\ \n') # should be allowed
         aclf.write('            jack@EXAMPLE.com \\ \n') # should be allowed
         aclf.write('jill@TEST.COM \\ \n') # should be allowed
@@ -189,7 +200,7 @@ class ACLTests(TestBase010):
         Test a user defined without a realm
         Ex. group admin rajith
         """
-        aclf = ACLFile()        
+        aclf = self.get_acl_file()
         aclf.write('group admin bob\n') # shouldn't be allowed
         aclf.write('acl deny admin bind exchange\n')
         aclf.write('acl allow all all')
@@ -204,7 +215,7 @@ class ACLTests(TestBase010):
         Test a user defined without a realm
         Ex. group admin rajith
         """
-        aclf = ACLFile()        
+        aclf = self.get_acl_file()
         aclf.write('group test1 joe@EXAMPLE.com\n') # should be allowed
         aclf.write('group test2 jack_123-jill@EXAMPLE.com\n') # should be allowed
         aclf.write('group test4 host/somemachine.example.com@EXAMPLE.COM\n') # should be allowed
@@ -215,7 +226,7 @@ class ACLTests(TestBase010):
         if (result.text.find("ACL format error",0,len(result.text)) != -1):
             self.fail(result)
 
-        aclf = ACLFile()        
+        aclf = self.get_acl_file()
         aclf.write('group test1 joe$H@EXAMPLE.com\n') # shouldn't be allowed
         aclf.write('acl allow all all')
         aclf.close() 
@@ -233,7 +244,7 @@ class ACLTests(TestBase010):
         Test illegal queue policy
         """
          
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ding\n')
         aclf.write('acl allow all all')
         aclf.close()        
@@ -249,7 +260,7 @@ class ACLTests(TestBase010):
         Test illegal queue policy
         """
          
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=-1\n')
         aclf.write('acl allow all all')
         aclf.close()        
@@ -260,7 +271,7 @@ class ACLTests(TestBase010):
         if (result.text != expected): 
             self.fail(result) 
 
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=9223372036854775808\n')
         aclf.write('acl allow all all')                                 
         aclf.close()        
@@ -277,7 +288,7 @@ class ACLTests(TestBase010):
         Test illegal queue policy
         """
          
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=-1\n')
         aclf.write('acl allow all all')
         aclf.close()        
@@ -288,7 +299,7 @@ class ACLTests(TestBase010):
         if (result.text != expected): 
             self.fail(result) 
 
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=9223372036854775808\n')
         aclf.write('acl allow all all')                                 
         aclf.close()        
@@ -308,7 +319,7 @@ class ACLTests(TestBase010):
         """
         Test cases for queue acl in allow mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create queue name=q1 durable=true passive=true\n')
         aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
         aclf.write('acl deny bob@QPID access queue name=q3\n')
@@ -411,7 +422,7 @@ class ACLTests(TestBase010):
         """
         Test cases for queue acl in deny mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow bob@QPID create queue name=q1 durable=true passive=true\n')
         aclf.write('acl allow bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
         aclf.write('acl allow bob@QPID access queue name=q3\n')
@@ -534,7 +545,7 @@ class ACLTests(TestBase010):
         """
         Test cases for exchange acl in allow mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID create exchange name=testEx durable=true passive=true\n')
         aclf.write('acl deny bob@QPID create exchange name=ex1 type=direct\n')
         aclf.write('acl deny bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
@@ -665,7 +676,7 @@ class ACLTests(TestBase010):
         """
         Test cases for exchange acl in deny mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
         aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n') 
         aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
@@ -772,6 +783,52 @@ class ACLTests(TestBase010):
             if (403 == e.args[0].error_code):
                 self.fail("ACL should allow exchange delete request for myEx");
 
+    def test_create_and_delete_exchange_via_qmf(self):
+        """
+        Test acl is enforced when creating/deleting via QMF
+        methods. Note that in order to be able to send the QMF methods
+        and receive the responses a significant amount of permissions
+        need to be enabled (TODO: can the set below be narrowed down
+        at all?)
+        """
+        aclf = self.get_acl_file()
+        aclf.write('acl allow bob@QPID create exchange\n')
+        aclf.write('acl allow admin@QPID delete exchange\n')
+        aclf.write('acl allow all access exchange\n')
+        aclf.write('acl allow all bind exchange\n')
+        aclf.write('acl allow all create queue\n')
+        aclf.write('acl allow all access queue\n')
+        aclf.write('acl allow all delete queue\n')
+        aclf.write('acl allow all consume queue\n')
+        aclf.write('acl allow all access method\n')
+        aclf.write('acl deny all all')
+        aclf.close()
+
+        result = self.reload_acl()
+        if (result.text.find("format error",0,len(result.text)) != -1):
+            self.fail(result)
+
+        bob = BrokerAdmin(self.config.broker, "bob", "bob")
+        bob.create_exchange("my-exchange") #should pass
+        #cleanup by deleting exchange
+        try:
+            bob.delete_exchange("my-exchange") #should fail
+            self.fail("ACL should deny exchange delete request for my-exchange");
+        except Exception, e:
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+        admin = BrokerAdmin(self.config.broker, "admin", "admin")
+        admin.delete_exchange("my-exchange") #should pass
+
+        anonymous = BrokerAdmin(self.config.broker)
+        try:
+            anonymous.create_exchange("another-exchange") #should fail
+            self.fail("ACL should deny exchange create request for another-exchange");
+        except Exception, e:
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+
+
    #=====================================
    # ACL consume tests
    #=====================================
@@ -780,7 +837,7 @@ class ACLTests(TestBase010):
         """
         Test cases for consume in allow mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID consume queue name=q1\n')
         aclf.write('acl deny bob@QPID consume queue name=q2\n')                
         aclf.write('acl allow all all')
@@ -826,7 +883,7 @@ class ACLTests(TestBase010):
         """
         Test cases for consume in allow mode
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow bob@QPID consume queue name=q1\n')
         aclf.write('acl allow bob@QPID consume queue name=q2\n')
         aclf.write('acl allow bob@QPID create queue\n')                                
@@ -872,7 +929,7 @@ class ACLTests(TestBase010):
         """
         Test various publish acl
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl deny bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
         aclf.write('acl deny bob@QPID publish exchange name=amq.topic\n')
         aclf.write('acl deny bob@QPID publish exchange name=myEx routingkey=rk2\n')                
@@ -921,7 +978,7 @@ class ACLTests(TestBase010):
         """
         Test various publish acl
         """
-        aclf = ACLFile()
+        aclf = self.get_acl_file()
         aclf.write('acl allow bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
         aclf.write('acl allow bob@QPID publish exchange name=amq.topic\n')
         aclf.write('acl allow bob@QPID publish exchange name=myEx routingkey=rk2\n')
@@ -972,3 +1029,113 @@ class ACLTests(TestBase010):
         except qpid.session.SessionException, e:
             if (403 == e.args[0].error_code):
                 self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+
+   #=====================================
+   # ACL broker configuration tests
+   #=====================================
+
+    def test_broker_timestamp_config(self):
+        """
+        Test ACL control of the broker timestamp configuration
+        """
+        aclf = self.get_acl_file()
+        # enable lots of stuff to allow QMF to work
+        aclf.write('acl allow all create exchange\n')
+        aclf.write('acl allow all access exchange\n')
+        aclf.write('acl allow all bind exchange\n')
+        aclf.write('acl allow all publish exchange\n')
+        aclf.write('acl allow all create queue\n')
+        aclf.write('acl allow all access queue\n')
+        aclf.write('acl allow all delete queue\n')
+        aclf.write('acl allow all consume queue\n')
+        aclf.write('acl allow all access method\n')
+        # this should let bob access the timestamp configuration
+        aclf.write('acl allow bob@QPID access broker\n')
+        aclf.write('acl allow admin@QPID all all\n')
+        aclf.write('acl deny all all')
+        aclf.close()
+
+        result = self.reload_acl()
+        if (result.text.find("format error",0,len(result.text)) != -1):
+            self.fail(result)
+
+        ts = None
+        bob = BrokerAdmin(self.config.broker, "bob", "bob")
+        ts = bob.get_timestamp_cfg() #should work
+        bob.set_timestamp_cfg(ts);   #should work
+
+        obo = BrokerAdmin(self.config.broker, "obo", "obo")
+        try:
+            ts = obo.get_timestamp_cfg() #should fail
+            failed = False
+        except Exception, e:
+            failed = True
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+        assert(failed)
+
+        try:
+            obo.set_timestamp_cfg(ts) #should fail
+            failed = False
+        except Exception, e:
+            failed = True
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+        assert(failed)
+
+        admin = BrokerAdmin(self.config.broker, "admin", "admin")
+        ts = admin.get_timestamp_cfg() #should pass
+        admin.set_timestamp_cfg(ts) #should pass
+
+
+class BrokerAdmin:  # Minimal QMFv2 method-request client for the broker object, built on qpid.messaging
+    def __init__(self, broker, username=None, password=None):
+        self.connection = qpid.messaging.Connection(broker)
+        if username:  # credentials are optional; when given, authenticate with SASL PLAIN
+            self.connection.username = username
+            self.connection.password = password
+            self.connection.sasl_mechanisms = "PLAIN"
+        self.connection.open()
+        self.session = self.connection.session()
+        self.sender = self.session.sender("qmf.default.direct/broker")
+        self.reply_to = "responses-#; {create:always}"
+        self.receiver = self.session.receiver(self.reply_to)
+
+    def invoke(self, method, arguments):  # returns the response '_arguments'; raises Exception on '_exception' or bad replies
+        content = {
+            "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
+            "_method_name": method,
+            "_arguments": arguments}
+        request = qpid.messaging.Message(reply_to=self.reply_to, content=content)
+        request.properties["x-amqp-0-10.app-id"] = "qmf2"
+        request.properties["qmf.opcode"] = "_method_request"
+        self.sender.send(request)
+        response = self.receiver.fetch()
+        self.session.acknowledge()
+        if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
+            if response.properties['qmf.opcode'] == '_method_response':
+                return response.content['_arguments']
+            elif response.properties['qmf.opcode'] == '_exception':
+                raise Exception(response.content['_values'])
+            else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
+        else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+    def create_exchange(self, name, exchange_type=None, options={}):
+        properties = dict(options)  # copy: never write into the shared default dict or the caller's dict
+        if exchange_type: properties["exchange_type"] = exchange_type
+        self.invoke("create", {"type": "exchange", "name":name, "properties":properties})
+
+    def create_queue(self, name, properties={}):
+        self.invoke("create", {"type": "queue", "name":name, "properties":properties})
+
+    def delete_exchange(self, name):
+        self.invoke("delete", {"type": "exchange", "name":name})
+
+    def delete_queue(self, name):
+        self.invoke("delete", {"type": "queue", "name":name})
+
+    def get_timestamp_cfg(self):
+        return self.invoke("getTimestampConfig", {})
+
+    def set_timestamp_cfg(self, receive):
+        if isinstance(receive, dict): receive = receive.get("receive", receive)  # unwrap a get_timestamp_cfg() result; assumes it is keyed by "receive" - TODO confirm
+        return self.invoke("setTimestampConfig", {"receive":receive})

Modified: qpid/branches/QPID-2519/cpp/src/tests/allhosts
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/allhosts?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/allhosts (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/allhosts Fri Oct 21 14:42:12 2011
@@ -29,11 +29,12 @@ Options:
   -s SECONDS  sleep between starting commands.
   -q          don't print banner lines for each host.
   -o SUFFIX   log output of each command to <host>.SUFFIX
+  -X          passed to ssh - forward X connection.
 "
     exit 1
 }
 
-while getopts "tl:bs:dqo:" opt; do
+while getopts "tl:bs:dqo:X" opt; do
     case $opt in
 	l) SSHOPTS="-l$OPTARG $SSHOPTS" ;;
 	t) SSHOPTS="-t $SSHOPTS" ;;
@@ -42,6 +43,7 @@ while getopts "tl:bs:dqo:" opt; do
 	s) SLEEP="sleep $OPTARG" ;;
 	q) NOBANNER=1 ;;
 	o) SUFFIX=$OPTARG ;;
+	X) SSHOPTS="-X $SSHOPTS" ;;
 	*) usage;;
     esac
 done

Modified: qpid/branches/QPID-2519/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/brokertest.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/brokertest.py Fri Oct 21 14:42:12 2011
@@ -29,6 +29,7 @@ from unittest import TestCase
 from copy import copy
 from threading import Thread, Lock, Condition
 from logging import getLogger
+import qmf.console
 
 log = getLogger("qpid.brokertest")
 
@@ -61,24 +62,6 @@ def is_running(pid):
 class BadProcessStatus(Exception):
     pass
 
-class ExceptionWrapper:
-    """Proxy object that adds a message to exceptions raised"""
-    def __init__(self, obj, msg):
-        self.obj = obj
-        self.msg = msg
-        
-    def __getattr__(self, name):
-        func = getattr(self.obj, name)
-        if type(func) != callable:
-            return func
-        return lambda *args, **kwargs: self._wrap(func, args, kwargs)
-
-    def _wrap(self, func, args, kwargs):
-        try:
-            return func(*args, **kwargs)
-        except Exception, e:
-            raise Exception("%s: %s" %(self.msg, str(e)))
-
 def error_line(filename, n=1):
     """Get the last n line(s) of filename for error messages"""
     result = []
@@ -88,7 +71,8 @@ def error_line(filename, n=1):
             for l in f:
                 if len(result) == n:  result.pop(0)
                 result.append("    "+l)
-        finally: f.close()
+        finally:
+            f.close()
     except: return ""
     return ":\n" + "".join(result)
 
@@ -96,111 +80,90 @@ def retry(function, timeout=10, delay=.0
     """Call function until it returns True or timeout expires.
     Double the delay for each retry. Return True if function
     returns true, False if timeout expires."""
+    deadline = time.time() + timeout
     while not function():
-        if delay > timeout: delay = timeout
+        remaining = deadline - time.time()
+        if remaining <= 0: return False
+        delay = min(delay, remaining)
         time.sleep(delay)
-        timeout -= delay
-        if timeout <= 0: return False
         delay *= 2
     return True
 
+class AtomicCounter:
+    def __init__(self):
+        self.count = 0
+        self.lock = Lock()
+
+    def next(self):
+        self.lock.acquire();
+        ret = self.count
+        self.count += 1
+        self.lock.release();
+        return ret
+
+_popen_id = AtomicCounter() # Popen identifier for use in output file names.
+
+# Constants for file descriptor arguments to Popen
+FILE = "FILE"                       # Write to file named after process
+PIPE = subprocess.PIPE
+
 class Popen(subprocess.Popen):
     """
     Can set and verify expectation of process status at end of test.
     Dumps command line, stdout, stderr to data dir for debugging.
     """
 
-    class DrainThread(Thread):
-        """Thread to drain a file object and write the data to a file."""
-        def __init__(self, infile, outname):
-            Thread.__init__(self)
-            self.infile, self.outname = infile, outname
-            self.outfile = None
-
-        def run(self):
-            try:
-                for line in self.infile:
-                    if self.outfile is None:
-                        self.outfile = open(self.outname, "w")
-                    self.outfile.write(line)
-            finally:
-                self.infile.close()
-                if self.outfile is not None: self.outfile.close()
-
-    class OutStream(ExceptionWrapper):
-        """Wrapper for output streams, handles exceptions & draining output"""
-        def __init__(self, infile, outfile, msg):
-            ExceptionWrapper.__init__(self, infile, msg)
-            self.infile, self.outfile = infile, outfile
-            self.thread = None
-
-        def drain(self):
-            if self.thread is None:
-                self.thread = Popen.DrainThread(self.infile, self.outfile)
-                self.thread.start()
-
-    def outfile(self, ext): return "%s.%s" % (self.pname, ext)
-
-    def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
-        """Run cmd (should be a list of arguments)
+    def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+        """Run cmd (should be a list of program and arguments)
         expect - if set verify expectation at end of test.
-        drain  - if true (default) drain stdout/stderr to files.
+        stdout, stderr - can have the same values as for subprocess.Popen as well as
+          FILE (the default) which means write to a file named after the process.
+        stdin - same values as for subprocess.Popen; defaults to None (no redirection)
         """
         self._clean = False
         self._clean_lock = Lock()
         assert find_exe(cmd[0]), "executable not found: "+cmd[0]
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
-        self.returncode = None
         self.expect = expect
-        try:
-            subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True)
-        except ValueError:     # Windows can't do close_fds
-            subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE)
-        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
-        msg = "Process %s" % self.pname
-        self.stdin = ExceptionWrapper(self.stdin, msg)
-        self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg)
-        self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg)
+        self.id = _popen_id.next()
+        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
+        if stdout == FILE: stdout = open(self.outfile("out"), "w")
+        if stderr == FILE: stderr = open(self.outfile("err"), "w")
+        try:
+            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+                                      stdin=stdin, stdout=stdout, stderr=stderr,
+                                      close_fds=True)
+        except ValueError: # Windows can't do close_fds
+            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+                                      stdin=stdin, stdout=stdout, stderr=stderr)
+
         f = open(self.outfile("cmd"), "w")
-        try: f.write(self.cmd_str())
+        try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
         finally: f.close()
         log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
-        if drain: self.drain()
 
-        def __str__(self): return "Popen<%s>"%(self.pname)
+    def __str__(self): return "Popen<%s>"%(self.pname)
 
-    def drain(self):
-        """Start threads to drain stdout/err"""
-        self.stdout.drain()
-        self.stderr.drain()
-
-    def _cleanup(self):
-        """Close pipes to sub-process"""
-        self._clean_lock.acquire()
-        try:
-            if self._clean: return
-            self._clean = True
-            self.stdin.close()
-            self.drain()                    # Drain output pipes.
-            self.stdout.thread.join()       # Drain thread closes pipe.
-            self.stderr.thread.join()
-        finally: self._clean_lock.release()
+    def outfile(self, ext): return "%s.%s" % (self.pname, ext)
 
     def unexpected(self,msg):
         err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
         raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-    
+
     def stop(self):                  # Clean up at end of test.
         try:
             if self.expect == EXPECT_UNKNOWN:
                 try: self.kill()            # Just make sure its dead
                 except: pass
             elif self.expect == EXPECT_RUNNING:
-                try:
-                    self.kill()
-                except:
-                    self.unexpected("expected running, exit code %d" % self.wait())
+                    if self.poll() != None:
+                        self.unexpected("expected running, exit code %d" % self.returncode)
+                    else:
+                        try:
+                            self.kill()
+                        except Exception,e:
+                            self.unexpected("exception from kill: %s" % str(e))
             else:
                 retry(lambda: self.poll() is not None)
                 if self.returncode is None: # Still haven't stopped
@@ -212,40 +175,21 @@ class Popen(subprocess.Popen):
                     self.unexpected("expected error")
         finally:
             self.wait()                 # Clean up the process.
-               
+
     def communicate(self, input=None):
-        if input:
-            self.stdin.write(input)
-            self.stdin.close()
-        outerr = (self.stdout.read(), self.stderr.read())
-        self.wait()
-        return outerr
+        ret = subprocess.Popen.communicate(self, input)
+        self.cleanup()
+        return ret
 
-    def is_running(self):
-        return self.poll() is None
+    def is_running(self): return self.poll() is None
 
     def assert_running(self):
         if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
 
-    def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
-        if self.returncode is None:
-            # Pass _deadstate only if it has been set, there is no _deadstate
-            # parameter in Python 2.6 
-            if _deadstate is None: ret = subprocess.Popen.poll(self)
-            else: ret = subprocess.Popen.poll(self, _deadstate)
-
-            if (ret != -1):
-                self.returncode = ret
-                self._cleanup()
-        return self.returncode
-
     def wait(self):
-        if self.returncode is None:
-            self.drain()
-            try: self.returncode = subprocess.Popen.wait(self)
-            except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
-            self._cleanup()
-        return self.returncode
+        ret = subprocess.Popen.wait(self)
+        self._cleanup()
+        return ret
 
     def terminate(self):
         try: subprocess.Popen.terminate(self)
@@ -254,7 +198,8 @@ class Popen(subprocess.Popen):
                 os.kill( self.pid , signal.SIGTERM)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-            
+        self._cleanup()
+
     def kill(self):
         try: subprocess.Popen.kill(self)
         except AttributeError:          # No terminate method
@@ -262,6 +207,20 @@ class Popen(subprocess.Popen):
                 os.kill( self.pid , signal.SIGKILL)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
+        self._cleanup()
+
+    def _cleanup(self):
+        """Clean up after a dead process"""
+        self._clean_lock.acquire()
+        if not self._clean:
+            self._clean = True
+            try: self.stdin.close()
+            except: pass
+            try: self.stdout.close()
+            except: pass
+            try: self.stderr.close()
+            except: pass
+        self._clean_lock.release()
 
     def cmd_str(self): return " ".join([str(s) for s in self.cmd])
 
@@ -288,11 +247,11 @@ class Broker(Popen):
         while (os.path.exists(self.log)):
             self.log = "%s-%d.log" % (self.name, i)
             i += 1
-    
+
     def get_log(self):
         return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -318,15 +277,20 @@ class Broker(Popen):
         cmd += ["--log-to-file", self.log]
         cmd += ["--log-to-stderr=no"]
         if log_level != None:
-            cmd += ["--log-enable=%s" % log_level] 
+            cmd += ["--log-enable=%s" % log_level]
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
-        Popen.__init__(self, cmd, expect, drain=False)
+        if show_cmd: print cmd
+        Popen.__init__(self, cmd, expect, stdout=PIPE)
         test.cleanup_stop(self)
         self._host = "127.0.0.1"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
         self._log_ready = False
 
+    def startQmf(self, handler=None):
+        self.qmf_session = qmf.console.Session(handler)
+        self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
     def host(self): return self._host
 
     def port(self):
@@ -357,7 +321,7 @@ class Broker(Popen):
         s = c.session(str(qpid.datatypes.uuid4()))
         s.queue_declare(queue=queue)
         c.close()
-    
+
     def _prep_sender(self, queue, durable, xprops):
         s = queue + "; {create:always, node:{durable:" + str(durable)
         if xprops != None: s += ", x-declare:{" + xprops + "}"
@@ -401,13 +365,14 @@ class Broker(Popen):
 
     def log_ready(self):
         """Return true if the log file exists and contains a broker ready message"""
-        if self._log_ready: return True
-        self._log_ready = find_in_file("notice Broker running", self.log)
+        if not self._log_ready:
+            self._log_ready = find_in_file("notice Broker running", self.log)
+        return self._log_ready
 
     def ready(self, **kwargs):
         """Wait till broker is ready to serve clients"""
         # First make sure the broker is listening by checking the log.
-        if not retry(self.log_ready, timeout=30):
+        if not retry(self.log_ready, timeout=60):
             raise Exception(
                 "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
         # Create a connection and a session. For a cluster broker this will
@@ -416,23 +381,27 @@ class Broker(Popen):
             c = self.connect(**kwargs)
             try: c.session()
             finally: c.close()
-        except: raise RethrownException(
-            "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
+        except Exception,e: raise RethrownException(
+            "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
 
     def store_state(self):
-        uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
+        f = open(os.path.join(self.datadir, "cluster", "store.status"))
+        try: uuids = f.readlines()
+        finally: f.close()
         null_uuid="00000000-0000-0000-0000-000000000000\n"
         if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
         if uuids[0] == null_uuid: return "empty"
         if uuids[1] == null_uuid: return "dirty"
         return "clean"
-        
+
 class Cluster:
     """A cluster of brokers in a test."""
+    # Client connection options for use in failover tests.
+    CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
@@ -443,16 +412,19 @@ class Cluster:
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
         assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
-        self.start_n(count, expect=expect, wait=wait)
+        self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
-        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
-        for i in range(count): self.start(expect=expect, wait=wait, args=args)
+    def ready(self):
+        for b in self: b.ready()
+
+    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
+        for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -481,7 +453,7 @@ class BrokerTest(TestCase):
     rootdir = os.getcwd()
 
     def configure(self, config): self.config=config
-    
+
     def setUp(self):
         outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
         self.dir = os.path.join(self.rootdir, outdir, self.id())
@@ -502,41 +474,50 @@ class BrokerTest(TestCase):
         """Call thing.stop at end of test"""
         self.stopem.append(stopable)
 
-    def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+    def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
         """Start a process that will be killed at end of test, in the test dir."""
         os.chdir(self.dir)
-        p = Popen(cmd, expect, drain)
+        p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
         if (wait):
             try: b.ready()
             except Exception, e:
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
         return cluster
 
+    def browse(self, session, queue, timeout=0):
+        """Return a list with the contents of each message on queue."""
+        r = session.receiver("%s;{mode:browse}"%(queue))
+        r.capacity = 100
+        try:
+            contents = []
+            try:
+                while True: contents.append(r.fetch(timeout=timeout).content)
+            except messaging.Empty: pass
+        finally: r.close()
+        return contents
+
     def assert_browse(self, session, queue, expect_contents, timeout=0):
         """Assert that the contents of messages on queue (as retrieved
         using session and timeout) exactly match the strings in
         expect_contents"""
-
-        r = session.receiver("%s;{mode:browse}"%(queue))
-        actual_contents = []
-        try:
-            for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
-            while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
-        except messaging.Empty: pass
-        r.close()
+        actual_contents = self.browse(session, queue, timeout)
         self.assertEqual(expect_contents, actual_contents)
 
+def join(thread, timeout=10):
+    thread.join(timeout)
+    if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
+
 class RethrownException(Exception):
     """Captures the stack trace of the current exception to be thrown later"""
     def __init__(self, msg=""):
@@ -554,15 +535,16 @@ class StoppableThread(Thread):
 
     def stop(self):
         self.stopped = True
-        self.join()
+        join(self)
         if self.error: raise self.error
-    
+
 class NumberedSender(Thread):
     """
     Thread to run a sender client and send numbered messages until stopped.
     """
 
-    def __init__(self, broker, max_depth=None, queue="test-queue"):
+    def __init__(self, broker, max_depth=None, queue="test-queue",
+                 connection_options=Cluster.CONNECTION_OPTIONS):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
         Requires self.notify_received(n) to be called each time messages are received.
@@ -573,9 +555,11 @@ class NumberedSender(Thread):
              "--broker", "localhost:%s"%broker.port(),
              "--address", "%s;{create:always}"%queue,
              "--failover-updates",
+             "--connection-options", "{%s}"%(connection_options),
              "--content-stdin"
              ],
-            expect=EXPECT_RUNNING)
+            expect=EXPECT_RUNNING,
+            stdin=PIPE)
         self.condition = Condition()
         self.max = max_depth
         self.received = 0
@@ -590,6 +574,7 @@ class NumberedSender(Thread):
         try:
             self.sent = 0
             while not self.stopped:
+                self.sender.assert_running()
                 if self.max:
                     self.condition.acquire()
                     while not self.stopped and self.sent - self.received > self.max:
@@ -612,16 +597,17 @@ class NumberedSender(Thread):
             self.stopped = True
             self.condition.notify()
         finally: self.condition.release()
-        self.join()
+        join(self)
         self.write_message(-1)          # end-of-messages marker.
         if self.error: raise self.error
-        
+
 class NumberedReceiver(Thread):
     """
     Thread to run a receiver client and verify it receives
     sequentially numbered messages.
     """
-    def __init__(self, broker, sender = None, queue="test-queue"):
+    def __init__(self, broker, sender = None, queue="test-queue",
+                 connection_options=Cluster.CONNECTION_OPTIONS):
         """
         sender: enable flow control. Call sender.received(n) for each message received.
         """
@@ -632,22 +618,24 @@ class NumberedReceiver(Thread):
              "--broker", "localhost:%s"%broker.port(),
              "--address", "%s;{create:always}"%queue,
              "--failover-updates",
+             "--connection-options", "{%s}"%(connection_options),
              "--forever"
              ],
             expect=EXPECT_RUNNING,
-            drain=False)
+            stdout=PIPE)
         self.lock = Lock()
         self.error = None
         self.sender = sender
+        self.received = 0
 
     def read_message(self):
         return int(self.receiver.stdout.readline())
-    
+
     def run(self):
         try:
-            self.received = 0
             m = self.read_message()
             while m != -1:
+                self.receiver.assert_running()
                 assert(m <= self.received) # Check for missing messages
                 if (m == self.received): # Ignore duplicates
                     self.received += 1
@@ -659,7 +647,7 @@ class NumberedReceiver(Thread):
 
     def stop(self):
         """Returns when termination message is received"""
-        self.join()
+        join(self)
         if self.error: raise self.error
 
 class ErrorGenerator(StoppableThread):
@@ -674,7 +662,7 @@ class ErrorGenerator(StoppableThread):
         self.broker=broker
         broker.test.cleanup_stop(self)
         self.start()
-        
+
     def run(self):
         c = self.broker.connect_old()
         try:

Modified: qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py Fri Oct 21 14:42:12 2011
@@ -365,6 +365,26 @@ class CliTests(TestBase010):
                 self.assertEqual(queue._altExchange_.name, altName)
         self.assertEqual(found, True)
 
+    def test_qpid_config_list_queues_arguments(self):
+        """
+        Test to verify that when the type of a policy limit is
+        actually a string (though still a valid value), it does not
+        upset qpid-config
+        """
+        self.startQmf();
+        qmf = self.qmf
+
+        names = ["queue_capacity%s" % (i) for i in range(1, 6)]
+        for name in names:
+            self.session.queue_declare(queue=name, exclusive=True,
+                                       arguments={'qpid.max_count' : str(i), 'qpid.max_size': '100'})
+
+        output = os.popen(self.qpid_config_command(" queues")).readlines()
+        queues = [line.split()[0] for line in output[2:len(output)]] #ignore first two lines (header)
+
+        for name in names:
+            assert name in queues, "%s not in %s" % (name, queues)
+
     def test_qpid_route(self):
         self.startQmf();
         qmf = self.qmf
@@ -405,7 +425,7 @@ class CliTests(TestBase010):
         qmf = self.qmf
 
         ret = self.qpid_route_api("dynamic add "
-                                  + " --sasl-mechanism PLAIN "
+                                  + " --client-sasl-mechanism PLAIN "
                                   + "guest/guest@localhost:"+str(self.broker.port) + " "
                                   + str(self.remote_host())+":"+str(self.remote_port()) + " "
                                   +"amq.direct")
@@ -424,7 +444,7 @@ class CliTests(TestBase010):
         qmf = self.qmf
 
         ret = self.qpid_route_api("dynamic add "
-                                  + " --sasl-mechanism PLAIN "
+                                  + " --client-sasl-mechanism PLAIN "
                                   + "localhost:"+str(self.broker.port) + " "
                                   + str(self.remote_host())+":"+str(self.remote_port()) + " "
                                   +"amq.direct")

Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt Fri Oct 21 14:42:12 2011
@@ -1,32 +1,4 @@
 qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
 qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
 qpid_tests.broker_0_10.message.MessageTests.test_ttl
 qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI

Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py Fri Oct 21 14:42:12 2011
@@ -53,16 +53,19 @@ def filter_log(log):
         'stall for update|unstall, ignore update|cancelled offer .* unstall',
         'caught up',
         'active for links|Passivating links|Activating links',
+        'info Connecting: .*', # UpdateClient connection
         'info Connection.* connected to', # UpdateClient connection
-        'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+        'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
         'warning Broker closed connection: 200, OK',
         'task late',
         'task overran',
         'warning CLOSING .* unsent data',
         'Inter-broker link ',
-        'Running in a cluster, marking store'
+        'Running in a cluster, marking store',
+        'debug Sending keepalive signal to watchdog', # Watchdog timer thread
+        'last broker standing joined by 1 replicas, updating queue policies.',
+        'Connection .* timed out: closing' # heartbeat connection close
         ])
-    skip_re = re.compile(skip)
     # Regex to match a UUID
     uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
     # Substitutions to remove expected differences
@@ -80,6 +83,13 @@ def filter_log(log):
         (r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
         (r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
         ]
+    # Substitutions to mask known issue: durable test shows inconsistent "changed stats for com.redhat.rhm.store:journal" messages.
+    skip += '|Changed V[12] statistics com.redhat.rhm.store:journal'
+    subs += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+',
+              'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')]
+
+    skip_re = re.compile(skip)
+    subs = [(re.compile(pattern), subst) for pattern, subst in subs]
     for l in open(log):
         if skip_re.search(l): continue
         for pattern,subst in subs: l = re.sub(pattern,subst,l)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org