You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/10 19:15:31 UTC
svn commit: r823890 - in /qpid/trunk/qpid/python/qpid: address.py driver.py
messaging.py ops.py tests/messaging.py
Author: rhs
Date: Sat Oct 10 17:15:31 2009
New Revision: 823890
URL: http://svn.apache.org/viewvc?rev=823890&view=rev
Log:
made addresses not auto-create by default; added error handling and tests for nonexistent/invalid addresses; added logging for aborted connections; fixed spurious reattach
Modified:
qpid/trunk/qpid/python/qpid/address.py
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/ops.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/address.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/address.py (original)
+++ qpid/trunk/qpid/python/qpid/address.py Sat Oct 10 17:15:31 2009
@@ -35,7 +35,8 @@
RBRACE = Type("RBRACE", r"\}")
COLON = Type("COLON", r":")
COMMA = Type("COMMA", r",")
-ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+SLASH = Type("SLASH", r"/")
+ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_.-]*')
NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
WSPACE = Type("WSPACE", r"[ \n\r\t]+")
@@ -53,12 +54,36 @@
joined = "|".join(["(%s)" % t.pattern for t in TYPES])
LEXER = re.compile(joined)
+class LexError(Exception):
+ pass
+
+def line_info(st, pos):
+ idx = 0
+ lineno = 1
+ column = 0
+ line_pos = 0
+ while idx < pos:
+ if st[idx] == "\n":
+ lineno += 1
+ column = 0
+ line_pos = idx
+ column += 1
+ idx += 1
+
+ end = st.find("\n", line_pos)
+ if end < 0:
+ end = len(st)
+ line = st[line_pos:end]
+
+ return line, lineno, column
+
def lex(st):
pos = 0
while pos < len(st):
m = LEXER.match(st, pos)
if m is None:
- raise ValueError(repr(st[pos:]))
+ line, ln, col = line_info(st, pos)
+ raise LexError("unrecognized character in <string>:%s,%s: %s" % (ln, col, line))
else:
idx = m.lastindex
t = Token(TYPES[idx - 1], m.group(idx))
@@ -68,8 +93,6 @@
class ParseError(Exception): pass
-class EOF(Exception): pass
-
class Parser:
def __init__(self, tokens):
@@ -97,11 +120,17 @@
def address(self):
name = self.eat(ID).value
- if self.matches(LBRACE):
+ subject = None
+ options = None
+ if self.matches(SLASH):
+ self.eat(SLASH)
+ if self.matches(ID):
+ subject = self.eat(ID).value
+ else:
+ subject = ""
+ elif self.matches(LBRACE):
options = self.map()
- else:
- options = None
- return name, options
+ return name, subject, options
def map(self):
self.eat(LBRACE)
@@ -129,6 +158,8 @@
def value(self):
if self.matches(NUMBER, STRING):
return eval(self.eat().value)
+ elif self.matches(ID):
+ return self.eat().value
elif self.matches(LBRACE):
return self.map()
else:
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Sat Oct 10 17:15:31 2009
@@ -17,7 +17,7 @@
# under the License.
#
-import compat, connection, socket, struct, sys, time
+import address, compat, connection, socket, struct, sys, time
from concurrency import synchronized
from datatypes import RangedSet, Serial
from exceptions import Timeout, VersionError
@@ -26,18 +26,14 @@
from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
from ops import *
from selector import Selector
-from session import Client, INCOMPLETE, SessionDetached
from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
+def addr2reply_to(addr):
+ name, subject, options = address.parse(addr)
+ return ReplyTo(name, subject)
def reply_to2addr(reply_to):
if reply_to.routing_key is None:
@@ -100,9 +96,12 @@
def write_query(self, query, handler):
id = self.sent
+ query.sync = True
self.write_cmd(query, lambda: handler(self.results.pop(id)))
def write_cmd(self, cmd, completion=noop):
+ if self.detached:
+ raise Exception("detached")
cmd.id = self.sent
self.sent += 1
self.completions[cmd.id] = completion
@@ -187,6 +186,7 @@
if data:
log.debug("READ: %r", data)
else:
+ log.debug("ABORTED: %s", self._socket.getpeername())
error = ("connection aborted",)
recoverable = True
except socket.error, e:
@@ -285,7 +285,7 @@
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
- if close.reply_ok != close_code.normal:
+ if close.reply_code != close_code.normal:
self.connection.error = (close.reply_code, close.reply_text)
# XXX: should we do a half shutdown on the socket here?
# XXX: we really need to test this, we may end up reporting a
@@ -343,6 +343,10 @@
sst = self.get_sst(er)
sst.results[er.command_id] = er.value
+ def do_execution_exception(self, ex):
+ sst = self.get_sst(ex)
+ sst.session.error = (ex,)
+
def dispatch(self):
try:
if self._socket is None and self.connection._connected and not self._opening:
@@ -381,7 +385,7 @@
def attach(self, ssn):
sst = self._attachments.get(ssn)
- if sst is None:
+ if sst is None and not ssn.closed:
for i in xrange(0, self.channel_max):
if not self._sessions.has_key(i):
ch = i
@@ -403,7 +407,7 @@
for rcv in ssn.receivers:
self.link_in(rcv)
- if ssn.closing and not sst.detached:
+ if sst is not None and ssn.closing and not sst.detached:
sst.detached = True
sst.write_op(SessionDetach(name=ssn.name))
@@ -416,24 +420,66 @@
del self._attachments[ssn]
ssn.closed = True
+ def do_session_detach(self, dtc):
+ sst = self.get_sst(dtc)
+ sst.write_op(SessionDetached(name=dtc.name))
+ self.do_session_detached(dtc)
+
def link_out(self, snd):
- sst = self._attachments[snd.session]
+ sst = self._attachments.get(snd.session)
_snd = self._attachments.get(snd)
- if not snd.closing and _snd is None:
+ if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
- _snd.linked = False
- node, _snd._subject = parse_addr(snd.target)
- def do_link_out(result):
- if result.not_found:
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+
+ try:
+ _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+ except address.LexError, e:
+ snd.error = e
+ snd.closed = True
+ return
+ except address.ParseError, e:
+ snd.error = e
+ snd.closed = True
+ return
+
+ # XXX: subject
+ if _snd.options is None:
+ _snd.options = {}
+
+ def do_link():
+ snd.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+
+ if result.queue:
_snd._exchange = ""
- _snd._routing_key = node
+ _snd._routing_key = _snd.name
+ do_link()
else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
- _snd.linked = True
- sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
+ snd.error = ("no such queue: %s" % _snd.name,)
+ del self._attachments[snd]
+ snd.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+
+ if result.not_found:
+ if _snd.options.get("create") in ("always", "receiver"):
+ sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ else:
+ sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+ return
+ else:
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ do_link()
+
+ sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
if snd.closing and not snd.closed:
@@ -441,41 +487,77 @@
snd.closed = True
def link_in(self, rcv):
- sst = self._attachments[rcv.session]
+ sst = self._attachments.get(rcv.session)
_rcv = self._attachments.get(rcv)
- if _rcv is None and not rcv.closing:
+ if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
- _rcv.linked = False
_rcv.canceled = False
_rcv.draining = False
- def do_link_in(result):
+ try:
+ _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+ except address.LexError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+ except address.ParseError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+
+ # XXX: subject
+ if _rcv.options is None:
+ _rcv.options = {}
+
+ def do_link():
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+ if result.queue:
+ _rcv._queue = _rcv.name
+ do_link()
+ else:
+ rcv.error = ("no such queue: %s" % _rcv.name,)
+ del self._attachments[rcv]
+ rcv.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ if _rcv.options.get("create") in ("always", "receiver"):
+ _rcv._queue = _rcv.name
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ else:
+ sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+ return
else:
_rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
# XXX
- if rcv.filter is None:
+ if _rcv.options.get("filter") is None:
f = FILTER_DEFAULTS[result.type]
else:
f = rcv.filter
- f._bind(sst, rcv.source, _rcv._queue)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
- _rcv.linked = True
- sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
+ f._bind(sst, _rcv.name, _rcv._queue)
+ do_link()
+ sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
self._attachments[rcv] = _rcv
if rcv.closing and not rcv.closed:
- if not _rcv.canceled:
- def close_rcv():
- del self._attachments[rcv]
- rcv.closed = True
- sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
- _rcv.canceled = True
+ if rcv.linked:
+ if not _rcv.canceled:
+ def close_rcv():
+ del self._attachments[rcv]
+ rcv.closed = True
+ sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ _rcv.canceled = True
+ else:
+ rcv.closed = True
def process(self, ssn):
if ssn.closing: return
@@ -485,8 +567,9 @@
while sst.outgoing_idx < len(ssn.outgoing):
msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
+ # XXX: should check for sender error here
_snd = self._attachments.get(snd)
- if _snd and _snd.linked:
+ if _snd and snd.linked:
self.send(snd, msg)
sst.outgoing_idx += 1
else:
@@ -559,7 +642,7 @@
def grant(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
- if _rcv is None or not _rcv.linked or _rcv.draining:
+ if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
return
if rcv.granted is UNLIMITED:
@@ -606,7 +689,7 @@
rk = _snd._routing_key
# XXX: do we need to query to figure out how to create the reply-to interoperably?
if msg.reply_to:
- rt = ReplyTo(*parse_addr(msg.reply_to))
+ rt = addr2reply_to(msg.reply_to)
else:
rt = None
dp = DeliveryProperties(routing_key=rk)
@@ -650,7 +733,6 @@
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
self.connection._waiter.notifyAll()
- return INCOMPLETE
def _decode(self, xfr):
dp = EMPTY_DP
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sat Oct 10 17:15:31 2009
@@ -237,9 +237,10 @@
self.value = value
# XXX: this should become part of the driver
- def _bind(self, ssn, exchange, queue):
- ssn.exchange_bind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#"))
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
class SessionError(Exception):
pass
@@ -289,6 +290,7 @@
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
+ self.error = None
self.closing = False
self.closed = False
@@ -309,9 +311,13 @@
def _check_error(self, exc=SessionError):
self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection._ewait(predicate, timeout, exc)
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def sender(self, target, **options):
@@ -520,6 +526,8 @@
self.capacity = options.get("capacity", UNLIMITED)
self.queued = Serial(0)
self.acked = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -529,9 +537,13 @@
def _check_error(self, exc=SendError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SendError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -567,6 +579,8 @@
if not self.session.connection._connected or self.session.closing:
raise Disconnected()
+ self._ewait(lambda: self.linked)
+
if isinstance(object, Message):
message = object
else:
@@ -637,6 +651,8 @@
self.received = Serial(0)
self.returned = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self.listener = None
@@ -647,9 +663,13 @@
def _check_error(self, exc=ReceiveError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -693,6 +713,9 @@
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
+
+ self._ewait(lambda: self.linked)
+
if self._capacity() == 0:
self.granted = self.returned + 1
self._wakeup()
@@ -751,7 +774,7 @@
self.closing = True
self._wakeup()
try:
- self._ewait(lambda: self.closed)
+ self.session._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
Modified: qpid/trunk/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/ops.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/ops.py (original)
+++ qpid/trunk/qpid/python/qpid/ops.py Sat Oct 10 17:15:31 2009
@@ -80,7 +80,7 @@
return "%s(%s)" % (self.__class__.__name__,
", ".join(["%s=%r" % (f.name, getattr(self, f.name))
for f in self.ARGS
- if getattr(self, f.name) is not f.default]))
+ if getattr(self, f.name) != f.default]))
class Command(Compound):
UNENCODED=[Field("channel", "uint16", 0),
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sat Oct 10 17:15:31 2009
@@ -24,7 +24,7 @@
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -63,11 +63,12 @@
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
+ PING_Q = 'ping-queue {create: always}'
# send a message
- sender = ssn.sender("ping-queue")
+ sender = ssn.sender(PING_Q)
content = self.content("ping")
sender.send(content)
- receiver = ssn.receiver("ping-queue")
+ receiver = ssn.receiver(PING_Q)
msg = receiver.fetch(0)
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -174,6 +175,8 @@
self.conn.close()
assert not self.conn.connected()
+ACK_Q = 'test-ack-queue {create: always}'
+
class SessionTests(Base):
def setup_connection(self):
@@ -183,7 +186,7 @@
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender("test-snd-queue")
+ snd = self.ssn.sender('test-snd-queue {create: always}')
snd2 = self.ssn.sender(snd.target)
assert snd is not snd2
snd2.close()
@@ -196,7 +199,7 @@
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver("test-rcv-queue")
+ rcv = self.ssn.receiver('test-rcv-queue {create: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
@@ -209,34 +212,36 @@
self.ssn.acknowledge(msg)
def testStart(self):
- rcv = self.ssn.receiver("test-start-queue")
+ START_Q = 'test-start-queue {create: always}'
+ rcv = self.ssn.receiver(START_Q)
assert not rcv.started
self.ssn.start()
assert rcv.started
- rcv = self.ssn.receiver("test-start-queue")
+ rcv = self.ssn.receiver(START_Q)
assert rcv.started
def testStop(self):
+ STOP_Q = 'test-stop-queue {create: always}'
self.ssn.start()
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert rcv.started
self.ssn.stop()
assert not rcv.started
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert not rcv.started
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender("test-ack-queue")
+ snd = self.ssn.sender(ACK_Q)
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(snd.target)
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -245,7 +250,7 @@
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -253,7 +258,7 @@
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -271,7 +276,7 @@
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver("test-ack-queue"))
+ self.drain(self.ssn.receiver(ACK_Q))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -294,10 +299,12 @@
return contents
def txTest(self, commit):
+ TX_Q = 'test-tx-queue {create: always}'
+ TX_Q_COPY = 'test-tx-queue-copy {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
- txrcv = txssn.receiver("test-tx-queue")
- txsnd = txssn.sender("test-tx-queue-copy")
+ contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY)
rcv = self.ssn.receiver(txrcv.source)
copy_rcv = self.ssn.receiver(txsnd.target)
self.assertEmpty(copy_rcv)
@@ -323,9 +330,10 @@
self.txTest(False)
def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
- rcv = self.ssn.receiver("test-tx-send-queue")
+ contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
@@ -345,10 +353,11 @@
self.txTestSend(False)
def txTestAck(self, commit):
+ TX_ACK_Q = 'test-tx-ack-queue {create: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -366,11 +375,11 @@
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver("test-tx-ack-queue")
+ rcv = self.ssn.receiver(TX_ACK_Q)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -389,6 +398,8 @@
except Disconnected:
pass
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
class ReceiverTests(Base):
def setup_connection(self):
@@ -398,10 +409,10 @@
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-receiver-queue")
+ return self.ssn.sender(RECEIVER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-receiver-queue")
+ return self.ssn.receiver(RECEIVER_Q)
def send(self, base, count = None):
content = self.content(base, count)
@@ -538,6 +549,66 @@
# XXX: need testClose
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def sendErrorTest(self, addr, exc, check=lambda e: True):
+ snd = self.ssn.sender(addr)
+ try:
+ snd.send("hello")
+ assert False, "send succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ snd.close()
+
+ def fetchErrorTest(self, addr, exc, check=lambda e: True):
+ rcv = self.ssn.receiver(addr)
+ try:
+ rcv.fetch(timeout=0)
+ assert False, "fetch succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ rcv.close()
+
+ def testNoTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnparseableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnlexableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized character" in str(e))
+
+ def testUnlexableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
class SenderTests(Base):
def setup_connection(self):
@@ -547,10 +618,10 @@
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-sender-queue")
+ return self.ssn.sender(SENDER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-sender-queue")
+ return self.ssn.receiver(SENDER_Q)
def checkContent(self, content):
self.snd.send(content)
@@ -644,6 +715,8 @@
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
class MessageEchoTests(Base):
def setup_connection(self):
@@ -653,10 +726,10 @@
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-message-echo-queue")
+ return self.ssn.sender(ECHO_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-message-echo-queue")
+ return self.ssn.receiver(ECHO_Q)
def check(self, msg):
self.snd.send(msg)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org