You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/07 14:20:03 UTC
svn commit: r634661 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
python/ python/tests_0-10/
Author: gsim
Date: Fri Mar 7 05:20:02 2008
New Revision: 634661
URL: http://svn.apache.org/viewvc?rev=634661&view=rev
Log:
Altered management of delivery records to support separation of completion (which drives flow control) and acceptance.
Converted flow control python tests.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Fri Mar 7 05:20:02 2008
@@ -35,7 +35,7 @@
{
intrusive_ptr<Message> payload;
framing::SequenceNumber position;
- Queue* queue;
+ Queue* queue;
QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Mar 7 05:20:02 2008
@@ -32,16 +32,20 @@
const std::string _tag,
DeliveryToken::shared_ptr _token,
const DeliveryId _id,
- bool _acquired, bool _confirmed) : msg(_msg),
- queue(_queue),
- tag(_tag),
- token(_token),
- id(_id),
- acquired(_acquired),
- confirmed(_confirmed),
- pull(false),
- cancelled(false)
+ bool _acquired, bool accepted) : msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ token(_token),
+ id(_id),
+ acquired(_acquired),
+ pull(false),
+ cancelled(false),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+ size(msg.payload ? msg.payload->contentSize() : 0),
+ completed(false),
+ ended(accepted)
{
+ if (accepted) setEnded();
}
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
@@ -50,14 +54,23 @@
queue(_queue),
id(_id),
acquired(true),
- confirmed(false),
pull(true),
- cancelled(false)
+ cancelled(false),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+ size(msg.payload ? msg.payload->contentSize() : 0),
+ completed(false),
+ ended(false)
{}
+void DeliveryRecord::setEnded()
+{
+ ended = true;
+ //reset msg pointer, don't need to hold on to it anymore
+ msg.payload = boost::intrusive_ptr<Message>();
+}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
queue->dequeue(ctxt, msg.payload);
}
}
@@ -79,7 +92,7 @@
}
void DeliveryRecord::redeliver(SemanticState* const session) {
- if (!confirmed) {
+ if (!ended) {
if(pull || cancelled){
//if message was originally sent as response to get, we must requeue it
@@ -96,7 +109,7 @@
void DeliveryRecord::requeue() const
{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
msg.payload->redeliver();
queue->requeue(msg);
}
@@ -104,9 +117,22 @@
void DeliveryRecord::release()
{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
queue->requeue(msg);
acquired = false;
+ setEnded();
+ }
+}
+
+void DeliveryRecord::complete()
+{
+ completed = true;
+}
+
+void DeliveryRecord::accept(TransactionContext* ctxt) {
+ if (acquired && !ended) {
+ queue->dequeue(ctxt, msg.payload);
+ setEnded();
}
}
@@ -124,9 +150,9 @@
}
}
-void DeliveryRecord::updateByteCredit(uint32_t& credit) const
+uint32_t DeliveryRecord::getCredit() const
{
- credit += msg.payload->getRequiredCredit();
+ return credit;
}
@@ -134,7 +160,7 @@
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size += msg.payload->contentSize();
+ prefetch.size += size;
prefetch.count++;
}
}
@@ -143,7 +169,7 @@
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size -= msg.payload->contentSize();
+ prefetch.size -= size;
prefetch.count--;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Mar 7 05:20:02 2008
@@ -47,32 +47,45 @@
DeliveryToken::shared_ptr token;
DeliveryId id;
bool acquired;
- const bool confirmed;
const bool pull;
bool cancelled;
+ const uint32_t credit;
+ const uint64_t size;
+
+ bool completed;
+ bool ended;
+
+ void setEnded();
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
- void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
bool coveredBy(const framing::AccumulatedAck* const range) const;
+
+ void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
void release();
void reject();
void cancel(const std::string& tag);
void redeliver(SemanticState* const);
- void updateByteCredit(uint32_t& credit) const;
+ void acquire(DeliveryIds& results);
+ void complete();
+ void accept(TransactionContext* ctxt);
+
+ bool isAcquired() const { return acquired; }
+ bool isComplete() const { return completed; }
+ bool isRedundant() const { return ended && completed; }
+
+ uint32_t getCredit() const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
const std::string& getTag() const { return tag; }
bool isPull() const { return pull; }
- bool isAcquired() const { return acquired; }
- void acquire(DeliveryIds& results);
friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp Fri Mar 7 05:20:02 2008
@@ -64,7 +64,7 @@
bool MultiVersionConnectionInputHandler::doOutput()
{
- return check(false) && handler->doOutput();
+ return handler.get() && handler->doOutput();
}
qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
@@ -74,17 +74,14 @@
void MultiVersionConnectionInputHandler::closed()
{
- check();
- handler->closed();
+ if (handler.get()) handler->closed();
+ //else closed before initiated, nothing to do
}
-bool MultiVersionConnectionInputHandler::check(bool fail)
+void MultiVersionConnectionInputHandler::check()
{
if (!handler.get()) {
- if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
- else return false;
- } else {
- return true;
+ throw qpid::framing::InternalErrorException("Handler not initialised!");
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h Fri Mar 7 05:20:02 2008
@@ -38,7 +38,7 @@
Broker& broker;
const std::string id;
- bool check(bool fail = true);
+ void check();
public:
MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Mar 7 05:20:02 2008
@@ -393,7 +393,7 @@
++end;
}
- for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -433,20 +433,23 @@
}
}
-void SemanticState::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::complete(DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->adjustFlow(delivery);
+ get_pointer(i)->complete(delivery);
}
}
-void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ if (!delivery.isComplete()) {
+ delivery.complete();
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ }
}
}
@@ -662,15 +665,16 @@
dtxBuffer->enlist(txAck);
}
} else {
- for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(range.start, range.end);
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
requestDispatch();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Mar 7 05:20:02 2008
@@ -88,7 +88,7 @@
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void adjustFlow(const DeliveryRecord&);
+ void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
@@ -122,7 +122,7 @@
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void adjustFlow(const DeliveryRecord&);
+ void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Fri Mar 7 05:20:02 2008
@@ -52,8 +52,6 @@
tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_consume_queue_errors
tests_0-10.message.MessageTests.test_consume_unique_consumers
-tests_0-10.message.MessageTests.test_credit_flow_bytes
-tests_0-10.message.MessageTests.test_credit_flow_messages
tests_0-10.message.MessageTests.test_no_size
tests_0-10.message.MessageTests.test_qos_prefetch_count
tests_0-10.message.MessageTests.test_qos_prefetch_size
@@ -63,8 +61,6 @@
tests_0-10.message.MessageTests.test_subscribe_not_acquired
tests_0-10.message.MessageTests.test_subscribe_not_acquired_2
tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
-tests_0-10.message.MessageTests.test_window_flow_bytes
-tests_0-10.message.MessageTests.test_window_flow_messages
tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
tests_0-10.testlib.TestBaseTest.testMessageProperties
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Mar 7 05:20:02 2008
@@ -464,10 +464,10 @@
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -494,13 +494,13 @@
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow_mode(mode = 0, destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 35
+ msg_size = 21
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -527,11 +527,11 @@
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = 0, value = 5, destination = "c")
@@ -539,13 +539,16 @@
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
- for i in range(1, 6):
+ for i in range(1, 6):
msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
self.assertDataEquals(session, msg, "Message %d" % i)
self.assertEmpty(q)
#acknowledge messages and check more are received
- msg.complete(cumulative=True)
+ #TODO: there may be a nicer way of doing this
+ session.channel.session_completed(session.receiver._completed)
+
for i in range(6, 11):
self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
@@ -559,14 +562,14 @@
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- session.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
#send batch of messages to queue
for i in range(10):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
- msg_size = 40
+ msg_size = 19
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -584,7 +587,9 @@
#ack each message individually and check more are received
for i in range(5):
msg = msgs.pop()
- msg.complete(cumulative=False)
+ #TODO: there may be a nicer way of doing this
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -595,13 +600,17 @@
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 6):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
- self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
for i in range(6, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
#both subscribers should see all messages
qA = session.incoming("a")
@@ -610,8 +619,9 @@
for q in [qA, qB]:
msg = q.get(timeout = 1)
self.assertEquals("Message %s" % i, msg.body)
- msg.complete()
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
@@ -625,7 +635,7 @@
#use fanout for now:
session.exchange_bind(exchange="amq.fanout", queue="q")
session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
- #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+ #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -724,11 +734,11 @@
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#consume some of them
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- session.message_flow_mode(mode = 0, destination = "a")
+ session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = 0, value = 5, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
@@ -762,7 +772,7 @@
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
@@ -861,7 +871,7 @@
def assertEmpty(self, queue):
try:
extra = queue.get(timeout=1)
- self.fail("Queue not empty, contains: " + extra.content.body)
+ self.fail("Queue not empty, contains: " + extra.body)
except Empty: None
class SizelessContent(Content):
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Fri Mar 7 05:20:02 2008
@@ -18,10 +18,10 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
-class QueueTests(TestBase):
+class QueueTests(TestBase010):
"""Tests for 'methods' on the amqp queue 'class'"""
def test_purge(self):
@@ -31,9 +31,9 @@
session = self.session
#setup, declare a queue and add some messages to it:
session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"}))
- session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"}))
- session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"}))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
#check that the queue now reports 3 messages:
session.queue_declare(queue="test-queue")
@@ -46,15 +46,16 @@
self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
- session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"}))
- self.subscribe(queue="test-queue", destination="tag")
- queue = self.client.queue("tag")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+ session.message_subscribe(queue="test-queue", destination="tag")
+ session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("tag")
msg = queue.get(timeout=1)
- self.assertEqual("four", msg.content.body)
+ self.assertEqual("four", msg.body)
#check error conditions (use new sessions):
- session = self.client.session(2)
- session.session_open()
+ session = self.conn.session("error-checker")
try:
#queue specified but doesn't exist:
session.queue_purge(queue="invalid-queue")
@@ -62,8 +63,7 @@
except Closed, e:
self.assertChannelException(404, e.args[0])
- session = self.client.session(3)
- session.session_open()
+ session = self.conn.session("error-checker")
try:
#queue not specified and none previously declared for channel:
session.queue_purge()
@@ -71,12 +71,6 @@
except Closed, e:
self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- session = other.session(1)
- session.session_open()
- session.exchange_delete(exchange="test-exchange")
-
def test_declare_exclusive(self):
"""
Test that the exclusive field is honoured in queue.declare
@@ -167,32 +161,32 @@
self.subscribe(queue="queue-1", destination="queue-1")
self.subscribe(queue="queue-2", destination="queue-2")
- queue1 = self.client.queue("queue-1")
- queue2 = self.client.queue("queue-2")
+ queue1 = session.incoming("queue-1")
+ queue2 = session.incoming("queue-2")
session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
session.message_transfer(destination=exchange,
- content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
+ message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one"))
#unbind first queue
session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
#send another message
session.message_transfer(destination=exchange,
- content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
+ message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", ))
#check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).content.body)
+ self.assertEquals("one", queue1.get(timeout=1).body)
try:
msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.content.body)
+ self.fail("Got extra message: %s" % msg.body)
except Empty: pass
- self.assertEquals("one", queue2.get(timeout=1).content.body)
- self.assertEquals("two", queue2.get(timeout=1).content.body)
+ self.assertEquals("one", queue2.get(timeout=1).body)
+ self.assertEquals("two", queue2.get(timeout=1).body)
try:
msg = queue2.get(timeout=1)
self.fail("Got extra message: " + msg)
@@ -207,9 +201,9 @@
#straight-forward case:
session.queue_declare(queue="delete-me")
- session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
- session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
- session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+ session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me")))
+ session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me")))
+ session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me")))
session.queue_delete(queue="delete-me")
#check that it has gone by declaring passively
try:
@@ -238,7 +232,7 @@
#create a queue and add a message to it (use default binding):
session.queue_declare(queue="delete-me-2")
session.queue_declare(queue="delete-me-2", passive=True)
- session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+ session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2")))
#try to delete, but only if empty:
try:
@@ -253,9 +247,9 @@
#empty queue:
self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
- queue = self.client.queue("consumer_tag")
+ queue = session.incoming("consumer_tag")
msg = queue.get(timeout=1)
- self.assertEqual("message", msg.content.body)
+ self.assertEqual("message", msg.body)
session.message_cancel(destination="consumer_tag")
#retry deletion on empty queue: