You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC
svn commit: r1371676 [8/8] - in /qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/
cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/
cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Aug 10 12:04:27 2012
@@ -38,8 +38,8 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
#include <iostream>
#include <vector>
@@ -56,196 +56,47 @@ using namespace qpid::sys;
namespace qpid {
namespace tests {
-
class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- QueuedMessage last;
+ QueueCursor lastCursor;
+ Message lastMessage;
bool received;
- TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
- virtual bool deliver(QueuedMessage& msg){
- last = msg;
+ virtual bool deliver(const QueueCursor& cursor, const Message& message){
+ lastCursor = cursor;
+ lastMessage = message;
received = true;
return true;
};
void notify() {}
void cancel() {}
- void acknowledged(const QueuedMessage&) {}
+ void acknowledged(const DeliveryRecord&) {}
OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable
{
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
FailOnDeliver() : msg(MessageUtils::createMessage()) {}
void deliverTo(const boost::shared_ptr<Queue>& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
- Message& getMessage() { return *(msg.get()); }
+ Message& getMessage() { return msg; }
};
-intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
- return msg;
-}
-
-intrusive_ptr<Message> contentMessage(string content) {
- intrusive_ptr<Message> m(MessageUtils::createMessage());
- MessageUtils::addContent(m, content);
- return m;
-}
-
-string getContent(intrusive_ptr<Message> m) {
- return m->getFrames().getContent();
-}
-
QPID_AUTO_TEST_SUITE(QueueTestSuite)
-QPID_AUTO_TEST_CASE(testAsyncMessage) {
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> received;
-
- TestConsumer::shared_ptr c1(new TestConsumer());
- queue->consume(c1);
-
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
- queue->process(msg1);
- sleep(2);
-
- BOOST_CHECK(!c1->received);
- msg1->enqueueComplete();
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
-}
-
-
-QPID_AUTO_TEST_CASE(testAsyncMessageCount){
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
-
- queue->process(msg1);
- sleep(2);
- uint32_t compval=0;
- BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
- msg1->enqueueComplete();
- compval=1;
- BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
-}
-
-QPID_AUTO_TEST_CASE(testConsumers){
- Queue::shared_ptr queue(new Queue("my_queue", true));
-
- //Test adding consumers:
- TestConsumer::shared_ptr c1(new TestConsumer());
- TestConsumer::shared_ptr c2(new TestConsumer());
- queue->consume(c1);
- queue->consume(c2);
-
- BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- queue->deliver(msg1);
- BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
-
- queue->deliver(msg2);
- BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
-
- c1->received = false;
- queue->deliver(msg3);
- BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
-
- //Test cancellation:
- queue->cancel(c1);
- BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
- queue->cancel(c2);
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount());
-}
-
-QPID_AUTO_TEST_CASE(testRegistry){
- //Test use of queues in registry:
- QueueRegistry registry;
- registry.declare("queue1", true, true);
- registry.declare("queue2", true, true);
- registry.declare("queue3", true, true);
-
- BOOST_CHECK(registry.find("queue1"));
- BOOST_CHECK(registry.find("queue2"));
- BOOST_CHECK(registry.find("queue3"));
-
- registry.destroy("queue1");
- registry.destroy("queue2");
- registry.destroy("queue3");
-
- BOOST_CHECK(!registry.find("queue1"));
- BOOST_CHECK(!registry.find("queue2"));
- BOOST_CHECK(!registry.find("queue3"));
-}
-
-QPID_AUTO_TEST_CASE(testDequeue){
- Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> received;
-
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
- BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
- BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
-
- TestConsumer::shared_ptr consumer(new TestConsumer());
- queue->consume(consumer);
- queue->dispatch(consumer);
- if (!consumer->received)
- sleep(2);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK(!received);
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-}
-
QPID_AUTO_TEST_CASE(testBound){
//test the recording of bindings, and use of those to allow a queue to be unbound
string key("my-key");
FieldTable args;
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue"));
ExchangeRegistry exchanges;
//establish bindings from exchange->queue and notify the queue as it is bound:
Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
@@ -273,423 +124,69 @@ QPID_AUTO_TEST_CASE(testBound){
exchange3->route(deliverable);
}
-QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
- client::QueueOptions args;
- args.setPersistLastNode();
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
-
- //change mode
- queue->setLastNodeFailure();
-
- //enqueue 1 message
- queue->deliver(msg3);
-
- //check all have persistent ids.
- BOOST_CHECK(msg1->isPersistent());
- BOOST_CHECK(msg2->isPersistent());
- BOOST_CHECK(msg3->isPersistent());
-
-}
-
-
-QPID_AUTO_TEST_CASE(testSeek){
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
+QPID_AUTO_TEST_CASE(testLVQ){
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
- SequenceNumber seq(2);
- consumer->setPosition(seq);
-
- QueuedMessage qm;
- queue->dispatch(consumer);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
- queue->dispatch(consumer);
- queue->dispatch(consumer); // make sure over-run is safe
-
-}
-
-QPID_AUTO_TEST_CASE(testSearch){
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- SequenceNumber seq(2);
- QueuedMessage qm;
- TestConsumer::shared_ptr c1(new TestConsumer());
-
- BOOST_CHECK(queue->find(seq, qm));
-
- BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
- queue->acquire(qm, c1->getName());
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
- SequenceNumber seq1(3);
- QueuedMessage qm1;
- BOOST_CHECK(queue->find(seq1, qm1));
- BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
-}
-const std::string nullxid = "";
-
-class SimpleDummyCtxt : public TransactionContext {};
+ QueueSettings settings;
+ string key="key";
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
-class DummyCtxt : public TPCTransactionContext
-{
- const std::string xid;
- public:
- DummyCtxt(const std::string& _xid) : xid(_xid) {}
- static std::string getXid(TransactionContext& ctxt)
- {
- DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
- return c ? c->xid : nullxid;
+ const char* values[] = { "a", "b", "c", "a"};
+ for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
-};
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
-class TestMessageStoreOC : public MessageStore
-{
- std::set<std::string> prepared;
- uint64_t nextPersistenceId;
- public:
-
- uint enqCnt;
- uint deqCnt;
- bool error;
-
- TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
-
- virtual void dequeue(TransactionContext*,
- const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
- const PersistableQueue& /*queue*/)
- {
- if (error) throw Exception("Dequeue error test");
- deqCnt++;
- }
+ TestConsumer::shared_ptr c(new TestConsumer("test", true));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());
- virtual void enqueue(TransactionContext*,
- const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& /* queue */)
- {
- if (error) throw Exception("Enqueue error test");
- enqCnt++;
- msg->enqueueComplete();
- }
- void createError()
- {
- error=true;
+ const char* values2[] = { "a", "b", "c"};
+ for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
}
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
- bool init(const Options*) { return true; }
- void truncateInit(const bool) {}
- void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
- void destroy(PersistableQueue&) {}
- void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
- void destroy(const PersistableExchange&) {}
- void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
- void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
- void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
- void destroy(const PersistableConfig&) {}
- void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
- void destroy(PersistableMessage&) {}
- void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
- void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
- std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
- void flush(const qpid::broker::PersistableQueue&) {}
- uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
-
- std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
- std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
- void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
- void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
- void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
- void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
-
- void recover(RecoveryManager&) {}
-};
-
-
-QPID_AUTO_TEST_CASE(testLVQOrdering){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> msg4 = createMessage("e", "D");
- intrusive_ptr<Message> received;
-
- //set deliever match for LVQ a,b,c,a
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
-
- //enqueue 4 message
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
- queue->deliver(msg4);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg3.get(), received.get());
-
- intrusive_ptr<Message> msg5 = createMessage("e", "A");
- intrusive_ptr<Message> msg6 = createMessage("e", "B");
- intrusive_ptr<Message> msg7 = createMessage("e", "C");
- msg5->insertCustomProperty(key,"a");
- msg6->insertCustomProperty(key,"b");
- msg7->insertCustomProperty(key,"c");
- queue->deliver(msg5);
- queue->deliver(msg6);
- queue->deliver(msg7);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg5.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg6.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg7.get(), received.get());
-
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
}
QPID_AUTO_TEST_CASE(testLVQEmptyKey){
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- queue->deliver(msg1);
- queue->deliver(msg2);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQAcquire){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
- // disable flow control, as this test violates the enqueue/dequeue sequence.
- args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> msg4 = createMessage("e", "D");
- intrusive_ptr<Message> msg5 = createMessage("e", "F");
- intrusive_ptr<Message> msg6 = createMessage("e", "G");
-
- //set deliever match for LVQ a,b,c,a
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
- msg5->insertCustomProperty(key,"b");
- msg6->insertCustomProperty(key,"c");
-
- //enqueue 4 message
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
- queue->deliver(msg4);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- framing::SequenceNumber sequence(1);
- QueuedMessage qmsg(queue.get(), msg1, sequence);
- QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
- framing::SequenceNumber sequence1(10);
- QueuedMessage qmsg3(queue.get(), 0, sequence1);
- TestConsumer::shared_ptr dummy(new TestConsumer());
-
- BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
- BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
- // Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
- BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
- queue->deliver(msg5);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- // set mode to no browse and check
- args.setOrdering(client::LVQ_NO_BROWSE);
- queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer("test", false));
-
- queue->dispatch(c1);
- queue->dispatch(c1);
- queue->dispatch(c1);
-
- queue->deliver(msg6);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- intrusive_ptr<Message> received;
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQMultiQueue){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue1(new Queue("my-queue", true ));
- Queue::shared_ptr queue2(new Queue("my-queue", true ));
- intrusive_ptr<Message> received;
- queue1->configure(args);
- queue2->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "A");
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
-
- queue1->deliver(msg1);
- queue2->deliver(msg1);
- queue1->deliver(msg2);
-
- received = queue1->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
- received = queue2->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
-
-}
+ QueueSettings settings;
+ string key="key";
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
-QPID_AUTO_TEST_CASE(testLVQRecover){
-/* simulate this
- 1. start 2 nodes
- 2. create cluster durable lvq
- 3. send a transient message to the queue
- 4. kill one of the nodes (to trigger force persistent behaviour)...
- 5. then restart it (to turn off force persistent behaviour)
- 6. send another transient message with same lvq key as in 3
- 7. kill the second node again (retrigger force persistent)
- 8. stop and recover the first node
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->create(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "A");
- // 2
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
- // 3
- queue1->deliver(msg1);
- // 4
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- // 5
- queue1->clearLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- // 6
- queue1->deliver(msg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
- BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+ qpid::types::Variant::Map properties;
+ properties["key"] = "a";
+ q->deliver(MessageUtils::createMessage(properties, "one"));
+ properties.clear();
+ q->deliver(MessageUtils::createMessage(properties, "two"));
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
}
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->computeExpiration(new broker::ExpiryPolicy);
+ Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ m.computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
@@ -706,7 +203,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
QPID_AUTO_TEST_CASE(testQueueCleaner) {
Timer timer;
QueueRegistry queues;
- Queue::shared_ptr queue = queues.declare("my-queue").first;
+ Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
@@ -717,44 +214,57 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
-
-
namespace {
- // helper for group tests
- void verifyAcquire( Queue::shared_ptr queue,
- TestConsumer::shared_ptr c,
- std::deque<QueuedMessage>& results,
- const std::string& expectedGroup,
- const int expectedId )
- {
- BOOST_CHECK(queue->dispatch(c));
- results.push_back(c->last);
- std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+int getIntProperty(const Message& message, const std::string& key)
+{
+ qpid::types::Variant v = message.getProperty(key);
+ int i(0);
+ if (!v.isVoid()) i = v;
+ return i;
+}
+// helper for group tests
+void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueueCursor>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+{
+ bool success = queue->dispatch(c);
+ BOOST_CHECK(success);
+ if (success) {
+ results.push_back(c->lastCursor);
+ std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
+ int id = getIntProperty(c->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( group, expectedGroup );
BOOST_CHECK_EQUAL( id, expectedId );
}
}
+Message createGroupMessage(int id, const std::string& group)
+{
+ qpid::types::Variant::Map properties;
+ properties["GROUP-ID"] = group;
+ properties["MY-ID"] = id;
+ return MessageUtils::createMessage(properties);
+}
+}
+
QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
//
// Verify that consumers of grouped messages own the groups once a message is acquired,
// and release the groups once all acquired messages have been dequeued or requeued
//
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
+ QueueSettings settings;
+ settings.shareGroups = 1;
+ settings.groupKey = "GROUP-ID";
+ QueueFactory factory;
+ Queue::shared_ptr queue(factory.create("my_queue", settings));
std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", groups[i]);
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(i, groups[i]));
}
// Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
@@ -768,8 +278,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
queue->consume(c1);
queue->consume(c2);
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
+ std::deque<QueueCursor> dequeMeC1;
+ std::deque<QueueCursor> dequeMeC2;
verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
@@ -828,9 +338,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
// Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
// what happens if C-2 "requeues" a-1 and a-2?
- queue->requeue( dequeMeC2.front() );
+ queue->release( dequeMeC2.front() );
dequeMeC2.pop_front();
- queue->requeue( dequeMeC2.front() );
+ queue->release( dequeMeC2.front() );
dequeMeC2.pop_front(); // now just has c-7 acquired
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
@@ -855,9 +365,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- // requeue all of C1's acquired messages, then cancel C1
+ // release all of C1's acquired messages, then cancel C1
while (!dequeMeC1.empty()) {
- queue->requeue(dequeMeC1.front());
+ queue->release(dequeMeC1.front());
dequeMeC1.pop_front();
}
queue->cancel(c1);
@@ -877,7 +387,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
// Owners= ---, ---, ---
TestConsumer::shared_ptr c3(new TestConsumer("C3"));
- std::deque<QueuedMessage> dequeMeC3;
+ std::deque<QueueCursor> dequeMeC3;
verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
@@ -897,11 +407,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
// Queue = a-2,
// Owners= ^C3,
-
- intrusive_ptr<Message> msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", "a");
- msg->insertCustomProperty("MY-ID", 9);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(9, "a"));
// Queue = a-2, a-9
// Owners= ^C3, ^C3
@@ -909,10 +415,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", "b");
- msg->insertCustomProperty("MY-ID", 10);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(10, "b"));
// Queue = a-2, a-9, b-10
// Owners= ^C3, ^C3, ----
@@ -933,17 +436,17 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
// Verify that the same default group name is automatically applied to messages that
// do not specify a group name.
//
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
+ QueueSettings settings;
+ settings.shareGroups = 1;
+ settings.groupKey = "GROUP-ID";
+ QueueFactory factory;
+ Queue::shared_ptr queue(factory.create("my_queue", settings));
for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = createMessage("e", "A");
+ qpid::types::Variant::Map properties;
// no "GROUP-ID" header
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
+ properties["MY-ID"] = i;
+ queue->deliver(MessageUtils::createMessage(properties));
}
// Queue = 0, 1, 2
@@ -956,20 +459,20 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
queue->consume(c1);
queue->consume(c2);
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
+ std::deque<QueueCursor> dequeMeC1;
+ std::deque<QueueCursor> dequeMeC2;
queue->dispatch(c1); // c1 now owns default group (acquired 0)
- dequeMeC1.push_back(c1->last);
- int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ dequeMeC1.push_back(c1->lastCursor);
+ int id = getIntProperty(c1->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( id, 0 );
bool gotOne = queue->dispatch(c2); // c2 should get nothing
BOOST_CHECK( !gotOne );
queue->dispatch(c1); // c1 now acquires 1
- dequeMeC1.push_back(c1->last);
- id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ dequeMeC1.push_back(c1->lastCursor);
+ id = getIntProperty(c1->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( id, 1 );
gotOne = queue->dispatch(c2); // c2 should still get nothing
@@ -982,7 +485,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
// now default group should be available...
queue->dispatch(c2); // c2 now owns default group (acquired 2)
- id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ id = c2->lastMessage.getProperty("MY-ID");
BOOST_CHECK_EQUAL( id, 2 );
gotOne = queue->dispatch(c1); // c1 should get nothing
@@ -992,556 +495,128 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
queue->cancel(c2);
}
-QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
-
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
- queue1->create(args);
- Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
- queue2->create(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
-
- queue1->deliver(msg1);
- queue2->deliver(msg1);
-
- //change mode
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
- // check they don't get stored twice
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- queue1->deliver(msg2);
- queue2->deliver(msg2);
-
- queue1->clearLastNodeFailure();
- queue2->clearLastNodeFailure();
- // check only new messages get forced
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-
- // check no failure messages are stored
- queue1->clearLastNodeFailure();
- queue2->clearLastNodeFailure();
-
- intrusive_ptr<Message> msg3 = createMessage("e", "B");
- queue1->deliver(msg3);
- queue2->deliver(msg3);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
-
- /**
- * TODO: Fix or replace the following test which incorrectly requeues a
- * message that was never on the queue in the first place. This relied on
- * internal details not part of the queue abstraction.
-
- // check requeue 1
- intrusive_ptr<Message> msg4 = createMessage("e", "C");
- intrusive_ptr<Message> msg5 = createMessage("e", "D");
-
- framing::SequenceNumber sequence(1);
- QueuedMessage qmsg1(queue1.get(), msg4, sequence);
- QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
-
- queue1->requeue(qmsg1);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-
- // check requeue 2
- queue2->clearLastNodeFailure();
- queue2->requeue(qmsg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-
- queue2->clearLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
- */
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
-/*
-simulate this:
- 1. start two nodes
- 2. create cluster durable queue and add some messages
- 3. kill one node (trigger force-persistent behaviour)
- 4. stop and recover remaining node
- 5. add another node
- 6. kill that new node again
-make sure that an attempt to re-enqueue a message does not happen which will
-result in the last man standing exiting with an error.
-
-we need to make sure that recover is safe, i.e. messages are
-not requeued to the store.
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->create(args);
-
- // check requeue 1
- intrusive_ptr<Message> msg1 = createMessage("e", "C");
- intrusive_ptr<Message> msg2 = createMessage("e", "D");
-
- queue1->recover(msg1);
-
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
- queue1->clearLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
- queue1->deliver(msg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeJournalError){
-/*
-simulate store exception going into last node standing
-
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->configure(args);
-
- // check requeue 1
- intrusive_ptr<Message> msg1 = createMessage("e", "C");
-
- queue1->deliver(msg1);
- testStore.createError();
-
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-}
-
-intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
-{
- intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
- if (content.size()) MessageUtils::addContent(msg, content);
- msg->setStore(&store);
- return msg;
-}
-
-QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
-
- TestMessageStoreOC testStore;
- client::QueueOptions args0; // No size policy
- client::QueueOptions args1;
- args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
- client::QueueOptions args2;
- args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
-
- // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
-
- FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
- Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
- tq1->configure(args1);
- sbtFanout1.bind(tq1, "", 0);
-
- intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg01(msg01);
- sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
- msg01->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg02(msg02);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
- }
- msg02->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg03(msg03);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
- }
- msg03->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg04(msg04);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
- }
- msg04->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg05(msg05);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
- }
- msg05->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
-
- FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
- Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
- dq2->configure(args1);
- sbdFanout2.bind(dq2, "", 0);
-
- intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg06(msg06);
- sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
- msg06->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg07(msg07);
- sbdFanout2.route(dmsg07);
- msg07->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
- BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg08(msg08);
- sbdFanout2.route(dmsg08);
- msg08->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
- BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg09(msg09);
- sbdFanout2.route(dmsg09);
- msg09->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
- BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg10(msg10);
- sbdFanout2.route(dmsg10);
- msg10->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
- BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
-
- // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
-
- FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
- Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
- dq3->configure(args2);
- mbdFanout3.bind(dq3, "", 0);
- Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
- dq4->configure(args1);
- mbdFanout3.bind(dq4, "", 0);
- Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
- dq5->configure(args0);
- mbdFanout3.bind(dq5, "", 0);
-
- intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg11(msg11);
- mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
- msg11->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg12(msg12);
- mbdFanout3.route(dmsg12);
- msg12->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
- BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg13(msg13);
- mbdFanout3.route(dmsg13);
- msg13->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
- BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg14(msg14);
- mbdFanout3.route(dmsg14);
- msg14->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
- BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg15(msg15);
- mbdFanout3.route(dmsg15);
- msg15->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
- BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
-
- // Bind a transient queue, this should block the release of any further messages.
- // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
- // is expected until a better overall multi-queue design is implemented. Similarly
- // for the other tests in this section.
-
- Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
- tq6->configure(args0);
- mbdFanout3.bind(tq6, "", 0);
-
- intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg16(msg16);
- mbdFanout3.route(dmsg16);
- msg16->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
- BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg17(msg17);
- mbdFanout3.route(dmsg17);
- msg17->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
- BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg18(msg18);
- mbdFanout3.route(dmsg18);
- msg18->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
- BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg19(msg19);
- mbdFanout3.route(dmsg19);
- msg19->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
- BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
-
-
- // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
-
- FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
- Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
- dq7->configure(args0);
- mbmFanout4.bind(dq7, "", 0);
- Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
- dq8->configure(args1);
- mbmFanout4.bind(dq8, "", 0);
- Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
- tq9->configure(args0);
- mbmFanout4.bind(tq9, "", 0);
-
- intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg20(msg20);
- mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
- msg20->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
- BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg21(msg21);
- mbmFanout4.route(dmsg21);
- msg21->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
- BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
- BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg22(msg22);
- mbmFanout4.route(dmsg22);
- msg22->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
- BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg23(msg23);
- mbmFanout4.route(dmsg23);
- msg23->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
- BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg24(msg24);
- mbmFanout4.route(dmsg24);
- msg24->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
- BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
-}
-
QPID_AUTO_TEST_CASE(testSetPositionFifo) {
Queue::shared_ptr q(new Queue("my-queue", true));
BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
for (int i = 0; i < 10; ++i)
- q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1)));
// Verify the front of the queue
TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
- BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
+ BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
+
// Verify the back of the queue
- QueuedMessage qm;
BOOST_CHECK_EQUAL(10u, q->getPosition());
- BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
- BOOST_CHECK_EQUAL("10", getContent(qm.payload));
BOOST_CHECK_EQUAL(10u, q->getMessageCount());
// Using setPosition to introduce a gap in sequence numbers.
q->setPosition(15);
BOOST_CHECK_EQUAL(10u, q->getMessageCount());
BOOST_CHECK_EQUAL(15u, q->getPosition());
- BOOST_CHECK(q->find(10, qm)); // Back of the queue
- BOOST_CHECK_EQUAL("10", getContent(qm.payload));
- q->deliver(contentMessage("16"));
- c->setPosition(9);
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));
+
+ q->seek(*c, Queue::MessagePredicate(), 9);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(10u, c->last.position);
- BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(16u, c->last.position);
- BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());
// Using setPosition to trunkcate the queue
q->setPosition(5);
BOOST_CHECK_EQUAL(5u, q->getMessageCount());
- q->deliver(contentMessage("6a"));
- c->setPosition(4);
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
+ c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+ q->seek(*c, Queue::MessagePredicate(), 4);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->last.position);
- BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(6u, c->last.position);
- BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
BOOST_CHECK(!q->dispatch(c)); // No more messages.
}
QPID_AUTO_TEST_CASE(testSetPositionLvq) {
- Queue::shared_ptr q(new Queue("my-queue", true));
+ QueueSettings settings;
string key="key";
- framing::FieldTable args;
- args.setString("qpid.last_value_queue_key", "key");
- q->configure(args);
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
const char* values[] = { "a", "b", "c", "a", "b", "c" };
for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
- intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
- m->insertCustomProperty(key, values[i]);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
BOOST_CHECK_EQUAL(3u, q->getMessageCount());
// Verify the front of the queue
TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
- BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
+ BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
// Verify the back of the queue
- QueuedMessage qm;
BOOST_CHECK_EQUAL(6u, q->getPosition());
- BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
- BOOST_CHECK_EQUAL("6", getContent(qm.payload));
q->setPosition(5);
- c->setPosition(4);
+
+ c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+ q->seek(*c, Queue::MessagePredicate(), 4);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
BOOST_CHECK(!q->dispatch(c));
}
QPID_AUTO_TEST_CASE(testSetPositionPriority) {
- Queue::shared_ptr q(new Queue("my-queue", true));
- framing::FieldTable args;
- args.setInt("qpid.priorities", 10);
- q->configure(args);
+ QueueSettings settings;
+ settings.priorities = 10;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
const int priorities[] = { 1, 2, 3, 2, 1, 3 };
for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
- intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
- m->getFrames().getHeaders()->get<DeliveryProperties>(true)
- ->setPriority(priorities[i]);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties["priority"] = priorities[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
// Truncation removes messages in fifo order, not priority order.
q->setPosition(3);
- TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
BOOST_CHECK(!q->dispatch(c));
- intrusive_ptr<Message> m = contentMessage("4a");
- m->getFrames().getHeaders()->get<DeliveryProperties>(true)
- ->setPriority(4);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties["priority"] = 4;
+ q->deliver(MessageUtils::createMessage(properties, "4a"));
+
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position);
- BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
// But consumers see priority order
c.reset(new TestConsumer("test", true));
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position);
- BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->last.position);
- BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->last.position);
- BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position);
- BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
}
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxMocks.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxMocks.h Fri Aug 10 12:04:27 2012
@@ -119,8 +119,6 @@ public:
assertEqualVector(expected, actual);
}
- void accept(TxOpConstVisitor&) const {}
-
~MockTxOp(){}
};
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug 10 12:04:27 2012
@@ -583,15 +583,7 @@ class ReplicationTests(BrokerTest):
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
-
- # FIXME aconway 2012-02-22: there is a bug in priority ring
- # queues that allows a low priority message to displace a high
- # one. The following commented-out assert_browse is for the
- # correct result, the uncommented one is for the actualy buggy
- # result. See https://issues.apache.org/jira/browse/QPID-3866
- #
- # expect = sorted(priorities,reverse=True)[0:5]
- expect = [9,9,9,9,2]
+ expect = sorted(priorities,reverse=True)[0:5]
primary.assert_browse("q", expect, transform=lambda m: m.priority)
backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Aug 10 12:04:27 2012
@@ -34,6 +34,7 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
#include "qpid/Plugin.h"
@@ -95,7 +96,7 @@ class TestStore : public NullMessageStor
const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& )
{
- Message* msg = dynamic_cast<Message*>(pmsg.get());
+ qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
// Dump the message if there is a dump file.
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Fri Aug 10 12:04:27 2012
@@ -308,8 +308,6 @@ class AlternateExchangeTests(TestBase010
#create a queue using the intermediary as its alternate exchange:
session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
- #bind that queue to the dlq as well:
- session.exchange_bind(exchange="dlq", queue="delivery-queue")
#send it some messages:
dp=self.session.delivery_properties(routing_key="delivery-queue")
for m in ["One", "Two", "Three"]:
@@ -349,5 +347,5 @@ class AlternateExchangeTests(TestBase010
def assertEmpty(self, queue):
try:
msg = queue.get(timeout=1)
- self.fail("Queue not empty: " + msg)
+ self.fail("Queue not empty: " + str(msg))
except Empty: None
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Aug 10 12:04:27 2012
@@ -498,7 +498,7 @@ class ManagementTest (TestBase010):
session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
def test_immediate_method(self):
- url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+ url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
@@ -659,7 +659,7 @@ class ManagementTest (TestBase010):
self.assertEqual(rc.receive, True)
# setup a connection & session to the broker
- url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+ url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Fri Aug 10 12:04:27 2012
@@ -787,7 +787,7 @@ class MultiConsumerMsgGroupTests(Base):
except Empty:
pass
assert count == 3 # non-A's
- assert a_count == 2 # pending acquired message included in browse results
+ assert a_count == 1 # assumes the acquired message was not the one purged and regular browsers don't get acquired messages
s1.acknowledge() # ack the consumed A-0
self.qmf_session.delBroker(self.qmf_broker)
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Fri Aug 10 12:04:27 2012
@@ -106,10 +106,10 @@ class GeneralTests(Base):
self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange")
- # Close sess1; This will cause the queue to be deleted
+ # Close sess1; This will cause the queue to be deleted and all its messages (including those acquired) to be reouted to the alternate exchange
sess1.close()
sleep(1)
- self.assertEqual(rx_alt.available(), 2, "2 of the messages should have been routed to the alt_exchange")
+ self.assertEqual(rx_alt.available(), 5, "All the messages should have been routed to the alt_exchange")
# Close sess2; This will cause the acquired messages to be requeued and routed to the alternate
sess2.close()
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py Fri Aug 10 12:04:27 2012
@@ -112,11 +112,11 @@ class PriorityTests (Base):
#check all messages on the queue were received by the browser; don't relay on any specific ordering at present
assert set([m.content for m in msgs]) == set([m.content for m in received])
- def ring_queue_check(self, msgs):
+ def ring_queue_check(self, msgs, count=10):
"""
Ensure that a ring queue removes lowest priority messages first.
"""
- snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"),
+ snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':%s" % count),
durable=self.durable())
for m in msgs: snd.send(m)
@@ -126,23 +126,31 @@ class PriorityTests (Base):
while True: received.append(rcv.fetch(0))
except Empty: None
- expected = []
- for m in msgs:
- while len(expected) > 9:
- expected=sorted_(expected, key=lambda x: priority_level(x.priority,10))
- expected.pop(0)
- expected.append(m)
- #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])
+ expected = sorted_(msgs, key=lambda x: priority_level(x.priority,10))[len(msgs)-count:]
+ expected = sorted_(expected, key=lambda x: priority_level(x.priority,10), reverse=True)
+ #print "sent %s; expected %s; got %s" % ([m.priority for m in msgs], [m.priority for m in expected], [m.priority for m in received])
+ #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])
assert [m.content for m in expected] == [m.content for m in received]
def test_ring_queue_1(self):
priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3]
- seq = content("msg")
+ seq = content("msg")
self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
def test_ring_queue_2(self):
- priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9]
- seq = content("msg")
+ priorities = [9,0,2,3,6,3,4,2,9,2,9,9,1,9,4,7,1,1,3,9,7,3,9,3,9,1,5,1,9,7,2,3,0,9]
+ seq = content("msg")
+ self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+ def test_ring_queue_3(self):
+ #test case given for QPID-3866
+ priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+ seq = content("msg")
+ self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities], 5)
+
+ def test_ring_queue_4(self):
+ priorities = [9,0,2,3,6,3,4,2,9,2,9,3,1,9,4,7,1,1,3,2,7,3,9,3,6,1,5,1,9,7,2,3,0,2]
+ seq = content("msg")
self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
def test_requeue(self):
@@ -169,6 +177,7 @@ class PriorityTests (Base):
for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True):
msg = rcv.fetch(0)
#print "expected priority %s got %s" % (expected.priority, msg.priority)
+ #print "expected content %s got %s" % (expected.content, msg.content)
assert msg.content == expected.content
self.ssn.acknowledge(msg)
@@ -231,7 +240,7 @@ def sorted_(msgs, key=None, reverse=Fals
Workaround lack of sorted builtin function in python 2.3 and lack
of keyword arguments to list.sort()
"""
- temp = msgs
+ temp = [m for m in msgs]
temp.sort(key_to_cmp(key, reverse=reverse))
return temp
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py Fri Aug 10 12:04:27 2012
@@ -41,7 +41,7 @@ class ThresholdTests (Base):
snd.send(m)
count = count + 1
size = size + len(m.content)
- event = rcv.fetch()
+ event = rcv.fetch(timeout=1)
schema = event.content[0]["_schema_id"]
assert schema["_class_name"] == "queueThresholdExceeded"
values = event.content[0]["_values"]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org