You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [16/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueueTest.cpp Fri Oct 21 14:42:12 2011
@@ -36,6 +36,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+
#include <iostream>
#include "boost/format.hpp"
@@ -53,12 +56,12 @@ class TestConsumer : public virtual Cons
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- intrusive_ptr<Message> last;
+ QueuedMessage last;
bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
- last = msg.payload;
+ last = msg;
received = true;
return true;
};
@@ -78,13 +81,14 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
return msg;
}
@@ -145,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
//Test cancellation:
queue->cancel(c1);
@@ -210,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
@@ -244,7 +248,7 @@ QPID_AUTO_TEST_CASE(testBound){
exchange2.reset();
//unbind the queue from all exchanges it knows it has been bound to:
- queue->unbind(exchanges, queue);
+ queue->unbind(exchanges);
//ensure the remaining exchanges don't still have the queue bound to them:
FailOnDeliver deliverable;
@@ -254,26 +258,26 @@ QPID_AUTO_TEST_CASE(testBound){
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
client::QueueOptions args;
- args.setPersistLastNode();
+ args.setPersistLastNode();
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
- //change mode
- queue->setLastNodeFailure();
+ //change mode
+ queue->setLastNodeFailure();
- //enqueue 1 message
+ //enqueue 1 message
queue->deliver(msg3);
- //check all have persistent ids.
+ //check all have persistent ids.
BOOST_CHECK(msg1->isPersistent());
BOOST_CHECK(msg2->isPersistent());
BOOST_CHECK(msg3->isPersistent());
@@ -283,54 +287,58 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeS
QPID_AUTO_TEST_CASE(testSeek){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
- TestConsumer::shared_ptr consumer(new TestConsumer(false));
+ TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
consumer->position = seq;
QueuedMessage qm;
queue->dispatch(consumer);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
-
+
}
QPID_AUTO_TEST_CASE(testSearch){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
SequenceNumber seq(2);
- QueuedMessage qm = queue->find(seq);
-
+ QueuedMessage qm;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+
+ BOOST_CHECK(queue->find(seq, qm));
+
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
- queue->acquire(qm);
+
+ queue->acquire(qm, c1->getName());
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
- QueuedMessage qm1 = queue->find(seq1);
+ QueuedMessage qm1;
+ BOOST_CHECK(queue->find(seq1, qm1));
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
+
}
const std::string nullxid = "";
@@ -416,10 +424,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
client::QueueOptions args;
// set queue mode
- args.setOrdering(client::LVQ);
+ args.setOrdering(client::LVQ);
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
@@ -430,16 +438,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
//set deliever match for LVQ a,b,c,a
string key;
- args.getLVQKey(key);
+ args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
- msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
- //enqueue 4 message
+ //enqueue 4 message
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
@@ -459,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
intrusive_ptr<Message> msg5 = create_message("e", "A");
intrusive_ptr<Message> msg6 = create_message("e", "B");
intrusive_ptr<Message> msg7 = create_message("e", "C");
- msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg5->insertCustomProperty(key,"a");
+ msg6->insertCustomProperty(key,"b");
+ msg7->insertCustomProperty(key,"c");
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
@@ -496,7 +504,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
queue->deliver(msg1);
queue->deliver(msg2);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -508,6 +516,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
client::QueueOptions args;
// set queue mode
args.setOrdering(client::LVQ);
+ // disable flow control, as this test violates the enqueue/dequeue sequence.
+ args.setInt(QueueFlowLimit::flowStopCountKey, 0);
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
@@ -526,12 +536,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
- msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
+ msg5->insertCustomProperty(key,"b");
+ msg6->insertCustomProperty(key,"c");
//enqueue 4 message
queue->deliver(msg1);
@@ -546,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
framing::SequenceNumber sequence1(10);
QueuedMessage qmsg3(queue.get(), 0, sequence1);
+ TestConsumer::shared_ptr dummy(new TestConsumer());
- BOOST_CHECK(!queue->acquire(qmsg));
- BOOST_CHECK(queue->acquire(qmsg2));
+ BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
+ BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
// Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2));
- BOOST_CHECK(!queue->acquire(qmsg3));
+ BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
+ BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -561,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
// set mode to no browse and check
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer(false));
+ TestConsumer::shared_ptr c1(new TestConsumer("test", false));
queue->dispatch(c1);
queue->dispatch(c1);
@@ -595,8 +606,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -630,7 +641,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
- queue1->configure(args);
+ queue1->create(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "A");
@@ -639,9 +650,9 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- // 3
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
+ // 3
queue1->deliver(msg1);
// 4
queue1->setLastNodeFailure();
@@ -660,13 +671,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key");
- if (i % 2) {
- if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
- } else {
- if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
- }
- m->setTimestamp(new broker::ExpiryPolicy);
+ intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
@@ -676,7 +682,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired();
+ queue.purgeExpired(0);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -687,7 +693,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, timer);
+ QueueCleaner cleaner(queues, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
@@ -695,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
+
+namespace {
+ // helper for group tests
+ void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueuedMessage>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+ {
+ queue->dispatch(c);
+ results.push_back(c->last);
+ std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( group, expectedGroup );
+ BOOST_CHECK_EQUAL( id, expectedId );
+ }
+}
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+ //
+ // Verify that consumers of grouped messages own the groups once a message is acquired,
+ // and release the groups once all acquired messages have been dequeued or requeued
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+ std::string("b"), std::string("b"), std::string("b"),
+ std::string("c"), std::string("c"), std::string("c") };
+ for (int i = 0; i < 9; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", groups[i]);
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+
+ verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
+ verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
+
+ // now let c1 complete the 'a-0' message - this should free the 'a' group
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // now c2 should pick up the next 'a-1', since it is oldest free
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // c1 should only be able to snarf up the first "c" message now...
+ verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+ // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
+ queue->dequeue( 0, dequeMeC2.front() );
+ dequeMeC2.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+ // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+ verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+ // c2 can now only grab a-2, and that's all
+ verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+ // now C2 can't get any more, since C1 owns "b" and "c" group...
+ bool gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+ // c2 can now grab c-7
+ verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+ // what happens if C-2 "requeues" a-1 and a-2?
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front();
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front(); // now just has c-7 acquired
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+ // now c1 will grab a-1 and a-2...
+ verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+ verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+ // c2 can now acquire c-8 only
+ verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+ // and c1 can get b-5
+ verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+ // should be no more acquire-able for anyone now:
+ gotOne = queue->dispatch(c1);
+ BOOST_CHECK( !gotOne );
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // requeue all of C1's acquired messages, then cancel C1
+ while (!dequeMeC1.empty()) {
+ queue->requeue(dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+ queue->cancel(c1);
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+ // b-4, a-1, a-2, b-5 all should be available, right?
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ---, ---, ---
+
+ TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ std::deque<QueuedMessage> dequeMeC3;
+
+ verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+ verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ^C3, ^C2, ^C2
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2,
+ // Owners= ^C3,
+
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "a");
+ msg->insertCustomProperty("MY-ID", 9);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9
+ // Owners= ^C3, ^C3
+
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "b");
+ msg->insertCustomProperty("MY-ID", 10);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9, b-10
+ // Owners= ^C3, ^C3, ----
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+ verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c2);
+ queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+ //
+ // Verify that the same default group name is automatically applied to messages that
+ // do not specify a group name.
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ for (int i = 0; i < 3; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ // no "GROUP-ID" header
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = 0, 1, 2
+
+ BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+ queue->dispatch(c1); // c1 now owns default group (acquired 0)
+ dequeMeC1.push_back(c1->last);
+ int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 0 );
+
+ bool gotOne = queue->dispatch(c2); // c2 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->dispatch(c1); // c1 now acquires 1
+ dequeMeC1.push_back(c1->last);
+ id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 1 );
+
+ gotOne = queue->dispatch(c2); // c2 should still get nothing
+ BOOST_CHECK( !gotOne );
+
+ while (!dequeMeC1.empty()) {
+ queue->dequeue(0, dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+
+ // now default group should be available...
+ queue->dispatch(c2); // c2 now owns default group (acquired 2)
+ id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 2 );
+
+ gotOne = queue->dispatch(c1); // c1 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c1);
+ queue->cancel(c2);
+}
+
QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
TestMessageStoreOC testStore;
@@ -702,9 +982,9 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNo
args.setPersistLastNode();
Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
- queue1->configure(args);
+ queue1->create(args);
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
- queue2->configure(args);
+ queue2->create(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
@@ -790,7 +1070,7 @@ not requeued to the store.
Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
intrusive_ptr<Message> received;
- queue1->configure(args);
+ queue1->create(args);
// check requeue 1
intrusive_ptr<Message> msg1 = create_message("e", "C");
@@ -870,28 +1150,40 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg02(msg02);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ }
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg03(msg03);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ }
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg04(msg04);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ }
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg05(msg05);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ }
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
Modified: qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ReplicationTest.cpp Fri Oct 21 14:42:12 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testReplicationExcha
{
qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd")
("--replication-exchange-name=qpid.replication")));
- ProxySessionFixture f(brokerOpts);
+ SessionFixture f(brokerOpts);
std::string dataQ("queue-1");
Modified: qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/SessionState.cpp Fri Oct 21 14:42:12 2011
@@ -43,7 +43,7 @@ using namespace qpid::framing;
// Apply f to [begin, end) and accumulate the result
template <class Iter, class T, class F>
T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
- return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2)));
+ return std::accumulate(begin, end, seed, boost::bind(std::plus<T>(), _1, boost::bind(f, _2)));
}
// Create a frame with a one-char string.
@@ -105,8 +105,8 @@ size_t transferN(qpid::SessionState& s,
char last = content[content.size()-1];
content.resize(content.size()-1);
size += applyAccumulate(content.begin(), content.end(), 0,
- bind(&send, ref(s),
- bind(contentFrameChar, _1, false)));
+ boost::bind(&send, boost::ref(s),
+ boost::bind(contentFrameChar, _1, false)));
size += send(s, contentFrameChar(last, true));
}
return size;
@@ -115,7 +115,7 @@ size_t transferN(qpid::SessionState& s,
// Send multiple transfers with single-byte content.
size_t transfers(qpid::SessionState& s, string content) {
return applyAccumulate(content.begin(), content.end(), 0,
- bind(transfer1Char, ref(s), _1));
+ boost::bind(transfer1Char, boost::ref(s), _1));
}
size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; }
Modified: qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/TimerTest.cpp Fri Oct 21 14:42:12 2011
@@ -77,8 +77,10 @@ class TestTask : public TimerTask
BOOST_CHECK(fired);
BOOST_CHECK_EQUAL(expected_position, position);
Duration actual(start, end);
-#ifdef _WIN32
+#ifdef _MSC_VER
uint64_t difference = _abs64(expected - actual);
+#elif defined(_WIN32)
+ uint64_t difference = labs(expected - actual);
#else
uint64_t difference = abs(expected - actual);
#endif
Modified: qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/TxPublishTest.cpp Fri Oct 21 14:42:12 2011
@@ -50,10 +50,9 @@ struct TxPublishTest
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
+ msg(MessageUtils::createMessage("exchange", "routing_key", true)),
op(msg)
{
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}
@@ -74,7 +73,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
- BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
}
QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +86,7 @@ QPID_AUTO_TEST_CASE(testCommit)
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
- BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
Modified: qpid/branches/QPID-2519/cpp/src/tests/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Url.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Url.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Url.cpp Fri Oct 21 14:42:12 2011
@@ -60,6 +60,32 @@ QPID_AUTO_TEST_CASE(TestParseXyz) {
BOOST_CHECK_EQUAL(Url("xyz:host").str(), "amqp:xyz:host:5672");
}
+QPID_AUTO_TEST_CASE(TestParseTricky) {
+ BOOST_CHECK_EQUAL(Url("amqp").str(), "amqp:tcp:amqp:5672");
+ BOOST_CHECK_EQUAL(Url("amqp:tcp").str(), "amqp:tcp:tcp:5672");
+ // These are ambiguous parses and arguably not the best result
+ BOOST_CHECK_EQUAL(Url("amqp:876").str(), "amqp:tcp:876:5672");
+ BOOST_CHECK_EQUAL(Url("tcp:567").str(), "amqp:tcp:567:5672");
+}
+
+QPID_AUTO_TEST_CASE(TestParseIPv6) {
+ Url u1("[::]");
+ BOOST_CHECK_EQUAL(u1[0].host, "::");
+ BOOST_CHECK_EQUAL(u1[0].port, 5672);
+ Url u2("[::1]");
+ BOOST_CHECK_EQUAL(u2[0].host, "::1");
+ BOOST_CHECK_EQUAL(u2[0].port, 5672);
+ Url u3("[::127.0.0.1]");
+ BOOST_CHECK_EQUAL(u3[0].host, "::127.0.0.1");
+ BOOST_CHECK_EQUAL(u3[0].port, 5672);
+ Url u4("[2002::222:68ff:fe0b:e61a]");
+ BOOST_CHECK_EQUAL(u4[0].host, "2002::222:68ff:fe0b:e61a");
+ BOOST_CHECK_EQUAL(u4[0].port, 5672);
+ Url u5("[2002::222:68ff:fe0b:e61a]:123");
+ BOOST_CHECK_EQUAL(u5[0].host, "2002::222:68ff:fe0b:e61a");
+ BOOST_CHECK_EQUAL(u5[0].port, 123);
+}
+
QPID_AUTO_TEST_CASE(TestParseMultiAddress) {
Url::addProtocol("xyz");
URL_CHECK_STR("amqp:tcp:host:0,xyz:foo:123,tcp:foo:0,xyz:bar:1");
Modified: qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Variant.cpp Fri Oct 21 14:42:12 2011
@@ -86,6 +86,64 @@ QPID_AUTO_TEST_CASE(testConversions)
BOOST_CHECK_THROW(value.asBool(), InvalidConversion);
}
+QPID_AUTO_TEST_CASE(testConversionsFromString)
+{
+ Variant value;
+ value = "5";
+ BOOST_CHECK_EQUAL(5, value.asInt16());
+ BOOST_CHECK_EQUAL(5u, value.asUint16());
+
+ value = "-5";
+ BOOST_CHECK_EQUAL(-5, value.asInt16());
+ BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+
+ value = "18446744073709551615";
+ BOOST_CHECK_EQUAL(18446744073709551615ull, value.asUint64());
+ BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+ value = "9223372036854775808";
+ BOOST_CHECK_EQUAL(9223372036854775808ull, value.asUint64());
+ BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+ value = "-9223372036854775809";
+ BOOST_CHECK_THROW(value.asUint64(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+
+ value = "2147483648";
+ BOOST_CHECK_EQUAL(2147483648ul, value.asUint32());
+ BOOST_CHECK_THROW(value.asInt32(), InvalidConversion);
+
+ value = "-2147483649";
+ BOOST_CHECK_THROW(value.asUint32(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt32(), InvalidConversion);
+
+ value = "32768";
+ BOOST_CHECK_EQUAL(32768u, value.asUint16());
+ BOOST_CHECK_THROW(value.asInt16(), InvalidConversion);
+
+ value = "-32769";
+ BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt16(), InvalidConversion);
+
+ value = "-2.5";
+ BOOST_CHECK_EQUAL(-2.5, value.asFloat());
+
+ value = "-0.875432e10";
+ BOOST_CHECK_EQUAL(-0.875432e10, value.asDouble());
+
+ value = "-0";
+ BOOST_CHECK_EQUAL(0, value.asInt16());
+ BOOST_CHECK_EQUAL(0u, value.asUint16());
+
+ value = "-000";
+ BOOST_CHECK_EQUAL(0, value.asInt16());
+ BOOST_CHECK_EQUAL(0u, value.asUint16());
+
+ value = "-0010";
+ BOOST_CHECK_EQUAL(-10, value.asInt16());
+ BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+}
+
QPID_AUTO_TEST_CASE(testSizeConversionsUint)
{
Variant value;
Modified: qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/XmlClientSessionTest.cpp Fri Oct 21 14:42:12 2011
@@ -90,7 +90,7 @@ struct SimpleListener : public MessageLi
}
};
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
{
void declareSubscribe(const string& q="odd_blue",
const string& dest="xml")
Modified: qpid/branches/QPID-2519/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/acl.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/acl.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/acl.py Fri Oct 21 14:42:12 2011
@@ -26,10 +26,11 @@ from qpid.datatypes import uuid4
from qpid.testlib import TestBase010
from qmf.console import Session
from qpid.datatypes import Message
+import qpid.messaging
class ACLFile:
- def __init__(self):
- self.f = open('data_dir/policy.acl','w');
+ def __init__(self, policy='data_dir/policy.acl'):
+ self.f = open(policy,'w')
def write(self,line):
self.f.write(line)
@@ -50,14 +51,24 @@ class ACLTests(TestBase010):
acl = self.qmf.getObjects(_class="acl")[0]
return acl.reloadACLFile()
+ def get_acl_file(self):
+ return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl"))
+
def setUp(self):
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow all all\n')
aclf.close()
TestBase010.setUp(self)
self.startQmf()
self.reload_acl()
-
+
+ def tearDown(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all all\n')
+ aclf.close()
+ self.reload_acl()
+ TestBase010.tearDown(self)
+
#=====================================
# ACL general tests
#=====================================
@@ -66,7 +77,7 @@ class ACLTests(TestBase010):
"""
Test the deny all mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow anonymous all all\n')
aclf.write('acl allow bob@QPID create queue\n')
aclf.write('acl deny all all')
@@ -94,7 +105,7 @@ class ACLTests(TestBase010):
"""
Test the allow all mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID bind exchange\n')
aclf.write('acl allow all all')
aclf.close()
@@ -126,7 +137,7 @@ class ACLTests(TestBase010):
"""
Test empty groups
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl group\n')
aclf.write('acl group admins bob@QPID joe@QPID\n')
aclf.write('acl allow all all')
@@ -140,7 +151,7 @@ class ACLTests(TestBase010):
"""
Test illegal acl formats
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl group admins bob@QPID joe@QPID\n')
aclf.write('acl allow all all')
aclf.close()
@@ -154,7 +165,7 @@ class ACLTests(TestBase010):
Test illegal extension lines
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('group admins bob@QPID \n')
aclf.write(' \ \n')
aclf.write('joe@QPID \n')
@@ -172,7 +183,7 @@ class ACLTests(TestBase010):
"""
Test proper extention lines
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('group test1 joe@EXAMPLE.com \\ \n') # should be allowed
aclf.write(' jack@EXAMPLE.com \\ \n') # should be allowed
aclf.write('jill@TEST.COM \\ \n') # should be allowed
@@ -189,7 +200,7 @@ class ACLTests(TestBase010):
Test a user defined without a realm
Ex. group admin rajith
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('group admin bob\n') # shouldn't be allowed
aclf.write('acl deny admin bind exchange\n')
aclf.write('acl allow all all')
@@ -204,7 +215,7 @@ class ACLTests(TestBase010):
Test a user defined without a realm
Ex. group admin rajith
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('group test1 joe@EXAMPLE.com\n') # should be allowed
aclf.write('group test2 jack_123-jill@EXAMPLE.com\n') # should be allowed
aclf.write('group test4 host/somemachine.example.com@EXAMPLE.COM\n') # should be allowed
@@ -215,7 +226,7 @@ class ACLTests(TestBase010):
if (result.text.find("ACL format error",0,len(result.text)) != -1):
self.fail(result)
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('group test1 joe$H@EXAMPLE.com\n') # shouldn't be allowed
aclf.write('acl allow all all')
aclf.close()
@@ -233,7 +244,7 @@ class ACLTests(TestBase010):
Test illegal queue policy
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ding\n')
aclf.write('acl allow all all')
aclf.close()
@@ -249,7 +260,7 @@ class ACLTests(TestBase010):
Test illegal queue policy
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=-1\n')
aclf.write('acl allow all all')
aclf.close()
@@ -260,7 +271,7 @@ class ACLTests(TestBase010):
if (result.text != expected):
self.fail(result)
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuesize=9223372036854775808\n')
aclf.write('acl allow all all')
aclf.close()
@@ -277,7 +288,7 @@ class ACLTests(TestBase010):
Test illegal queue policy
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=-1\n')
aclf.write('acl allow all all')
aclf.close()
@@ -288,7 +299,7 @@ class ACLTests(TestBase010):
if (result.text != expected):
self.fail(result)
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q2 maxqueuecount=9223372036854775808\n')
aclf.write('acl allow all all')
aclf.close()
@@ -308,7 +319,7 @@ class ACLTests(TestBase010):
"""
Test cases for queue acl in allow mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create queue name=q1 durable=true passive=true\n')
aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
aclf.write('acl deny bob@QPID access queue name=q3\n')
@@ -411,7 +422,7 @@ class ACLTests(TestBase010):
"""
Test cases for queue acl in deny mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID create queue name=q1 durable=true passive=true\n')
aclf.write('acl allow bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
aclf.write('acl allow bob@QPID access queue name=q3\n')
@@ -534,7 +545,7 @@ class ACLTests(TestBase010):
"""
Test cases for exchange acl in allow mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID create exchange name=testEx durable=true passive=true\n')
aclf.write('acl deny bob@QPID create exchange name=ex1 type=direct\n')
aclf.write('acl deny bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
@@ -665,7 +676,7 @@ class ACLTests(TestBase010):
"""
Test cases for exchange acl in deny mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
@@ -772,6 +783,52 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow exchange delete request for myEx");
+ def test_create_and_delete_exchange_via_qmf(self):
+ """
+ Test acl is enforced when creating/deleting via QMF
+ methods. Note that in order to be able to send the QMF methods
+ and receive the responses a significant amount of permissions
+ need to be enabled (TODO: can the set below be narrowed down
+ at all?)
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create exchange\n')
+ aclf.write('acl allow admin@QPID delete exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ bob.create_exchange("my-exchange") #should pass
+ #cleanup by deleting exchange
+ try:
+ bob.delete_exchange("my-exchange") #should fail
+ self.fail("ACL should deny exchange delete request for my-exchange");
+ except Exception, e:
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ admin.delete_exchange("my-exchange") #should pass
+
+ anonymous = BrokerAdmin(self.config.broker)
+ try:
+ anonymous.create_exchange("another-exchange") #should fail
+ self.fail("ACL should deny exchange create request for another-exchange");
+ except Exception, e:
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+
+
#=====================================
# ACL consume tests
#=====================================
@@ -780,7 +837,7 @@ class ACLTests(TestBase010):
"""
Test cases for consume in allow mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID consume queue name=q1\n')
aclf.write('acl deny bob@QPID consume queue name=q2\n')
aclf.write('acl allow all all')
@@ -826,7 +883,7 @@ class ACLTests(TestBase010):
"""
Test cases for consume in allow mode
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID consume queue name=q1\n')
aclf.write('acl allow bob@QPID consume queue name=q2\n')
aclf.write('acl allow bob@QPID create queue\n')
@@ -872,7 +929,7 @@ class ACLTests(TestBase010):
"""
Test various publish acl
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl deny bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
aclf.write('acl deny bob@QPID publish exchange name=amq.topic\n')
aclf.write('acl deny bob@QPID publish exchange name=myEx routingkey=rk2\n')
@@ -921,7 +978,7 @@ class ACLTests(TestBase010):
"""
Test various publish acl
"""
- aclf = ACLFile()
+ aclf = self.get_acl_file()
aclf.write('acl allow bob@QPID publish exchange name=amq.direct routingkey=rk1\n')
aclf.write('acl allow bob@QPID publish exchange name=amq.topic\n')
aclf.write('acl allow bob@QPID publish exchange name=myEx routingkey=rk2\n')
@@ -972,3 +1029,113 @@ class ACLTests(TestBase010):
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+
+ #=====================================
+ # ACL broker configuration tests
+ #=====================================
+
+ def test_broker_timestamp_config(self):
+ """
+ Test ACL control of the broker timestamp configuration
+ """
+ aclf = self.get_acl_file()
+ # enable lots of stuff to allow QMF to work
+ aclf.write('acl allow all create exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all publish exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ # this should let bob access the timestamp configuration
+ aclf.write('acl allow bob@QPID access broker\n')
+ aclf.write('acl allow admin@QPID all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ ts = None
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ ts = bob.get_timestamp_cfg() #should work
+ bob.set_timestamp_cfg(ts); #should work
+
+ obo = BrokerAdmin(self.config.broker, "obo", "obo")
+ try:
+ ts = obo.get_timestamp_cfg() #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ try:
+ obo.set_timestamp_cfg(ts) #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ ts = admin.get_timestamp_cfg() #should pass
+ admin.set_timestamp_cfg(ts) #should pass
+
+
+class BrokerAdmin:
+ def __init__(self, broker, username=None, password=None):
+ self.connection = qpid.messaging.Connection(broker)
+ if username:
+ self.connection.username = username
+ self.connection.password = password
+ self.connection.sasl_mechanisms = "PLAIN"
+ self.connection.open()
+ self.session = self.connection.session()
+ self.sender = self.session.sender("qmf.default.direct/broker")
+ self.reply_to = "responses-#; {create:always}"
+ self.receiver = self.session.receiver(self.reply_to)
+
+ def invoke(self, method, arguments):
+ content = {
+ "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
+ "_method_name": method,
+ "_arguments": arguments
+ }
+ request = qpid.messaging.Message(reply_to=self.reply_to, content=content)
+ request.properties["x-amqp-0-10.app-id"] = "qmf2"
+ request.properties["qmf.opcode"] = "_method_request"
+ self.sender.send(request)
+ response = self.receiver.fetch()
+ self.session.acknowledge()
+ if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
+ if response.properties['qmf.opcode'] == '_method_response':
+ return response.content['_arguments']
+ elif response.properties['qmf.opcode'] == '_exception':
+ raise Exception(response.content['_values'])
+ else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
+ else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+ def create_exchange(self, name, exchange_type=None, options={}):
+ properties = options
+ if exchange_type: properties["exchange_type"] = exchange_type
+ self.invoke("create", {"type": "exchange", "name":name, "properties":properties})
+
+ def create_queue(self, name, properties={}):
+ self.invoke("create", {"type": "queue", "name":name, "properties":properties})
+
+ def delete_exchange(self, name):
+ self.invoke("delete", {"type": "exchange", "name":name})
+
+ def delete_queue(self, name):
+ self.invoke("delete", {"type": "queue", "name":name})
+
+ def get_timestamp_cfg(self):
+ return self.invoke("getTimestampConfig", {})
+
+ def set_timestamp_cfg(self, receive):
+ return self.invoke("getTimestampConfig", {"receive":receive})
Modified: qpid/branches/QPID-2519/cpp/src/tests/allhosts
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/allhosts?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/allhosts (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/allhosts Fri Oct 21 14:42:12 2011
@@ -29,11 +29,12 @@ Options:
-s SECONDS sleep between starting commands.
-q don't print banner lines for each host.
-o SUFFIX log output of each command to <host>.SUFFIX
+ -X passed to ssh - forward X connection.
"
exit 1
}
-while getopts "tl:bs:dqo:" opt; do
+while getopts "tl:bs:dqo:X" opt; do
case $opt in
l) SSHOPTS="-l$OPTARG $SSHOPTS" ;;
t) SSHOPTS="-t $SSHOPTS" ;;
@@ -42,6 +43,7 @@ while getopts "tl:bs:dqo:" opt; do
s) SLEEP="sleep $OPTARG" ;;
q) NOBANNER=1 ;;
o) SUFFIX=$OPTARG ;;
+ X) SSHOPTS="-X $SSHOPTS" ;;
*) usage;;
esac
done
Modified: qpid/branches/QPID-2519/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/brokertest.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/brokertest.py Fri Oct 21 14:42:12 2011
@@ -29,6 +29,7 @@ from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
+import qmf.console
log = getLogger("qpid.brokertest")
@@ -61,24 +62,6 @@ def is_running(pid):
class BadProcessStatus(Exception):
pass
-class ExceptionWrapper:
- """Proxy object that adds a message to exceptions raised"""
- def __init__(self, obj, msg):
- self.obj = obj
- self.msg = msg
-
- def __getattr__(self, name):
- func = getattr(self.obj, name)
- if type(func) != callable:
- return func
- return lambda *args, **kwargs: self._wrap(func, args, kwargs)
-
- def _wrap(self, func, args, kwargs):
- try:
- return func(*args, **kwargs)
- except Exception, e:
- raise Exception("%s: %s" %(self.msg, str(e)))
-
def error_line(filename, n=1):
"""Get the last n line(s) of filename for error messages"""
result = []
@@ -88,7 +71,8 @@ def error_line(filename, n=1):
for l in f:
if len(result) == n: result.pop(0)
result.append(" "+l)
- finally: f.close()
+ finally:
+ f.close()
except: return ""
return ":\n" + "".join(result)
@@ -96,111 +80,90 @@ def retry(function, timeout=10, delay=.0
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
+ deadline = time.time() + timeout
while not function():
- if delay > timeout: delay = timeout
+ remaining = deadline - time.time()
+ if remaining <= 0: return False
+ delay = min(delay, remaining)
time.sleep(delay)
- timeout -= delay
- if timeout <= 0: return False
delay *= 2
return True
+class AtomicCounter:
+ def __init__(self):
+ self.count = 0
+ self.lock = Lock()
+
+ def next(self):
+ self.lock.acquire();
+ ret = self.count
+ self.count += 1
+ self.lock.release();
+ return ret
+
+_popen_id = AtomicCounter() # Popen identifier for use in output file names.
+
+# Constants for file descriptor arguments to Popen
+FILE = "FILE" # Write to file named after process
+PIPE = subprocess.PIPE
+
class Popen(subprocess.Popen):
"""
Can set and verify expectation of process status at end of test.
Dumps command line, stdout, stderr to data dir for debugging.
"""
- class DrainThread(Thread):
- """Thread to drain a file object and write the data to a file."""
- def __init__(self, infile, outname):
- Thread.__init__(self)
- self.infile, self.outname = infile, outname
- self.outfile = None
-
- def run(self):
- try:
- for line in self.infile:
- if self.outfile is None:
- self.outfile = open(self.outname, "w")
- self.outfile.write(line)
- finally:
- self.infile.close()
- if self.outfile is not None: self.outfile.close()
-
- class OutStream(ExceptionWrapper):
- """Wrapper for output streams, handles exceptions & draining output"""
- def __init__(self, infile, outfile, msg):
- ExceptionWrapper.__init__(self, infile, msg)
- self.infile, self.outfile = infile, outfile
- self.thread = None
-
- def drain(self):
- if self.thread is None:
- self.thread = Popen.DrainThread(self.infile, self.outfile)
- self.thread.start()
-
- def outfile(self, ext): return "%s.%s" % (self.pname, ext)
-
- def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
- """Run cmd (should be a list of arguments)
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+ """Run cmd (should be a list of program and arguments)
expect - if set verify expectation at end of test.
- drain - if true (default) drain stdout/stderr to files.
+ stdout, stderr - can have the same values as for subprocess.Popen as well as
+ FILE (the default) which means write to a file named after the process.
+ stdin - like subprocess.Popen but defauts to PIPE
"""
self._clean = False
self._clean_lock = Lock()
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
- self.returncode = None
self.expect = expect
- try:
- subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True)
- except ValueError: # Windows can't do close_fds
- subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE)
- self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
- msg = "Process %s" % self.pname
- self.stdin = ExceptionWrapper(self.stdin, msg)
- self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg)
- self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg)
+ self.id = _popen_id.next()
+ self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
+ if stdout == FILE: stdout = open(self.outfile("out"), "w")
+ if stderr == FILE: stderr = open(self.outfile("err"), "w")
+ try:
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr,
+ close_fds=True)
+ except ValueError: # Windows can't do close_fds
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr)
+
f = open(self.outfile("cmd"), "w")
- try: f.write(self.cmd_str())
+ try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
- if drain: self.drain()
- def __str__(self): return "Popen<%s>"%(self.pname)
+ def __str__(self): return "Popen<%s>"%(self.pname)
- def drain(self):
- """Start threads to drain stdout/err"""
- self.stdout.drain()
- self.stderr.drain()
-
- def _cleanup(self):
- """Close pipes to sub-process"""
- self._clean_lock.acquire()
- try:
- if self._clean: return
- self._clean = True
- self.stdin.close()
- self.drain() # Drain output pipes.
- self.stdout.thread.join() # Drain thread closes pipe.
- self.stderr.thread.join()
- finally: self._clean_lock.release()
+ def outfile(self, ext): return "%s.%s" % (self.pname, ext)
def unexpected(self,msg):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-
+
def stop(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure its dead
except: pass
elif self.expect == EXPECT_RUNNING:
- try:
- self.kill()
- except:
- self.unexpected("expected running, exit code %d" % self.wait())
+ if self.poll() != None:
+ self.unexpected("expected running, exit code %d" % self.returncode)
+ else:
+ try:
+ self.kill()
+ except Exception,e:
+ self.unexpected("exception from kill: %s" % str(e))
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
@@ -212,40 +175,21 @@ class Popen(subprocess.Popen):
self.unexpected("expected error")
finally:
self.wait() # Clean up the process.
-
+
def communicate(self, input=None):
- if input:
- self.stdin.write(input)
- self.stdin.close()
- outerr = (self.stdout.read(), self.stderr.read())
- self.wait()
- return outerr
+ ret = subprocess.Popen.communicate(self, input)
+ self.cleanup()
+ return ret
- def is_running(self):
- return self.poll() is None
+ def is_running(self): return self.poll() is None
def assert_running(self):
if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
- def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
- if self.returncode is None:
- # Pass _deadstate only if it has been set, there is no _deadstate
- # parameter in Python 2.6
- if _deadstate is None: ret = subprocess.Popen.poll(self)
- else: ret = subprocess.Popen.poll(self, _deadstate)
-
- if (ret != -1):
- self.returncode = ret
- self._cleanup()
- return self.returncode
-
def wait(self):
- if self.returncode is None:
- self.drain()
- try: self.returncode = subprocess.Popen.wait(self)
- except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
- self._cleanup()
- return self.returncode
+ ret = subprocess.Popen.wait(self)
+ self._cleanup()
+ return ret
def terminate(self):
try: subprocess.Popen.terminate(self)
@@ -254,7 +198,8 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-
+ self._cleanup()
+
def kill(self):
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
@@ -262,6 +207,20 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
+ self._cleanup()
+
+ def _cleanup(self):
+ """Clean up after a dead process"""
+ self._clean_lock.acquire()
+ if not self._clean:
+ self._clean = True
+ try: self.stdin.close()
+ except: pass
+ try: self.stdout.close()
+ except: pass
+ try: self.stderr.close()
+ except: pass
+ self._clean_lock.release()
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
@@ -288,11 +247,11 @@ class Broker(Popen):
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
-
+
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -318,15 +277,20 @@ class Broker(Popen):
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
- Popen.__init__(self, cmd, expect, drain=False)
+ if show_cmd: print cmd
+ Popen.__init__(self, cmd, expect, stdout=PIPE)
test.cleanup_stop(self)
self._host = "127.0.0.1"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
self._log_ready = False
+ def startQmf(self, handler=None):
+ self.qmf_session = qmf.console.Session(handler)
+ self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
def host(self): return self._host
def port(self):
@@ -357,7 +321,7 @@ class Broker(Popen):
s = c.session(str(qpid.datatypes.uuid4()))
s.queue_declare(queue=queue)
c.close()
-
+
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
if xprops != None: s += ", x-declare:{" + xprops + "}"
@@ -401,13 +365,14 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
- if self._log_ready: return True
- self._log_ready = find_in_file("notice Broker running", self.log)
+ if not self._log_ready:
+ self._log_ready = find_in_file("notice Broker running", self.log)
+ return self._log_ready
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=30):
+ if not retry(self.log_ready, timeout=60):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
@@ -416,23 +381,27 @@ class Broker(Popen):
c = self.connect(**kwargs)
try: c.session()
finally: c.close()
- except: raise RethrownException(
- "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
+ except Exception,e: raise RethrownException(
+ "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
def store_state(self):
- uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
+ f = open(os.path.join(self.datadir, "cluster", "store.status"))
+ try: uuids = f.readlines()
+ finally: f.close()
null_uuid="00000000-0000-0000-0000-000000000000\n"
if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
if uuids[0] == null_uuid: return "empty"
if uuids[1] == null_uuid: return "dirty"
return "clean"
-
+
class Cluster:
"""A cluster of brokers in a test."""
+ # Client connection options for use in failover tests.
+ CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -443,16 +412,19 @@ class Cluster:
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait)
+ self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
- for i in range(count): self.start(expect=expect, wait=wait, args=args)
+ def ready(self):
+ for b in self: b.ready()
+
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -481,7 +453,7 @@ class BrokerTest(TestCase):
rootdir = os.getcwd()
def configure(self, config): self.config=config
-
+
def setUp(self):
outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
@@ -502,41 +474,50 @@ class BrokerTest(TestCase):
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
- def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+ def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
- p = Popen(cmd, expect, drain)
+ p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
+ def browse(self, session, queue, timeout=0):
+ """Return a list with the contents of each message on queue."""
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ r.capacity = 100
+ try:
+ contents = []
+ try:
+ while True: contents.append(r.fetch(timeout=timeout).content)
+ except messaging.Empty: pass
+ finally: r.close()
+ return contents
+
def assert_browse(self, session, queue, expect_contents, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
-
- r = session.receiver("%s;{mode:browse}"%(queue))
- actual_contents = []
- try:
- for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
- while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
- except messaging.Empty: pass
- r.close()
+ actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
+def join(thread, timeout=10):
+ thread.join(timeout)
+ if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
+
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
def __init__(self, msg=""):
@@ -554,15 +535,16 @@ class StoppableThread(Thread):
def stop(self):
self.stopped = True
- self.join()
+ join(self)
if self.error: raise self.error
-
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
- def __init__(self, broker, max_depth=None, queue="test-queue"):
+ def __init__(self, broker, max_depth=None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -573,9 +555,11 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{%s}"%(connection_options),
"--content-stdin"
],
- expect=EXPECT_RUNNING)
+ expect=EXPECT_RUNNING,
+ stdin=PIPE)
self.condition = Condition()
self.max = max_depth
self.received = 0
@@ -590,6 +574,7 @@ class NumberedSender(Thread):
try:
self.sent = 0
while not self.stopped:
+ self.sender.assert_running()
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
@@ -612,16 +597,17 @@ class NumberedSender(Thread):
self.stopped = True
self.condition.notify()
finally: self.condition.release()
- self.join()
+ join(self)
self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
-
+
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue"):
+ def __init__(self, broker, sender = None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -632,22 +618,24 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{%s}"%(connection_options),
"--forever"
],
expect=EXPECT_RUNNING,
- drain=False)
+ stdout=PIPE)
self.lock = Lock()
self.error = None
self.sender = sender
+ self.received = 0
def read_message(self):
return int(self.receiver.stdout.readline())
-
+
def run(self):
try:
- self.received = 0
m = self.read_message()
while m != -1:
+ self.receiver.assert_running()
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
@@ -659,7 +647,7 @@ class NumberedReceiver(Thread):
def stop(self):
"""Returns when termination message is received"""
- self.join()
+ join(self)
if self.error: raise self.error
class ErrorGenerator(StoppableThread):
@@ -674,7 +662,7 @@ class ErrorGenerator(StoppableThread):
self.broker=broker
broker.test.cleanup_stop(self)
self.start()
-
+
def run(self):
c = self.broker.connect_old()
try:
Modified: qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cli_tests.py Fri Oct 21 14:42:12 2011
@@ -365,6 +365,26 @@ class CliTests(TestBase010):
self.assertEqual(queue._altExchange_.name, altName)
self.assertEqual(found, True)
+ def test_qpid_config_list_queues_arguments(self):
+ """
+ Test to verify that when the type of a policy limit is
+ actually a string (though still a valid value), it does not
+ upset qpid-config
+ """
+ self.startQmf();
+ qmf = self.qmf
+
+ names = ["queue_capacity%s" % (i) for i in range(1, 6)]
+ for name in names:
+ self.session.queue_declare(queue=name, exclusive=True,
+ arguments={'qpid.max_count' : str(i), 'qpid.max_size': '100'})
+
+ output = os.popen(self.qpid_config_command(" queues")).readlines()
+ queues = [line.split()[0] for line in output[2:len(output)]] #ignore first two lines (header)
+
+ for name in names:
+ assert name in queues, "%s not in %s" % (name, queues)
+
def test_qpid_route(self):
self.startQmf();
qmf = self.qmf
@@ -405,7 +425,7 @@ class CliTests(TestBase010):
qmf = self.qmf
ret = self.qpid_route_api("dynamic add "
- + " --sasl-mechanism PLAIN "
+ + " --client-sasl-mechanism PLAIN "
+ "guest/guest@localhost:"+str(self.broker.port) + " "
+ str(self.remote_host())+":"+str(self.remote_port()) + " "
+"amq.direct")
@@ -424,7 +444,7 @@ class CliTests(TestBase010):
qmf = self.qmf
ret = self.qpid_route_api("dynamic add "
- + " --sasl-mechanism PLAIN "
+ + " --client-sasl-mechanism PLAIN "
+ "localhost:"+str(self.broker.port) + " "
+ str(self.remote_host())+":"+str(self.remote_port()) + " "
+"amq.direct")
Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_python_tests_failing.txt Fri Oct 21 14:42:12 2011
@@ -1,32 +1,4 @@
qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
qpid_tests.broker_0_10.message.MessageTests.test_ttl
qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI
Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_test_logs.py Fri Oct 21 14:42:12 2011
@@ -53,16 +53,19 @@ def filter_log(log):
'stall for update|unstall, ignore update|cancelled offer .* unstall',
'caught up',
'active for links|Passivating links|Activating links',
+ 'info Connecting: .*', # UpdateClient connection
'info Connection.* connected to', # UpdateClient connection
- 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+ 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
'warning CLOSING .* unsent data',
'Inter-broker link ',
- 'Running in a cluster, marking store'
+ 'Running in a cluster, marking store',
+ 'debug Sending keepalive signal to watchdog', # Watchdog timer thread
+ 'last broker standing joined by 1 replicas, updating queue policies.',
+ 'Connection .* timed out: closing' # heartbeat connection close
])
- skip_re = re.compile(skip)
# Regex to match a UUID
uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
# Substitutions to remove expected differences
@@ -80,6 +83,13 @@ def filter_log(log):
(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
]
+ # Substitutions to mask known issue: durable test shows inconsistent "changed stats for com.redhat.rhm.store:journal" messages.
+ skip += '|Changed V[12] statistics com.redhat.rhm.store:journal'
+ subs += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+',
+ 'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')]
+
+ skip_re = re.compile(skip)
+ subs = [(re.compile(pattern), subst) for pattern, subst in subs]
for l in open(log):
if skip_re.search(l): continue
for pattern,subst in subs: l = re.sub(pattern,subst,l)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org