You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/19 21:34:33 UTC
svn commit: r520094 - in /incubator/qpid/trunk/qpid: ./ python/ python/qpid/
python/tests/
Author: aconway
Date: Mon Mar 19 13:34:32 2007
New Revision: 520094
URL: http://svn.apache.org/viewvc?view=rev&rev=520094
Log:
Merged revisions 507491-507559,507561-507601,507603-507621,507623-507671,507673-507959,507961-507992,507994-508097,508099-508149,508151-508155,508157-508232,508234-508378,508380-508390,508392-508459,508461-508704,508707-509615,509617-509737,509739-509753,509756-509833,509835-510106,510108-510160,510162-510179,510181-510552,510554-510704,510706-510911,510913-510985,510987-511003,511005-514750,514752-515720,515722-516156,516158-516458,516461-516484,516486-516488,516490-517823,517825,517827,517829,517831-517832,517834-517848,517850,517852-517854,517856-517858,517860-517877,517879-517886,517888-517891,517893-517903,517905,517907-517928,517930,517932-518197,518201-518206,518208-518230,518232,518235,518237,518239-518240,518243-518245,518247-518255,518257,518259-518260,518262,518264,518266-518292,518294-518707 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r507491 | gsim | 2007-02-14 06:39:26 -0500 (Wed, 14 Feb 2007) | 3 lines
Expanded the use of batched acks to a few other places in tests.
........
r508377 | gsim | 2007-02-16 07:03:37 -0500 (Fri, 16 Feb 2007) | 4 lines
Updated failing list for java (some of these result in test suite blocking)
Added better error handling when connection closes without close method
........
r508396 | gsim | 2007-02-16 08:54:54 -0500 (Fri, 16 Feb 2007) | 3 lines
Fix: use message_resume not channel_resume
........
r509611 | gsim | 2007-02-20 10:39:14 -0500 (Tue, 20 Feb 2007) | 3 lines
Fixed bug where response id rather than request id was being used to get listener for response.
........
r509617 | gsim | 2007-02-20 10:52:31 -0500 (Tue, 20 Feb 2007) | 3 lines
Updated list of failing tests for java broker on this branch.
........
r510096 | gsim | 2007-02-21 11:49:27 -0500 (Wed, 21 Feb 2007) | 5 lines
Fixed bug in references where map wasn't qualified in close
Attach reference to transfer, as it will be deleted on close
Altered tests to get reference from the message on the queue rather than looking them up from channel as they are already gone there
........
r510114 | astitcher | 2007-02-21 12:37:36 -0500 (Wed, 21 Feb 2007) | 3 lines
r1224@fuschia: andrew | 2007-02-21 17:20:59 +0000
Updated expected cpp broker test failures
........
r510128 | gsim | 2007-02-21 13:06:02 -0500 (Wed, 21 Feb 2007) | 3 lines
Ensure socket is closed in tearDown
........
r510913 | gsim | 2007-02-23 06:37:08 -0500 (Fri, 23 Feb 2007) | 3 lines
Revised list of failing tests for java broker on this branch
........
r515363 | aconway | 2007-03-06 18:35:08 -0500 (Tue, 06 Mar 2007) | 6 lines
* python/qpid/peer.py (Channel.__init__): use reliable framing if
version >= 0-8.
* python/qpid/spec.py (Spec.__init__): Remove unused parameter.
* python/qpid/testlib.py (TestRunner._parseargs): Add --errata option,
set default errata only if --spec is not present.
........
r518707 | aconway | 2007-03-15 13:49:44 -0400 (Thu, 15 Mar 2007) | 6 lines
* python/qpid/peer.py (Peer.close): Close delegate *before* channels.
Otherwise we get a race: closing a channel can wake a client thread,
which may see client.closed as still false. Was causing bogus exceptions
in some tests.
........
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/python/cpp_failing.txt
incubator/qpid/trunk/qpid/python/java_failing.txt
incubator/qpid/trunk/qpid/python/qpid/client.py
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/reference.py
incubator/qpid/trunk/qpid/python/qpid/spec.py
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests/message.py
incubator/qpid/trunk/qpid/python/tests/tx.py
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
svk:merge = 8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1224
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/python/cpp_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing.txt?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing.txt Mon Mar 19 13:34:32 2007
@@ -0,0 +1,4 @@
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reference_large
+tests.message.MessageTests.test_reject
+tests.basic.BasicTests.test_get
Modified: incubator/qpid/trunk/qpid/python/java_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/java_failing.txt?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/java_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/java_failing.txt Mon Mar 19 13:34:32 2007
@@ -1,4 +1,4 @@
-tests.basic.BasicTests.test_qos_prefetch_count
+ntests.basic.BasicTests.test_qos_prefetch_count
tests.basic.BasicTests.test_ack
tests.basic.BasicTests.test_cancel
tests.basic.BasicTests.test_consume_exclusive
@@ -8,11 +8,11 @@
tests.basic.BasicTests.test_get
tests.basic.BasicTests.test_qos_prefetch_size
tests.basic.BasicTests.test_recover_requeue
+
tests.exchange.RecommendedTypesRuleTests.testTopic
tests.exchange.RequiredInstancesRuleTests.testAmqTopic
-tests.message.MessageTests.test_qos_prefetch_count
-tests.message.MessageTests.test_ack
-tests.message.MessageTests.test_get
-tests.message.MessageTests.test_qos_prefetch_size
-tests.message.MessageTests.test_recover_requeue
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reject
+
+tests.broker.BrokerTests.test_ping_pong
Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Mon Mar 19 13:34:32 2007
@@ -76,7 +76,8 @@
self.locale = locale
self.tune_params = tune_params
- self.conn = Connection(connect(self.host, self.port), self.spec)
+ self.socket = connect(self.host, self.port)
+ self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
self.conn.init()
@@ -90,6 +91,9 @@
def opened(self, ch):
ch.references = References()
+ def close(self):
+ self.socket.close()
+
class ClientDelegate(Delegate):
def __init__(self, client):
@@ -112,9 +116,8 @@
def message_transfer(self, ch, msg):
if isinstance(msg.body, ReferenceId):
- self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
- else:
- self.client.queue(msg.destination).put(msg)
+ msg.reference = ch.references.get(msg.body.id)
+ self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Mon Mar 19 13:34:32 2007
@@ -53,6 +53,9 @@
def flush(self):
pass
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+
def connect(host, port):
sock = socket.socket()
sock.connect((host, port))
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Mon Mar 19 13:34:32 2007
@@ -97,9 +97,12 @@
self.fatal()
def close(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.close(reason)
for ch in self.channels.values():
ch.close(reason)
- self.delegate.close(reason)
def writer(self):
try:
@@ -144,7 +147,7 @@
self.write(frame, content)
def receive(self, channel, frame):
- listener = self.outstanding.pop(frame.id)
+ listener = self.outstanding.pop(frame.request_id)
listener(channel, frame)
class Responder:
@@ -178,8 +181,8 @@
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- # XXX: better switch
- self.reliable = False
+ # Use reliable framing if version == 0-9.
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
def close(self, reason):
Modified: incubator/qpid/trunk/qpid/python/qpid/reference.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/reference.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/reference.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/reference.py Mon Mar 19 13:34:32 2007
@@ -111,7 +111,7 @@
self.get(id).close()
self.lock.acquire()
try:
- del map[id]
+ self.map.pop(id)
finally:
self.lock.release()
Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Mon Mar 19 13:34:32 2007
@@ -79,12 +79,11 @@
PRINT=["major", "minor", "file"]
- def __init__(self, major, minor, file, errata):
+ def __init__(self, major, minor, file):
Metadata.__init__(self)
self.major = major
self.minor = minor
self.file = file
- self.errata = errata
self.constants = SpecContainer()
self.classes = SpecContainer()
# methods indexed by classname_methname
@@ -275,7 +274,7 @@
def load(specfile, *errata):
doc = xmlutil.parse(specfile)
spec_root = doc["amqp"][0]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata)
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata):
# constants
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Mon Mar 19 13:34:32 2007
@@ -26,6 +26,7 @@
import Queue
from getopt import getopt, GetoptError
from qpid.content import Content
+from qpid.message import Message
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -56,15 +57,16 @@
run-tests [options] [test*]
The name of a test is package.module.ClassName.testMethod
Options:
- -?/-h/--help : this message
+ -?/-h/--help : this message
-s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
0-8 - use the default 0-8 specification.
0-9 - use the default 0-9 specification.
+ -e/--errata <errata.xml> : file containing amqp XML errata
-b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
"""
sys.exit(1)
@@ -103,24 +105,27 @@
for opt, value in opts:
if opt in ("-?", "-h", "--help"): self._die()
if opt in ("-s", "--spec"): self.specfile = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
if opt in ("-b", "--broker"): self.setBroker(value)
if opt in ("-v", "--verbose"): self.verbose = 2
if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
if opt in ("-i", "--ignore"): self.ignore.append(value)
if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+ # Abbreviations for default settings.
if (self.specfile == "0-8"):
- self.specfile = "../specs/amqp.0-8.xml"
+ self.specfile = "../specs/amqp.0-8.xml"
if (self.specfile == "0-9"):
- self.specfile = "../specs/amqp.0-9.xml"
- self.errata = ["../specs/amqp-errata.0-9.xml"]
+ self.specfile = "../specs/amqp.0-9.xml"
+ self.errata.append("../specs/amqp-errata.0-9.xml")
+ if (self.specfile == None):
+ self._die("No XML specification provided")
print "Using specification from:", self.specfile
self.spec = qpid.spec.load(self.specfile, *self.errata)
if len(self.tests) == 0:
if self.use08spec():
- testdir="tests_0-8"
+ self.tests=findmodules("tests_0-8")
else:
- testdir="tests"
- self.tests=findmodules(testdir)
+ self.tests=findmodules("tests")
def testSuite(self):
class IgnoringTestSuite(unittest.TestSuite):
@@ -137,7 +142,11 @@
self._parseargs(args)
runner = unittest.TextTestRunner(descriptions=False,
verbosity=self.verbose)
- result = runner.run(self.testSuite())
+ try:
+ result = runner.run(self.testSuite())
+ except:
+ print "Unhandled error in test:", sys.exc_info()
+
if (self.ignore):
print "======================================="
print "NOTE: the following tests were ignored:"
@@ -181,10 +190,18 @@
self.channel.channel_open()
def tearDown(self):
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:", sys.exc_info()
+
+ if not self.client.closed:
+ self.client.channel(0).connection_close(reply_code=200)
+ else:
+ self.client.close()
def connect(self, *args, **keys):
"""Create a new connction, return the Client object"""
@@ -261,13 +278,15 @@
"""
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
- def assertChannelException(self, expectedCode, message):
+ def assertChannelException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected channel_close method")
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method")
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
Modified: incubator/qpid/trunk/qpid/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/message.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/message.py Mon Mar 19 13:34:32 2007
@@ -171,8 +171,7 @@
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok()
- msg2.ok()
+ msg1.ok(batchoffset=1)#One and Two
msg4.ok()
channel.message_recover(requeue=False)
@@ -216,9 +215,8 @@
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok() #One
- msg2.ok() #Two
- msg4.ok() #Two
+ msg1.ok(batchoffset=1) #One and Two
+ msg4.ok() #Four
channel.message_cancel(destination="consumer_tag")
channel.message_consume(queue="test-requeue", destination="consumer_tag")
@@ -263,11 +261,9 @@
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -275,18 +271,13 @@
#ack messages and check that the next set arrive ok:
#todo: once batching is implmented, send a single response for all messages
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -313,12 +304,9 @@
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- print "Got Message %d" % i
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -326,18 +314,13 @@
except Empty: None
#ack messages and check that the next set arrive ok:
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -383,14 +366,13 @@
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
-
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ if (i==13):
+ msg.ok(batchoffset=-2)#11, 12 & 13
+ if(i in [15, 17, 19]):
+ msg.ok()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -479,8 +461,9 @@
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertTrue(msg.reference)
+ self.assertEquals(data, msg.reference.get_complete())
def test_reference_completion(self):
"""
@@ -511,12 +494,165 @@
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", msg1.reference.get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", msg2.reference.get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", msg2.reference.get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_transfer(routing_key = "q", body="blah, blah")
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ channel.message_cancel(destination = "consumer")
+ msg.reject()
+
+ channel.message_consume(queue = "q", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_checkpoint(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_open(reference="my-ref")
+ channel.message_append(reference="my-ref", bytes="abcdefgh")
+ channel.message_append(reference="my-ref", bytes="ijklmnop")
+ channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+ channel.channel_close()
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.message_consume(queue = "q", destination = "consumer")
+ offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+ self.assertEquals(offset, 16)
+ channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+ channel.synchronous = False
+ channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+ self.assertEmpty(self.client.queue("consumer"))
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = msg.reference.get_complete()
else:
data = msg.body
self.assertEquals("abcdefghijkl", data)
Modified: incubator/qpid/trunk/qpid/python/tests/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/tx.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/tx.py Mon Mar 19 13:34:32 2007
@@ -163,7 +163,8 @@
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+
+ msg.ok(batchoffset=-3)
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")