You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/07 14:20:03 UTC

svn commit: r634661 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ python/ python/tests_0-10/

Author: gsim
Date: Fri Mar  7 05:20:02 2008
New Revision: 634661

URL: http://svn.apache.org/viewvc?rev=634661&view=rev
Log:
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Fri Mar  7 05:20:02 2008
@@ -35,7 +35,7 @@
         {
             intrusive_ptr<Message> payload;
             framing::SequenceNumber position;
-			Queue* queue;
+            Queue* queue;
 			
             QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
 			               payload(msg), position(sn), queue(q) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Mar  7 05:20:02 2008
@@ -32,16 +32,20 @@
                                const std::string _tag,
                                DeliveryToken::shared_ptr _token, 
                                const DeliveryId _id,
-                               bool _acquired, bool _confirmed) : msg(_msg), 
-                                                                  queue(_queue), 
-                                                                  tag(_tag),
-                                                                  token(_token),
-                                                                  id(_id),
-                                                                  acquired(_acquired),
-                                                                  confirmed(_confirmed),
-                                                                  pull(false), 
-                                                                  cancelled(false)
+                               bool _acquired, bool accepted) : msg(_msg), 
+                                                                queue(_queue), 
+                                                                tag(_tag),
+                                                                token(_token),
+                                                                id(_id),
+                                                                acquired(_acquired),
+                                                                pull(false), 
+                                                                cancelled(false),
+                                                                credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+                                                                size(msg.payload ? msg.payload->contentSize() : 0),
+                                                                completed(false),
+                                                                ended(accepted)
 {
+    if (accepted) setEnded();
 }
 
 DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
@@ -50,14 +54,23 @@
                                                        queue(_queue), 
                                                        id(_id),
                                                        acquired(true),
-                                                       confirmed(false),
                                                        pull(true),
-                                                       cancelled(false)
+                                                       cancelled(false),
+                                                       credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+                                                       size(msg.payload ? msg.payload->contentSize() : 0),
+                                                       completed(false),
+                                                       ended(false)
 {}
 
+void DeliveryRecord::setEnded()
+{
+    ended = true;
+    //reset msg pointer, don't need to hold on to it anymore
+    msg.payload = boost::intrusive_ptr<Message>();
+}
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
-    if (acquired && !confirmed) {
+    if (acquired && !ended) {
         queue->dequeue(ctxt, msg.payload);
     }
 }
@@ -79,7 +92,7 @@
 }
 
 void DeliveryRecord::redeliver(SemanticState* const session) {
-    if (!confirmed) {
+    if (!ended) {
         if(pull || cancelled){
             //if message was originally sent as response to get, we must requeue it
 
@@ -96,7 +109,7 @@
 
 void DeliveryRecord::requeue() const
 {
-    if (acquired && !confirmed) {
+    if (acquired && !ended) {
         msg.payload->redeliver();
         queue->requeue(msg);
     }
@@ -104,9 +117,22 @@
 
 void DeliveryRecord::release() 
 {
-    if (acquired && !confirmed) {
+    if (acquired && !ended) {
         queue->requeue(msg);
         acquired = false;
+        setEnded();
+    }
+}
+
+void DeliveryRecord::complete() 
+{
+    completed = true; 
+}
+
+void DeliveryRecord::accept(TransactionContext* ctxt) {
+    if (acquired && !ended) {
+        queue->dequeue(ctxt, msg.payload);
+        setEnded();
     }
 }
 
@@ -124,9 +150,9 @@
     }
 }
 
-void DeliveryRecord::updateByteCredit(uint32_t& credit) const
+uint32_t DeliveryRecord::getCredit() const
 {
-    credit += msg.payload->getRequiredCredit();
+    return credit;
 }
 
 
@@ -134,7 +160,7 @@
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch.size += msg.payload->contentSize();
+        prefetch.size += size;
         prefetch.count++;
     }    
 }
@@ -143,7 +169,7 @@
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch.size -= msg.payload->contentSize();
+        prefetch.size -= size;
         prefetch.count--;
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Mar  7 05:20:02 2008
@@ -47,32 +47,45 @@
     DeliveryToken::shared_ptr token;
     DeliveryId id;
     bool acquired;
-    const bool confirmed;
     const bool pull;
     bool cancelled;
+    const uint32_t credit;
+    const uint64_t size;
+
+    bool completed;
+    bool ended;
+
+    void setEnded();
 
   public:
     DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, 
                    const DeliveryId id, bool acquired, bool confirmed = false);
     DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
             
-    void dequeue(TransactionContext* ctxt = 0) const;
     bool matches(DeliveryId tag) const;
     bool matchOrAfter(DeliveryId tag) const;
     bool after(DeliveryId tag) const;
     bool coveredBy(const framing::AccumulatedAck* const range) const;
+
+    void dequeue(TransactionContext* ctxt = 0) const;
     void requeue() const;
     void release();
     void reject();
     void cancel(const std::string& tag);
     void redeliver(SemanticState* const);
-    void updateByteCredit(uint32_t& credit) const;
+    void acquire(DeliveryIds& results);
+    void complete();
+    void accept(TransactionContext* ctxt);
+
+    bool isAcquired() const { return acquired; }
+    bool isComplete() const { return completed; }
+    bool isRedundant() const { return ended && completed; }
+
+    uint32_t getCredit() const;
     void addTo(Prefetch&) const;
     void subtractFrom(Prefetch&) const;
     const std::string& getTag() const { return tag; } 
     bool isPull() const { return pull; }
-    bool isAcquired() const { return acquired; }
-    void acquire(DeliveryIds& results);
     friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);         
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp Fri Mar  7 05:20:02 2008
@@ -64,7 +64,7 @@
 
 bool MultiVersionConnectionInputHandler::doOutput()
 {
-    return check(false) &&  handler->doOutput();
+    return handler.get() &&  handler->doOutput();
 }
     
 qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
@@ -74,17 +74,14 @@
 
 void MultiVersionConnectionInputHandler::closed()
 {
-    check();
-    handler->closed();
+    if (handler.get()) handler->closed();
+    //else closed before initiated, nothing to do
 }
 
-bool MultiVersionConnectionInputHandler::check(bool fail)
+void MultiVersionConnectionInputHandler::check()
 {
     if (!handler.get()) { 
-        if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
-        else return false;
-    } else {
-        return true;
+        throw qpid::framing::InternalErrorException("Handler not initialised!");
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h Fri Mar  7 05:20:02 2008
@@ -38,7 +38,7 @@
     Broker& broker; 
     const std::string id;
 
-    bool check(bool fail = true);
+    void check();
 
 public:
     MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Mar  7 05:20:02 2008
@@ -393,7 +393,7 @@
             ++end;
         }
         
-        for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
+        for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
         
         if (txBuffer.get()) {
             //in transactional mode, don't dequeue or remove, just
@@ -433,20 +433,23 @@
     }
 }
 
-void SemanticState::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::complete(DeliveryRecord& delivery)
 {    
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
-        get_pointer(i)->adjustFlow(delivery);
+        get_pointer(i)->complete(delivery);
     }
 }
 
