You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/10 19:15:31 UTC

svn commit: r823890 - in /qpid/trunk/qpid/python/qpid: address.py driver.py messaging.py ops.py tests/messaging.py

Author: rhs
Date: Sat Oct 10 17:15:31 2009
New Revision: 823890

URL: http://svn.apache.org/viewvc?rev=823890&view=rev
Log:
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach

Modified:
    qpid/trunk/qpid/python/qpid/address.py
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/ops.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/address.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/address.py (original)
+++ qpid/trunk/qpid/python/qpid/address.py Sat Oct 10 17:15:31 2009
@@ -35,7 +35,8 @@
 RBRACE = Type("RBRACE", r"\}")
 COLON = Type("COLON", r":")
 COMMA = Type("COMMA", r",")
-ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+SLASH = Type("SLASH", r"/")
+ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_.-]*')
 NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
 STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
 WSPACE = Type("WSPACE", r"[ \n\r\t]+")
@@ -53,12 +54,36 @@
 joined = "|".join(["(%s)" % t.pattern for t in TYPES])
 LEXER = re.compile(joined)
 
+class LexError(Exception):
+  pass
+
+def line_info(st, pos):
+  idx = 0
+  lineno = 1
+  column = 0
+  line_pos = 0
+  while idx < pos:
+    if st[idx] == "\n":
+      lineno += 1
+      column = 0
+      line_pos = idx
+    column += 1
+    idx += 1
+
+  end = st.find("\n", line_pos)
+  if end < 0:
+    end = len(st)
+  line = st[line_pos:end]
+
+  return line, lineno, column
+
 def lex(st):
   pos = 0
   while pos < len(st):
     m = LEXER.match(st, pos)
     if m is None:
-      raise ValueError(repr(st[pos:]))
+      line, ln, col = line_info(st, pos)
+      raise LexError("unrecognized character in <string>:%s,%s: %s" % (ln, col, line))
     else:
       idx = m.lastindex
       t = Token(TYPES[idx - 1], m.group(idx))
@@ -68,8 +93,6 @@
 
 class ParseError(Exception): pass
 
-class EOF(Exception): pass
-
 class Parser:
 
   def __init__(self, tokens):
@@ -97,11 +120,17 @@
 
   def address(self):
     name = self.eat(ID).value
-    if self.matches(LBRACE):
+    subject = None
+    options = None
+    if self.matches(SLASH):
+      self.eat(SLASH)
+      if self.matches(ID):
+        subject = self.eat(ID).value
+      else:
+        subject = ""
+    elif self.matches(LBRACE):
       options = self.map()
-    else:
-      options = None
-    return name, options
+    return name, subject, options
 
   def map(self):
     self.eat(LBRACE)
@@ -129,6 +158,8 @@
   def value(self):
     if self.matches(NUMBER, STRING):
       return eval(self.eat().value)
+    elif self.matches(ID):
+      return self.eat().value
     elif self.matches(LBRACE):
       return self.map()
     else:

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Sat Oct 10 17:15:31 2009
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import compat, connection, socket, struct, sys, time
+import address, compat, connection, socket, struct, sys, time
 from concurrency import synchronized
 from datatypes import RangedSet, Serial
 from exceptions import Timeout, VersionError
@@ -26,18 +26,14 @@
 from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
 from ops import *
 from selector import Selector
-from session import Client, INCOMPLETE, SessionDetached
 from threading import Condition, Thread
 from util import connect
 
 log = getLogger("qpid.messaging")
 
-def parse_addr(address):
-  parts = address.split("/", 1)
-  if len(parts) == 1:
-    return parts[0], None
-  else:
-    return parts[0], parts[i1]
+def addr2reply_to(addr):
+  name, subject, options = address.parse(addr)
+  return ReplyTo(name, subject)
 
 def reply_to2addr(reply_to):
   if reply_to.routing_key is None:
@@ -100,9 +96,12 @@
 
   def write_query(self, query, handler):
     id = self.sent
+    query.sync = True
     self.write_cmd(query, lambda: handler(self.results.pop(id)))
 
   def write_cmd(self, cmd, completion=noop):
+    if self.detached:
+      raise Exception("detached")
     cmd.id = self.sent
     self.sent += 1
     self.completions[cmd.id] = completion
@@ -187,6 +186,7 @@
       if data:
         log.debug("READ: %r", data)
       else:
+        log.debug("ABORTED: %s", self._socket.getpeername())
         error = ("connection aborted",)
         recoverable = True
     except socket.error, e:
@@ -285,7 +285,7 @@
 
   def do_connection_close(self, close):
     self.write_op(ConnectionCloseOk())
-    if close.reply_ok != close_code.normal:
+    if close.reply_code != close_code.normal:
       self.connection.error = (close.reply_code, close.reply_text)
     # XXX: should we do a half shutdown on the socket here?
     # XXX: we really need to test this, we may end up reporting a
@@ -343,6 +343,10 @@
     sst = self.get_sst(er)
     sst.results[er.command_id] = er.value
 
+  def do_execution_exception(self, ex):
+    sst = self.get_sst(ex)
+    sst.session.error = (ex,)
+
   def dispatch(self):
     try:
       if self._socket is None and self.connection._connected and not self._opening:
@@ -381,7 +385,7 @@
 
   def attach(self, ssn):
     sst = self._attachments.get(ssn)
-    if sst is None:
+    if sst is None and not ssn.closed:
       for i in xrange(0, self.channel_max):
         if not self._sessions.has_key(i):
           ch = i
@@ -403,7 +407,7 @@
     for rcv in ssn.receivers:
       self.link_in(rcv)
 
-    if ssn.closing and not sst.detached:
+    if sst is not None and ssn.closing and not sst.detached:
       sst.detached = True
       sst.write_op(SessionDetach(name=ssn.name))
 
@@ -416,24 +420,66 @@
     del self._attachments[ssn]
     ssn.closed = True
 
+  def do_session_detach(self, dtc):
+    sst = self.get_sst(dtc)
+    sst.write_op(SessionDetached(name=dtc.name))
+    self.do_session_detached(dtc)
+
   def link_out(self, snd):
-    sst = self._attachments[snd.session]
+    sst = self._attachments.get(snd.session)
     _snd = self._attachments.get(snd)
-    if not snd.closing and _snd is None:
+    if _snd is None and not snd.closing and not snd.closed:
       _snd = Attachment(snd)
-      _snd.linked = False
-      node, _snd._subject = parse_addr(snd.target)
-      def do_link_out(result):
-        if result.not_found:
-          # XXX: should check 'create' option
-          sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+
+      try:
+        _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+      except address.LexError, e:
+        snd.error = e
+        snd.closed = True
+        return
+      except address.ParseError, e:
+        snd.error = e
+        snd.closed = True
+        return
+
+      # XXX: subject
+      if _snd.options is None:
+        _snd.options = {}
+
+      def do_link():
+        snd.linked = True
+
+      def do_queue_q(result):
+        if sst.detached:
+          return
+
+        if result.queue:
           _snd._exchange = ""
-          _snd._routing_key = node
+          _snd._routing_key = _snd.name
+          do_link()
         else:
-          _snd._exchange = node
-          _snd._routing_key = _snd._subject
-        _snd.linked = True
-      sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
+          snd.error = ("no such queue: %s" % _snd.name,)
+          del self._attachments[snd]
+          snd.closed = True
+
+      def do_exchange_q(result):
+        if sst.detached:
+          return
+
+        if result.not_found:
+          if _snd.options.get("create") in ("always", "receiver"):
+            sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+            _snd._exchange = ""
+            _snd._routing_key = _snd.name
+          else:
+            sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+            return
+        else:
+          _snd._exchange = _snd.name
+          _snd._routing_key = _snd.subject
+        do_link()
+
+      sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
       self._attachments[snd] = _snd
 
     if snd.closing and not snd.closed:
@@ -441,41 +487,77 @@
       snd.closed = True
 
   def link_in(self, rcv):
-    sst = self._attachments[rcv.session]
+    sst = self._attachments.get(rcv.session)
     _rcv = self._attachments.get(rcv)
-    if _rcv is None and not rcv.closing:
+    if _rcv is None and not rcv.closing and not rcv.closed:
       _rcv = Attachment(rcv)
-      _rcv.linked = False
       _rcv.canceled = False
       _rcv.draining = False
 
-      def do_link_in(result):
+      try:
+        _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+      except address.LexError, e:
+        rcv.error = e
+        rcv.closed = True
+        return
+      except address.ParseError, e:
+        rcv.error = e
+        rcv.closed = True
+        return
+
+      # XXX: subject
+      if _rcv.options is None:
+        _rcv.options = {}
+
+      def do_link():
+        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+        rcv.linked = True
+
+      def do_queue_q(result):
+        if sst.detached:
+          return
+        if result.queue:
+          _rcv._queue = _rcv.name
+          do_link()
+        else:
+          rcv.error = ("no such queue: %s" % _rcv.name,)
+          del self._attachments[rcv]
+          rcv.closed = True
+
+      def do_exchange_q(result):
+        if sst.detached:
+          return
         if result.not_found:
-          _rcv._queue = rcv.source
-          # XXX: should check 'create' option
-          sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+          if _rcv.options.get("create") in ("always", "receiver"):
+            _rcv._queue = _rcv.name
+            sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+          else:
+            sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+            return
         else:
           _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
           sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
           # XXX
-          if rcv.filter is None:
+          if _rcv.options.get("filter") is None:
             f = FILTER_DEFAULTS[result.type]
           else:
             f = rcv.filter
-          f._bind(sst, rcv.source, _rcv._queue)
-        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
-        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
-        _rcv.linked = True
-      sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
+          f._bind(sst, _rcv.name, _rcv._queue)
+        do_link()
+      sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
       self._attachments[rcv] = _rcv
 
     if rcv.closing and not rcv.closed:
-      if not _rcv.canceled:
-        def close_rcv():
-          del self._attachments[rcv]
-          rcv.closed = True
-        sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
-        _rcv.canceled = True
+      if rcv.linked:
+        if not _rcv.canceled:
+          def close_rcv():
+            del self._attachments[rcv]
+            rcv.closed = True
+          sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+          _rcv.canceled = True
+      else:
+        rcv.closed = True
 
   def process(self, ssn):
     if ssn.closing: return
@@ -485,8 +567,9 @@
     while sst.outgoing_idx < len(ssn.outgoing):
       msg = ssn.outgoing[sst.outgoing_idx]
       snd = msg._sender
+      # XXX: should check for sender error here
       _snd = self._attachments.get(snd)
-      if _snd and _snd.linked:
+      if _snd and snd.linked:
         self.send(snd, msg)
         sst.outgoing_idx += 1
       else:
@@ -559,7 +642,7 @@
   def grant(self, rcv):
     sst = self._attachments[rcv.session]
     _rcv = self._attachments.get(rcv)
-    if _rcv is None or not _rcv.linked or _rcv.draining:
+    if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
       return
 
     if rcv.granted is UNLIMITED:
@@ -606,7 +689,7 @@
       rk = _snd._routing_key
     # XXX: do we need to query to figure out how to create the reply-to interoperably?
     if msg.reply_to:
-      rt = ReplyTo(*parse_addr(msg.reply_to))
+      rt = addr2reply_to(msg.reply_to)
     else:
       rt = None
     dp = DeliveryProperties(routing_key=rk)
@@ -650,7 +733,6 @@
     log.debug("RECV [%s] %s", ssn, msg)
     ssn.incoming.append(msg)
     self.connection._waiter.notifyAll()
-    return INCOMPLETE
 
   def _decode(self, xfr):
     dp = EMPTY_DP

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sat Oct 10 17:15:31 2009
@@ -237,9 +237,10 @@
     self.value = value
 
   # XXX: this should become part of the driver
-  def _bind(self, ssn, exchange, queue):
-    ssn.exchange_bind(exchange=exchange, queue=queue,
-                      binding_key=self.value.replace("*", "#"))
+  def _bind(self, sst, exchange, queue):
+    from qpid.ops import ExchangeBind
+    sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+                               binding_key=self.value.replace("*", "#")))
 
 class SessionError(Exception):
   pass
@@ -289,6 +290,7 @@
     # XXX: I hate this name.
     self.ack_capacity = UNLIMITED
 
+    self.error = None
     self.closing = False
     self.closed = False
 
@@ -309,9 +311,13 @@
 
   def _check_error(self, exc=SessionError):
     self.connection._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=SessionError):
-    return self.connection._ewait(predicate, timeout, exc)
+    result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def sender(self, target, **options):
@@ -520,6 +526,8 @@
     self.capacity = options.get("capacity", UNLIMITED)
     self.queued = Serial(0)
     self.acked = Serial(0)
+    self.error = None
+    self.linked = False
     self.closing = False
     self.closed = False
     self._lock = self.session._lock
@@ -529,9 +537,13 @@
 
   def _check_error(self, exc=SendError):
     self.session._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=SendError):
-    return self.session._ewait(predicate, timeout, exc)
+    result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def pending(self):
@@ -567,6 +579,8 @@
     if not self.session.connection._connected or self.session.closing:
       raise Disconnected()
 
+    self._ewait(lambda: self.linked)
+
     if isinstance(object, Message):
       message = object
     else:
@@ -637,6 +651,8 @@
     self.received = Serial(0)
     self.returned = Serial(0)
 
+    self.error = None
+    self.linked = False
     self.closing = False
     self.closed = False
     self.listener = None
@@ -647,9 +663,13 @@
 
   def _check_error(self, exc=ReceiveError):
     self.session._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=ReceiveError):
-    return self.session._ewait(predicate, timeout, exc)
+    result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def pending(self):
@@ -693,6 +713,9 @@
     @type timeout: float
     @param timeout: the time to wait for a message to be available
     """
+
+    self._ewait(lambda: self.linked)
+
     if self._capacity() == 0:
       self.granted = self.returned + 1
       self._wakeup()
@@ -751,7 +774,7 @@
     self.closing = True
     self._wakeup()
     try:
-      self._ewait(lambda: self.closed)
+      self.session._ewait(lambda: self.closed)
     finally:
       self.session.receivers.remove(self)
 

Modified: qpid/trunk/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/ops.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/ops.py (original)
+++ qpid/trunk/qpid/python/qpid/ops.py Sat Oct 10 17:15:31 2009
@@ -80,7 +80,7 @@
     return "%s(%s)" % (self.__class__.__name__,
                        ", ".join(["%s=%r" % (f.name, getattr(self, f.name))
                                   for f in self.ARGS
-                                  if getattr(self, f.name) is not f.default]))
+                                  if getattr(self, f.name) != f.default]))
 
 class Command(Compound):
   UNENCODED=[Field("channel", "uint16", 0),

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=823890&r1=823889&r2=823890&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sat Oct 10 17:15:31 2009
@@ -24,7 +24,7 @@
 from qpid.tests import Test
 from qpid.harness import Skipped
 from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
-    InsufficientCapacity, Message, UNLIMITED, uuid4
+    InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -63,11 +63,12 @@
       return "%s[%s, %s]" % (base, count, self.test_id)
 
   def ping(self, ssn):
+    PING_Q = 'ping-queue {create: always}'
     # send a message
-    sender = ssn.sender("ping-queue")
+    sender = ssn.sender(PING_Q)
     content = self.content("ping")
     sender.send(content)
-    receiver = ssn.receiver("ping-queue")
+    receiver = ssn.receiver(PING_Q)
     msg = receiver.fetch(0)
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -174,6 +175,8 @@
     self.conn.close()
     assert not self.conn.connected()
 
+ACK_Q = 'test-ack-queue {create: always}'
+
 class SessionTests(Base):
 
   def setup_connection(self):
@@ -183,7 +186,7 @@
     return self.conn.session()
 
   def testSender(self):
-    snd = self.ssn.sender("test-snd-queue")
+    snd = self.ssn.sender('test-snd-queue {create: always}')
     snd2 = self.ssn.sender(snd.target)
     assert snd is not snd2
     snd2.close()
@@ -196,7 +199,7 @@
     self.ssn.acknowledge(msg)
 
   def testReceiver(self):
-    rcv = self.ssn.receiver("test-rcv-queue")
+    rcv = self.ssn.receiver('test-rcv-queue {create: always}')
     rcv2 = self.ssn.receiver(rcv.source)
     assert rcv is not rcv2
     rcv2.close()
@@ -209,34 +212,36 @@
     self.ssn.acknowledge(msg)
 
   def testStart(self):
-    rcv = self.ssn.receiver("test-start-queue")
+    START_Q = 'test-start-queue {create: always}'
+    rcv = self.ssn.receiver(START_Q)
     assert not rcv.started
     self.ssn.start()
     assert rcv.started
-    rcv = self.ssn.receiver("test-start-queue")
+    rcv = self.ssn.receiver(START_Q)
     assert rcv.started
 
   def testStop(self):
+    STOP_Q = 'test-stop-queue {create: always}'
     self.ssn.start()
-    rcv = self.ssn.receiver("test-stop-queue")
+    rcv = self.ssn.receiver(STOP_Q)
     assert rcv.started
     self.ssn.stop()
     assert not rcv.started
-    rcv = self.ssn.receiver("test-stop-queue")
+    rcv = self.ssn.receiver(STOP_Q)
     assert not rcv.started
 
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
   def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
-    snd = self.ssn.sender("test-ack-queue")
+    snd = self.ssn.sender(ACK_Q)
     contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
-    rcv = self.ssn.receiver(snd.target)
+    rcv = self.ssn.receiver(ACK_Q)
     self.drain(rcv, expected=contents)
     self.ssn.close()
 
@@ -245,7 +250,7 @@
     self.ssn = self.conn.session()
     if ack_capacity is not None:
       self.ssn.ack_capacity = ack_capacity
-    rcv = self.ssn.receiver("test-ack-queue")
+    rcv = self.ssn.receiver(ACK_Q)
     self.drain(rcv, expected=contents)
     acker(self.ssn)
     self.ssn.close()
@@ -253,7 +258,7 @@
     # drain the queue a final time and verify that the messages were
     # dequeued
     self.ssn = self.conn.session()
-    rcv = self.ssn.receiver("test-ack-queue")
+    rcv = self.ssn.receiver(ACK_Q)
     self.assertEmpty(rcv)
 
   def testAcknowledge(self):
@@ -271,7 +276,7 @@
         pass
     finally:
       self.ssn.ack_capacity = UNLIMITED
-      self.drain(self.ssn.receiver("test-ack-queue"))
+      self.drain(self.ssn.receiver(ACK_Q))
       self.ssn.acknowledge()
 
   def testAcknowledgeAsyncAckCap1(self):
@@ -294,10 +299,12 @@
     return contents
 
   def txTest(self, commit):
+    TX_Q = 'test-tx-queue {create: always}'
+    TX_Q_COPY = 'test-tx-queue-copy {create: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
-    txrcv = txssn.receiver("test-tx-queue")
-    txsnd = txssn.sender("test-tx-queue-copy")
+    contents = self.send(self.ssn, TX_Q, "txTest", 3)
+    txrcv = txssn.receiver(TX_Q)
+    txsnd = txssn.sender(TX_Q_COPY)
     rcv = self.ssn.receiver(txrcv.source)
     copy_rcv = self.ssn.receiver(txsnd.target)
     self.assertEmpty(copy_rcv)
@@ -323,9 +330,10 @@
     self.txTest(False)
 
   def txTestSend(self, commit):
+    TX_SEND_Q = 'test-tx-send-queue {create: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
-    rcv = self.ssn.receiver("test-tx-send-queue")
+    contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+    rcv = self.ssn.receiver(TX_SEND_Q)
     self.assertEmpty(rcv)
 
     if commit:
@@ -345,10 +353,11 @@
     self.txTestSend(False)
 
   def txTestAck(self, commit):
+    TX_ACK_Q = 'test-tx-ack-queue {create: always}'
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver("test-tx-ack-queue")
+    txrcv = txssn.receiver(TX_ACK_Q)
     self.assertEmpty(txrcv)
-    contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+    contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
     assert contents == self.drain(txrcv)
 
     if commit:
@@ -366,11 +375,11 @@
     txssn.close()
 
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver("test-tx-ack-queue")
+    txrcv = txssn.receiver(TX_ACK_Q)
     assert contents == self.drain(txrcv)
     txssn.acknowledge()
     txssn.commit()
-    rcv = self.ssn.receiver("test-tx-ack-queue")
+    rcv = self.ssn.receiver(TX_ACK_Q)
     self.assertEmpty(rcv)
     txssn.close()
     self.assertEmpty(rcv)
@@ -389,6 +398,8 @@
     except Disconnected:
       pass
 
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
 class ReceiverTests(Base):
 
   def setup_connection(self):
@@ -398,10 +409,10 @@
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-receiver-queue")
+    return self.ssn.sender(RECEIVER_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-receiver-queue")
+    return self.ssn.receiver(RECEIVER_Q)
 
   def send(self, base, count = None):
     content = self.content(base, count)
@@ -538,6 +549,66 @@
 
   # XXX: need testClose
 
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port)
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def sendErrorTest(self, addr, exc, check=lambda e: True):
+    snd = self.ssn.sender(addr)
+    try:
+      snd.send("hello")
+      assert False, "send succeeded"
+    except exc, e:
+      assert check(e), "unexpected error: %s" % e
+      snd.close()
+
+  def fetchErrorTest(self, addr, exc, check=lambda e: True):
+    rcv = self.ssn.receiver(addr)
+    try:
+      rcv.fetch(timeout=0)
+      assert False, "fetch succeeded"
+    except exc, e:
+      assert check(e), "unexpected error: %s" % e
+      rcv.close()
+
+  def testNoTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+  def testNoSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+  def testUnparseableTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+                       lambda e: "expecting ID" in str(e))
+
+  def testUnparseableSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+                        lambda e: "expecting ID" in str(e))
+
+  def testUnlexableTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+                       lambda e: "unrecognized character" in str(e))
+
+  def testUnlexableSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+                        lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
 class SenderTests(Base):
 
   def setup_connection(self):
@@ -547,10 +618,10 @@
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-sender-queue")
+    return self.ssn.sender(SENDER_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-sender-queue")
+    return self.ssn.receiver(SENDER_Q)
 
   def checkContent(self, content):
     self.snd.send(content)
@@ -644,6 +715,8 @@
     m.content = u"<html/>"
     assert m.content_type == "text/html; charset=utf8"
 
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
 class MessageEchoTests(Base):
 
   def setup_connection(self):
@@ -653,10 +726,10 @@
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-message-echo-queue")
+    return self.ssn.sender(ECHO_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-message-echo-queue")
+    return self.ssn.receiver(ECHO_Q)
 
   def check(self, msg):
     self.snd.send(msg)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org