You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/11 22:56:53 UTC
svn commit: r636121 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/xml/ python/ python/qpid/ python/tests_0-10/
Author: gsim
Date: Tue Mar 11 14:56:49 2008
New Revision: 636121
URL: http://svn.apache.org/viewvc?rev=636121&view=rev
Log:
Enabled tx methods on final 0-10 path and converted tests accordingly
Added read_uuid/write_uuid methods to codec010
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
incubator/qpid/trunk/qpid/cpp/xml/extra.xml
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/qpid/codec010.py
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Mar 11 14:56:49 2008
@@ -83,6 +83,7 @@
Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 11 14:56:49 2008
@@ -38,7 +38,8 @@
exchangeImpl(s),
queueImpl(s),
messageImpl(s),
- executionImpl(s)
+ executionImpl(s),
+ txImpl(s)
{}
@@ -402,6 +403,23 @@
const framing::FieldTable& /*errorInfo*/)
{
//TODO
+}
+
+
+
+void SessionAdapter::TxHandlerImpl::select()
+{
+ state.startTx();
+}
+
+void SessionAdapter::TxHandlerImpl::commit()
+{
+ state.commit(&getBroker().getStore());
+}
+
+void SessionAdapter::TxHandlerImpl::rollback()
+{
+ state.rollback();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Tue Mar 11 14:56:49 2008
@@ -56,7 +56,7 @@
Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
Queue010Handler* getQueue010Handler(){ return &queueImpl; }
Execution010Handler* getExecution010Handler(){ return &executionImpl; }
-
+ Tx010Handler* getTx010Handler(){ return &txImpl; }
BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
@@ -201,10 +201,22 @@
};
+ class TxHandlerImpl : public Tx010Handler, public HandlerHelper
+ {
+ public:
+ TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+
+ void select();
+ void commit();
+ void rollback();
+ };
+
+
ExchangeHandlerImpl exchangeImpl;
QueueHandlerImpl queueImpl;
MessageHandlerImpl messageImpl;
ExecutionHandlerImpl executionImpl;
+ TxHandlerImpl txImpl;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Tue Mar 11 14:56:49 2008
@@ -723,6 +723,22 @@
</method>
</class>
+<class name="tx010" index="5">
+ <doc>blah, blah</doc>
+ <method name = "select" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ </method>
+ <method name = "commit" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ </method>
+ <method name = "rollback" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ </method>
+</class>
+
<class name="exchange010" index="7">
<doc>blah, blah</doc>
<method name = "declare" index="1">
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Tue Mar 11 14:56:49 2008
@@ -1,9 +1,6 @@
tests.codec.FieldTableTestCase.test_field_table_decode
tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
-tests_0-10.tx.TxTests.test_auto_rollback
-tests_0-10.tx.TxTests.test_commit
-tests_0-10.tx.TxTests.test_rollback
tests_0-10.execution.ExecutionTests.test_flush
tests_0-10.dtx.DtxTests.test_bad_resume
tests_0-10.dtx.DtxTests.test_end
@@ -26,7 +23,6 @@
tests_0-10.dtx.DtxTests.test_start_join_and_resume
tests_0-10.dtx.DtxTests.test_suspend_resume
tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_consume_exclusive
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_no_size
Modified: incubator/qpid/trunk/qpid/python/qpid/codec010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec010.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec010.py Tue Mar 11 14:56:49 2008
@@ -221,6 +221,12 @@
attr = "write_uint%d" % (width*8)
getattr(self, attr)(n)
+ def write_uuid(self, s):
+ self.pack("16s", s)
+
+ def read_uuid(self):
+ return self.unpack("16s")
+
class StringCodec(Codec):
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Tue Mar 11 14:56:49 2008
@@ -352,6 +352,12 @@
self.conn.start(timeout=10)
self.session = self.conn.session("test-session", timeout=10)
+ def connect(self):
+ spec = testrunner.spec
+ conn = Connection(connect(testrunner.host, testrunner.port), spec)
+ conn.start(timeout=10)
+ return conn
+
def tearDown(self):
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Tue Mar 11 14:56:49 2008
@@ -96,31 +96,29 @@
def test_consume_exclusive(self):
"""
- Test that the exclusive flag is honoured in the consume method
+ Test an exclusive consumer prevents other consumer being created
"""
session = self.session
- #setup, declare a queue:
session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
try:
- self.subscribe(destination="second", queue="test-queue-2")
+ session.message_subscribe(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new session and cleanup last consumer:
- session = self.client.session(2)
- session.session_open()
+ except SessionException, e:
+ self.assertEquals(403, e.args[0].error_code)
- #check that an exclusive consumer cannot be created if a consumer already exists:
- self.subscribe(session, destination="first", queue="test-queue-2")
+ def test_consume_exclusive2(self):
+ """
+ Check that an exclusive consumer cannot be created if a consumer already exists:
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2")
try:
- self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
+ except SessionException, e:
+ self.assertEquals(403, e.args[0].error_code)
def test_consume_queue_not_found(self):
"""
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=636121&r1=636120&r2=636121&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Tue Mar 11 14:56:49 2008
@@ -18,10 +18,10 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import testrunner, TestBase010
-class TxTests(TestBase):
+class TxTests(TestBase010):
"""
Tests for 'methods' on the amqp tx 'class'
"""
@@ -30,202 +30,216 @@
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
- channel2 = self.client.channel(2)
- channel2.session_open()
- self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel2.tx_commit()
- channel2.session_close()
+ session2 = self.conn.session("worker", 2)
+ self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ session2.tx_commit()
+ session2.close()
- #use a different channel with new subscriptions to ensure
+ #use a different session with new subscriptions to ensure
#there is no redelivery of acked messages:
- channel = self.channel
- channel.tx_select()
+ session = self.session
+ session.tx_select()
- self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
+ self.subscribe(session, queue="tx-commit-a", destination="qa")
+ queue_a = session.incoming("qa")
- self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
+ self.subscribe(session, queue="tx-commit-b", destination="qb")
+ queue_b = session.incoming("qb")
- self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
+ self.subscribe(session, queue="tx-commit-c", destination="qc")
+ queue_c = session.incoming("qc")
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.content.body)
- msg.complete()
+ self.assertEqual("TxMessage 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
def test_auto_rollback(self):
"""
- Test that a channel closed with an open transaction is effectively rolled back
+ Test that a session closed with an open transaction is effectively rolled back
"""
- channel2 = self.client.channel(2)
- channel2.session_open()
- queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+ session2 = self.conn.session("worker", 2)
+ queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
- channel2.session_close()
- channel = self.channel
- channel.tx_select()
+ session2.close()
- self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
- queue_a = self.client.queue("qa")
+ session = self.session
+ session.tx_select()
- self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
- queue_b = self.client.queue("qb")
+ self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+ queue_a = session.incoming("qa")
- self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
- queue_c = self.client.queue("qc")
+ self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+ queue_b = session.incoming("qb")
+
+ self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+ queue_c = session.incoming("qc")
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
def test_rollback(self):
"""
Test that rolled back publishes are not delivered and rolled back acks are re-delivered
"""
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+ session = self.session
+ queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
- #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_stop(destination=d)
-
- channel.tx_rollback()
-
- #restart susbcriptions
- for d in ["sub_a", "sub_b", "sub_c"]:
- channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+ session.tx_rollback()
+
+ #need to release messages to get them redelivered now:
+ session.message_release(consumed)
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- msg.complete()
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
#cleanup
- channel.tx_commit()
+ session.tx_commit()
- def perform_txn_work(self, channel, name_a, name_b, name_c):
+ def perform_txn_work(self, session, name_a, name_b, name_c):
"""
Utility method that does some setup and some work under a transaction. Used for testing both
commit and rollback
"""
#setup:
- channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
- channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+ session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
key = "my_key_" + name_b
topic = "my_topic_" + name_c
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+ session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+ session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+ dp = session.delivery_properties(routing_key=name_a)
for i in range(1, 5):
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+ mp = session.message_properties(message_id="msg%d" % i)
+ session.message_transfer(message=Message(dp, mp, "Message %d" % i))
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
- channel.tx_select()
+ dp = session.delivery_properties(routing_key=topic)
+ mp = session.message_properties(message_id="msg7")
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+ session.tx_select()
#consume and ack messages
- self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
- queue_a = self.client.queue("sub_a")
+ acked = RangedSet()
+ self.subscribe(session, queue=name_a, destination="sub_a")
+ queue_a = session.incoming("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg.complete()
+ acked.add(msg.id)
+ self.assertEqual("Message %d" % i, msg.body)
- self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
- queue_b = self.client.queue("sub_b")
+ self.subscribe(session, queue=name_b, destination="sub_b")
+ queue_b = session.incoming("sub_b")
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 6", msg.body)
+ acked.add(msg.id)
- sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
- queue_c = self.client.queue("sub_c")
+ sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+ queue_c = session.incoming("sub_c")
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- msg.complete()
+ self.assertEqual("Message 7", msg.body)
+ acked.add(msg.id)
+ session.message_accept(acked)
+
+ dp = session.delivery_properties(routing_key=topic)
#publish messages
for i in range(1, 5):
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
- body="TxMessage %d" % i))
-
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
- body="TxMessage 6"))
- channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
- body="TxMessage 7"))
- return queue_a, queue_b, queue_c
+ mp = session.message_properties(message_id="tx-msg%d" % i)
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="tx-msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+
+ dp = session.delivery_properties(routing_key=name_a)
+ mp = session.message_properties(message_id="tx-msg7")
+ session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+ return queue_a, queue_b, queue_c, acked
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+ def complete(self, session, msg):
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ session.channel.session_completed(session.receiver._completed)
+