-void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
 {
-    if (windowing) {
-        if (msgCredit != 0xFFFFFFFF) msgCredit++;
-        if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+    if (!delivery.isComplete()) {
+        delivery.complete();
+        if (windowing) {
+            if (msgCredit != 0xFFFFFFFF) msgCredit++;
+            if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+        }
     }
 }
 
@@ -662,15 +665,16 @@
             dtxBuffer->enlist(txAck);    
         }
     } else {
-        for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
-        unacked.erase(range.start, range.end);
+        for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
+        unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
     }
 }
 
 void SemanticState::completed(DeliveryId first, DeliveryId last)
 {
     AckRange range = findRange(first, last);
-    for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+    for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+    unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
     requestDispatch();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Mar  7 05:20:02 2008
@@ -88,7 +88,7 @@
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        void adjustFlow(const DeliveryRecord&);    
+        void complete(DeliveryRecord&);    
         Queue::shared_ptr getQueue() { return queue; }
         bool isBlocked() const { return blocked; }
 
@@ -122,7 +122,7 @@
     void checkDtxTimeout();
     ConsumerImpl& find(const std::string& destination);
     void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
-    void adjustFlow(const DeliveryRecord&);
+    void complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
     void requestDispatch(ConsumerImpl&);

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Fri Mar  7 05:20:02 2008
@@ -52,8 +52,6 @@
 tests_0-10.message.MessageTests.test_consume_no_local_awkward
 tests_0-10.message.MessageTests.test_consume_queue_errors
 tests_0-10.message.MessageTests.test_consume_unique_consumers
-tests_0-10.message.MessageTests.test_credit_flow_bytes
-tests_0-10.message.MessageTests.test_credit_flow_messages
 tests_0-10.message.MessageTests.test_no_size
 tests_0-10.message.MessageTests.test_qos_prefetch_count
 tests_0-10.message.MessageTests.test_qos_prefetch_size
@@ -63,8 +61,6 @@
 tests_0-10.message.MessageTests.test_subscribe_not_acquired
 tests_0-10.message.MessageTests.test_subscribe_not_acquired_2
 tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
-tests_0-10.message.MessageTests.test_window_flow_bytes
-tests_0-10.message.MessageTests.test_window_flow_messages
 tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
 tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
 tests_0-10.testlib.TestBaseTest.testMessageProperties

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Mar  7 05:20:02 2008
@@ -464,10 +464,10 @@
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         #create consumer (for now that defaults to infinite credit)
         session.message_subscribe(queue = "q", destination = "c")
-        session.message_flow_mode(mode = 0, destination = "c")
+        session.message_set_flow_mode(flow_mode = 0, destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
 
         #set message credit to finite amount (less than enough for all messages)
         session.message_flow(unit = 0, value = 5, destination = "c")
@@ -494,13 +494,13 @@
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         #create consumer (for now that defaults to infinite credit)
         session.message_subscribe(queue = "q", destination = "c")
-        session.message_flow_mode(mode = 0, destination = "c")
+        session.message_set_flow_mode(flow_mode = 0, destination = "c")
         #send batch of messages to queue
         for i in range(10):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
 
         #each message is currently interpreted as requiring msg_size bytes of credit
-        msg_size = 35
+        msg_size = 21
 
         #set byte credit to finite amount (less than enough for all messages)
         session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -527,11 +527,11 @@
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         #create consumer (for now that defaults to infinite credit)
-        session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
-        session.message_flow_mode(mode = 1, destination = "c")
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
 
         #set message credit to finite amount (less than enough for all messages)
         session.message_flow(unit = 0, value = 5, destination = "c")
@@ -539,13 +539,16 @@
         session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
         #check that expected number were received
         q = session.incoming("c")
-        for i in range(1, 6):
+        for i in range(1, 6):            
             msg = q.get(timeout = 1)
+            session.receiver._completed.add(msg.id)#TODO: this may be done automatically
             self.assertDataEquals(session, msg, "Message %d" % i)
         self.assertEmpty(q)
 
         #acknowledge messages and check more are received
-        msg.complete(cumulative=True)
+        #TODO: there may be a nicer way of doing this
+        session.channel.session_completed(session.receiver._completed)
+
         for i in range(6, 11):
             self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
         self.assertEmpty(q)
@@ -559,14 +562,14 @@
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         #create consumer (for now that defaults to infinite credit)
-        session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
-        session.message_flow_mode(mode = 1, destination = "c")
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
         #send batch of messages to queue
         for i in range(10):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
 
         #each message is currently interpreted as requiring msg_size bytes of credit
-        msg_size = 40
+        msg_size = 19
 
         #set byte credit to finite amount (less than enough for all messages)
         session.message_flow(unit = 1, value = msg_size*5, destination = "c")
@@ -584,7 +587,9 @@
         #ack each message individually and check more are received
         for i in range(5):
             msg = msgs.pop()
-            msg.complete(cumulative=False)
+            #TODO: there may be a nicer way of doing this
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
             self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
             self.assertEmpty(q)
 
@@ -595,13 +600,17 @@
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         for i in range(1, 6):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
 
-        self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
-        self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a")
+        session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+        session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b")
+        session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
 
         for i in range(6, 11):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
 
         #both subscribers should see all messages
         qA = session.incoming("a")
@@ -610,8 +619,9 @@
             for q in [qA, qB]:
                 msg = q.get(timeout = 1)
                 self.assertEquals("Message %s" % i, msg.body)
-                msg.complete()
+                session.receiver._completed.add(msg.id)
 
+        session.channel.session_completed(session.receiver._completed)
         #messages should still be on the queue:
         self.assertEquals(10, session.queue_query(queue = "q").message_count)
 
@@ -625,7 +635,7 @@
         #use fanout for now:
         session.exchange_bind(exchange="amq.fanout", queue="q")
         session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
-        #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+        #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
 
         session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
         session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -724,11 +734,11 @@
         #publish some messages
         self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         for i in range(1, 11):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
 
         #consume some of them
         session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
-        session.message_flow_mode(mode = 0, destination = "a")
+        session.message_set_flow_mode(flow_mode = 0, destination = "a")
         session.message_flow(unit = 0, value = 5, destination = "a")
         session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
 
@@ -762,7 +772,7 @@
         #publish some messages
         self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         for i in range(1, 11):
-            session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
 
         #create a not-acquired subscriber
         session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
@@ -861,7 +871,7 @@
     def assertEmpty(self, queue):
         try:
             extra = queue.get(timeout=1)
-            self.fail("Queue not empty, contains: " + extra.content.body)
+            self.fail("Queue not empty, contains: " + extra.body)
         except Empty: None
 
 class SizelessContent(Content):

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=634661&r1=634660&r2=634661&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Fri Mar  7 05:20:02 2008
@@ -18,10 +18,10 @@
 #
 from qpid.client import Client, Closed
 from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
 
-class QueueTests(TestBase):
+class QueueTests(TestBase010):
     """Tests for 'methods' on the amqp queue 'class'"""
 
     def test_purge(self):
@@ -31,9 +31,9 @@
         session = self.session
         #setup, declare a queue and add some messages to it:
         session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
-        session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"}))
-        session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"}))
-        session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"}))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
 
         #check that the queue now reports 3 messages:
         session.queue_declare(queue="test-queue")
@@ -46,15 +46,16 @@
         self.assertEqual(0, reply.message_count)        
 
         #send a further message and consume it, ensuring that the other messages are really gone
-        session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"}))
-        self.subscribe(queue="test-queue", destination="tag")
-        queue = self.client.queue("tag")
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+        session.message_subscribe(queue="test-queue", destination="tag")
+        session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF)
+        queue = session.incoming("tag")
         msg = queue.get(timeout=1)
-        self.assertEqual("four", msg.content.body)
+        self.assertEqual("four", msg.body)
 
         #check error conditions (use new sessions): 
-        session = self.client.session(2)
-        session.session_open()
+        session = self.conn.session("error-checker")
         try:
             #queue specified but doesn't exist:
             session.queue_purge(queue="invalid-queue")
@@ -62,8 +63,7 @@
         except Closed, e:
             self.assertChannelException(404, e.args[0])
 
-        session = self.client.session(3)
-        session.session_open()
+        session = self.conn.session("error-checker")
         try:
             #queue not specified and none previously declared for channel:
             session.queue_purge()
@@ -71,12 +71,6 @@
         except Closed, e:
             self.assertConnectionException(530, e.args[0])
 
-        #cleanup    
-        other = self.connect()
-        session = other.session(1)
-        session.session_open()
-        session.exchange_delete(exchange="test-exchange")
-
     def test_declare_exclusive(self):
         """
         Test that the exclusive field is honoured in queue.declare
@@ -167,32 +161,32 @@
         self.subscribe(queue="queue-1", destination="queue-1")
         self.subscribe(queue="queue-2", destination="queue-2")
 
-        queue1 = self.client.queue("queue-1")
-        queue2 = self.client.queue("queue-2")
+        queue1 = session.incoming("queue-1")
+        queue2 = session.incoming("queue-2")
 
         session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
         session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
 
         #send a message that will match both bindings
         session.message_transfer(destination=exchange,
-                                 content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
+                                 message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one"))
         
         #unbind first queue
         session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
         
         #send another message
         session.message_transfer(destination=exchange,
-                                 content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
+                                 message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", ))
 
         #check one queue has both messages and the other has only one
-        self.assertEquals("one", queue1.get(timeout=1).content.body)
+        self.assertEquals("one", queue1.get(timeout=1).body)
         try:
             msg = queue1.get(timeout=1)
-            self.fail("Got extra message: %s" % msg.content.body)
+            self.fail("Got extra message: %s" % msg.body)
         except Empty: pass
 
-        self.assertEquals("one", queue2.get(timeout=1).content.body)
-        self.assertEquals("two", queue2.get(timeout=1).content.body)
+        self.assertEquals("one", queue2.get(timeout=1).body)
+        self.assertEquals("two", queue2.get(timeout=1).body)
         try:
             msg = queue2.get(timeout=1)
             self.fail("Got extra message: " + msg)
@@ -207,9 +201,9 @@
 
         #straight-forward case:
         session.queue_declare(queue="delete-me")
-        session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
-        session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
-        session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+        session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me")))
+        session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me")))
+        session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me")))
         session.queue_delete(queue="delete-me")
         #check that it has gone be declaring passively
         try:
@@ -238,7 +232,7 @@
         #create a queue and add a message to it (use default binding):
         session.queue_declare(queue="delete-me-2")
         session.queue_declare(queue="delete-me-2", passive=True)
-        session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+        session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2")))
 
         #try to delete, but only if empty:
         try:
@@ -253,9 +247,9 @@
 
         #empty queue:
         self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
-        queue = self.client.queue("consumer_tag")
+        queue = session.incoming("consumer_tag")
         msg = queue.get(timeout=1)
-        self.assertEqual("message", msg.content.body)
+        self.assertEqual("message", msg.body)
         session.message_cancel(destination="consumer_tag")
 
         #retry deletion on empty queue: