You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/11 22:56:53 UTC

svn commit: r636121 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/xml/ python/ python/qpid/ python/tests_0-10/

Author: gsim
Date: Tue Mar 11 14:56:49 2008
New Revision: 636121

URL: http://svn.apache.org/viewvc?rev=636121&view=rev
Log:
Enabled tx methods on final 0-10 path and converted tests accordingly
Added read/write- uuid to codec010


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/cpp/xml/extra.xml
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/codec010.py
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/python/tests_0-10/tx.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Mar 11 14:56:49 2008
@@ -83,6 +83,7 @@
     Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented");  }
     Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented");  }
     Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented");  }
+    Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented");  }
     Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented");  }
 
     // Handlers no longer implemented in BrokerAdapter:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 11 14:56:49 2008
@@ -38,7 +38,8 @@
     exchangeImpl(s),
     queueImpl(s),
     messageImpl(s),
-    executionImpl(s)
+    executionImpl(s),
+    txImpl(s)
 {}
 
 
@@ -402,6 +403,23 @@
                                                      const framing::FieldTable& /*errorInfo*/)
 {
     //TODO
+}
+
+
+
+void SessionAdapter::TxHandlerImpl::select()
+{
+    state.startTx();
+}
+
+void SessionAdapter::TxHandlerImpl::commit()
+{
+    state.commit(&getBroker().getStore());
+}
+
+void SessionAdapter::TxHandlerImpl::rollback()
+{    
+    state.rollback();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Tue Mar 11 14:56:49 2008
@@ -56,7 +56,7 @@
     Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
     Queue010Handler* getQueue010Handler(){ return &queueImpl;  }
     Execution010Handler* getExecution010Handler(){ return &executionImpl; }
-
+    Tx010Handler* getTx010Handler(){ return &txImpl; }
 
     BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented");  }
     ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented");  }
@@ -201,10 +201,22 @@
 
     };
 
+    class TxHandlerImpl : public Tx010Handler, public HandlerHelper
+    {
+      public:
+        TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+        
+        void select();
+        void commit();
+        void rollback();
+    };
+
+
     ExchangeHandlerImpl exchangeImpl;
     QueueHandlerImpl queueImpl;
     MessageHandlerImpl messageImpl;
     ExecutionHandlerImpl executionImpl;
+    TxHandlerImpl txImpl;
 };
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Tue Mar 11 14:56:49 2008
@@ -723,6 +723,22 @@
     </method>
 </class>
 
+<class name="tx010" index="5">
+    <doc>blah, blah</doc>
+    <method name = "select" index="1">
+        <doc>blah, blah</doc>
+        <chassis name="server" implement="MUST" />
+    </method>
+    <method name = "commit" index="2">
+        <doc>blah, blah</doc>
+        <chassis name="server" implement="MUST" />
+    </method>
+    <method name = "rollback" index="3">
+        <doc>blah, blah</doc>
+        <chassis name="server" implement="MUST" />
+    </method>
+</class>
+
 <class name="exchange010" index="7">
     <doc>blah, blah</doc>
     <method name = "declare" index="1">

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Tue Mar 11 14:56:49 2008
@@ -1,9 +1,6 @@
 tests.codec.FieldTableTestCase.test_field_table_decode
 tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
 tests.codec.FieldTableTestCase.test_field_table_name_value_pair
-tests_0-10.tx.TxTests.test_auto_rollback
-tests_0-10.tx.TxTests.test_commit
-tests_0-10.tx.TxTests.test_rollback
 tests_0-10.execution.ExecutionTests.test_flush
 tests_0-10.dtx.DtxTests.test_bad_resume
 tests_0-10.dtx.DtxTests.test_end
@@ -26,7 +23,6 @@
 tests_0-10.dtx.DtxTests.test_start_join_and_resume
 tests_0-10.dtx.DtxTests.test_suspend_resume
 tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_consume_exclusive
 tests_0-10.message.MessageTests.test_consume_no_local
 tests_0-10.message.MessageTests.test_consume_no_local_awkward
 tests_0-10.message.MessageTests.test_no_size

Modified: incubator/qpid/trunk/qpid/python/qpid/codec010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec010.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec010.py Tue Mar 11 14:56:49 2008
@@ -221,6 +221,12 @@
       attr = "write_uint%d" % (width*8)
       getattr(self, attr)(n)
 
+  def write_uuid(self, s):
+    self.pack("16s", s)
+
+  def read_uuid(self):
+    return self.unpack("16s")
+
 
 
 class StringCodec(Codec):

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Tue Mar 11 14:56:49 2008
@@ -352,6 +352,12 @@
         self.conn.start(timeout=10)        
         self.session = self.conn.session("test-session", timeout=10)
 
+    def connect(self):
+        spec = testrunner.spec
+        conn = Connection(connect(testrunner.host, testrunner.port), spec)
+        conn.start(timeout=10)
+        return conn
+
     def tearDown(self):
         if not self.session.error(): self.session.close(timeout=10)
         self.conn.close(timeout=10)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Tue Mar 11 14:56:49 2008
@@ -96,31 +96,29 @@
 
     def test_consume_exclusive(self):
         """
-        Test that the exclusive flag is honoured in the consume method
+        Test an exclusive consumer prevents other consumer being created
         """
         session = self.session
-        #setup, declare a queue:
         session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
-
-        #check that an exclusive consumer prevents other consumer being created:
-        self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+        session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
         try:
-            self.subscribe(destination="second", queue="test-queue-2")
+            session.message_subscribe(destination="second", queue="test-queue-2")
             self.fail("Expected consume request to fail due to previous exclusive consumer")
-        except Closed, e:
-            self.assertChannelException(403, e.args[0])
-
-        #open new session and cleanup last consumer:
-        session = self.client.session(2)
-        session.session_open()
+        except SessionException, e:
+            self.assertEquals(403, e.args[0].error_code)
 
-        #check that an exclusive consumer cannot be created if a consumer already exists:
-        self.subscribe(session, destination="first", queue="test-queue-2")
+    def test_consume_exclusive2(self):
+        """
+        Check that an exclusive consumer cannot be created if a consumer already exists:
+        """
+        session = self.session
+        session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+        session.message_subscribe(destination="first", queue="test-queue-2")
         try:
-            self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
+            session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
             self.fail("Expected exclusive consume request to fail due to previous consumer")
-        except Closed, e:
-            self.assertChannelException(403, e.args[0])
+        except SessionException, e:
+            self.assertEquals(403, e.args[0].error_code)
 
     def test_consume_queue_not_found(self):
         """

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Tue Mar 11 14:56:49 2008
@@ -18,10 +18,10 @@
 #
 from qpid.client import Client, Closed
 from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import testrunner, TestBase010
 
-class TxTests(TestBase):
+class TxTests(TestBase010):
     """
     Tests for 'methods' on the amqp tx 'class'
     """
@@ -30,202 +30,216 @@
         """
         Test that commited publishes are delivered and commited acks are not re-delivered
         """
-        channel2 = self.client.channel(2)
-        channel2.session_open()
-        self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
-        channel2.tx_commit()
-        channel2.session_close()
+        session2 = self.conn.session("worker", 2)
+        self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+        session2.tx_commit()
+        session2.close()
 
-        #use a different channel with new subscriptions to ensure
+        #use a different session with new subscriptions to ensure
         #there is no redelivery of acked messages:
-        channel = self.channel
-        channel.tx_select()
+        session = self.session
+        session.tx_select()
 
-        self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
-        queue_a = self.client.queue("qa")
+        self.subscribe(session, queue="tx-commit-a", destination="qa")
+        queue_a = session.incoming("qa")
 
-        self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
-        queue_b = self.client.queue("qb")
+        self.subscribe(session, queue="tx-commit-b", destination="qb")
+        queue_b = session.incoming("qb")
 
-        self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
-        queue_c = self.client.queue("qc")
+        self.subscribe(session, queue="tx-commit-c", destination="qc")
+        queue_c = session.incoming("qc")
 
         #check results
         for i in range(1, 5):
             msg = queue_c.get(timeout=1)
-            self.assertEqual("TxMessage %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("TxMessage %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("TxMessage 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("TxMessage 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_a.get(timeout=1)
-        self.assertEqual("TxMessage 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("TxMessage 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
     def test_auto_rollback(self):
         """
-        Test that a channel closed with an open transaction is effectively rolled back
+        Test that a session closed with an open transaction is effectively rolled back
         """
-        channel2 = self.client.channel(2)
-        channel2.session_open()
-        queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+        session2 = self.conn.session("worker", 2)
+        queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
-        channel2.session_close()
-        channel = self.channel
-        channel.tx_select()
+        session2.close()
 
-        self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
-        queue_a = self.client.queue("qa")
+        session = self.session
+        session.tx_select()
 
-        self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
-        queue_b = self.client.queue("qb")
+        self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+        queue_a = session.incoming("qa")
 
-        self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
-        queue_c = self.client.queue("qc")
+        self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+        queue_b = session.incoming("qb")
+
+        self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+        queue_c = session.incoming("qc")
 
         #check results
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
     def test_rollback(self):
         """
         Test that rolled back publishes are not delivered and rolled back acks are re-delivered
         """
-        channel = self.channel
-        queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+        session = self.session
+        queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
-        #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
-        for d in ["sub_a", "sub_b", "sub_c"]:
-            channel.message_stop(destination=d)
-
-        channel.tx_rollback()
-
-        #restart susbcriptions
-        for d in ["sub_a", "sub_b", "sub_c"]:
-            channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
-            channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+        session.tx_rollback()
+
+        #need to release messages to get them redelivered now:
+        session.message_release(consumed)
 
         #check results
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
-    def perform_txn_work(self, channel, name_a, name_b, name_c):
+    def perform_txn_work(self, session, name_a, name_b, name_c):
         """
         Utility method that does some setup and some work under a transaction. Used for testing both
         commit and rollback
         """
         #setup:
-        channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
-        channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
-        channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+        session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+        session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+        session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
 
         key = "my_key_" + name_b
         topic = "my_topic_" + name_c 
     
-        channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
-        channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+        session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+        session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
 
+        dp = session.delivery_properties(routing_key=name_a)
         for i in range(1, 5):
-            channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+            mp = session.message_properties(message_id="msg%d" % i)
+            session.message_transfer(message=Message(dp, mp, "Message %d" % i))
 
-        channel.message_transfer(destination="amq.direct",
-                                 content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
-        channel.message_transfer(destination="amq.topic",
-                                 content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
 
-        channel.tx_select()
+        dp = session.delivery_properties(routing_key=topic)
+        mp = session.message_properties(message_id="msg7")
+        session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+        session.tx_select()
 
         #consume and ack messages
-        self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
-        queue_a = self.client.queue("sub_a")
+        acked = RangedSet()
+        self.subscribe(session, queue=name_a, destination="sub_a")
+        queue_a = session.incoming("sub_a")
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
-
-        msg.complete()
+            acked.add(msg.id)
+            self.assertEqual("Message %d" % i, msg.body)
 
-        self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
-        queue_b = self.client.queue("sub_b")
+        self.subscribe(session, queue=name_b, destination="sub_b")
+        queue_b = session.incoming("sub_b")
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        acked.add(msg.id)
 
-        sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
-        queue_c = self.client.queue("sub_c")
+        sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+        queue_c = session.incoming("sub_c")
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        acked.add(msg.id)
 
+        session.message_accept(acked)
+
+        dp = session.delivery_properties(routing_key=topic)
         #publish messages
         for i in range(1, 5):
-            channel.message_transfer(destination="amq.topic",
-                                     content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
-                                                     body="TxMessage %d" % i))
-
-        channel.message_transfer(destination="amq.direct",
-                                 content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
-                                                 body="TxMessage 6"))
-        channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
-                                                 body="TxMessage 7"))
-        return queue_a, queue_b, queue_c
+            mp = session.message_properties(message_id="tx-msg%d" % i)
+            session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="tx-msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+        
+        dp = session.delivery_properties(routing_key=name_a)
+        mp = session.message_properties(message_id="tx-msg7")
+        session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+        return queue_a, queue_b, queue_c, acked
+
+    def subscribe(self, session=None, **keys):
+        session = session or self.session
+        consumer_tag = keys["destination"]
+        session.message_subscribe(**keys)
+        session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+    def complete(self, session, msg):
+        session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+        session.channel.session_completed(session.receiver._completed)
+