You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/23 15:57:29 UTC

svn commit: r926604 - in /qpid/trunk/qpid/python/qpid: brokertest.py messaging/driver.py messaging/endpoints.py tests/messaging/__init__.py tests/messaging/endpoints.py validator.py

Author: rhs
Date: Tue Mar 23 14:57:28 2010
New Revision: 926604

URL: http://svn.apache.org/viewvc?rev=926604&view=rev
Log:
Several updates to address options including:
  - renamed node-properties to node
  - added link to permit durable links (with names)
  - split x-properties into x-declare, x-subscribe, and x-bindings
  - removed automatic passthrough of unrecognized options (as this was confusing)

Modified:
    qpid/trunk/qpid/python/qpid/brokertest.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/validator.py

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Mar 23 14:57:28 2010
@@ -303,8 +303,8 @@ class Broker(Popen):
         c.close()
     
     def _prep_sender(self, queue, durable, xprops):
-        s = queue + "; {create:always, node-properties:{durable:" + str(durable)
-        if xprops != None: s += ", x-properties:{" + xprops + "}"
+        s = queue + "; {create:always, node:{durable:" + str(durable)
+        if xprops != None: s += ", x-declare:{" + xprops + "}"
         return s + "}}"
 
     def send_message(self, queue, message, durable=True, xprops=None, session=None):

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Mar 23 14:57:28 2010
@@ -33,7 +33,7 @@ from qpid.messaging.message import get_c
 from qpid.ops import *
 from qpid.selector import Selector
 from qpid.util import connect
-from qpid.validator import And, Context, Map, Types, Values
+from qpid.validator import And, Context, List, Map, Types, Values
 from threading import Condition, Thread
 
 log = getLogger("qpid.messaging")
@@ -78,9 +78,8 @@ class Pattern:
     sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
                                binding_key=self.value.replace("*", "#")))
 
-FILTER_DEFAULTS = {
-  "topic": Pattern("*"),
-  "amq.failover": Pattern("DUMMY")
+SUBJECT_DEFAULTS = {
+  "topic": "#"
   }
 
 # XXX
@@ -130,7 +129,14 @@ class SessionState:
     id = self.sent
     self.write_cmd(query, lambda: handler(self.results.pop(id)))
 
-  def write_cmd(self, cmd, action=noop):
+  def apply_overrides(self, cmd, overrides):
+    for k, v in overrides.items():
+      cmd[k.replace('-', '_')] = v
+
+  def write_cmd(self, cmd, action=noop, overrides=None):
+    if overrides:
+      self.apply_overrides(cmd, overrides)
+
     if action != noop:
       cmd.sync = True
     if self.detached:
@@ -154,28 +160,36 @@ class SessionState:
     self.driver.write_op(op)
 
 POLICIES = Values("always", "sender", "receiver", "never")
+RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
+                     "exactly-once")
 
-class Bindings:
-
-  def validate(self, o, ctx):
-    t = ctx.containers[1].get("type", "queue")
-    if t != "queue":
-      return "bindings are only permitted on nodes of type queue"
+DECLARE = Map({}, restricted=False)
+BINDINGS = List(Map({
+      "exchange": Types(basestring),
+      "queue": Types(basestring),
+      "key": Types(basestring),
+      "arguments": Map({}, restricted=False)
+      }))
 
 COMMON_OPTS = {
-    "create": POLICIES,
-    "delete": POLICIES,
-    "assert": POLICIES,
-    "node-properties": Map({
-        "type": Values("queue", "topic"),
-        "durable": Types(bool),
-        "x-properties": Map({
-            "type": Types(basestring),
-            "bindings": And(Types(list), Bindings())
-            },
-            restricted=False)
-        })
-    }
+  "create": POLICIES,
+  "delete": POLICIES,
+  "assert": POLICIES,
+  "node": Map({
+      "type": Values("queue", "topic"),
+      "durable": Types(bool),
+      "x-declare": DECLARE,
+      "x-bindings": BINDINGS
+      }),
+  "link": Map({
+      "name": Types(basestring),
+      "durable": Types(bool),
+      "reliability": RELIABILITY,
+      "x-declare": DECLARE,
+      "x-bindings": BINDINGS,
+      "x-subscribe": Map({}, restricted=False)
+      })
+  }
 
 RECEIVE_MODES = Values("browse", "consume")
 
@@ -196,36 +210,46 @@ class LinkIn:
     _rcv.destination = str(rcv.id)
     sst.destinations[_rcv.destination] = _rcv
     _rcv.draining = False
+    _rcv.on_unlink = []
 
   def do_link(self, sst, rcv, _rcv, type, subtype, action):
+    link_opts = _rcv.options.get("link", {})
+    # XXX: default?
+    reliability = link_opts.get("reliability", "unreliable")
+    declare = link_opts.get("x-declare", {})
+    subscribe = link_opts.get("x-subscribe", {})
     acq_mode = acquire_mode.pre_acquired
 
     if type == "topic":
-      _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
-      sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
-      filter = _rcv.options.get("filter")
-      if _rcv.subject is None and filter is None:
-        f = FILTER_DEFAULTS[subtype]
-      elif _rcv.subject and filter:
-        # XXX
-        raise Exception("can't supply both subject and filter")
-      elif _rcv.subject:
-        # XXX
-        f = Pattern(_rcv.subject)
-      else:
-        f = filter
-      f._bind(sst, _rcv.name, _rcv._queue)
+      default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
+      _rcv._queue = link_opts.get("name", default_name)
+      sst.write_cmd(QueueDeclare(queue=_rcv._queue,
+                                 durable=link_opts.get("durable", False),
+                                 exclusive=True,
+                                 auto_delete=(reliability == "unreliable")),
+                    overrides=declare)
+      _rcv.on_unlink = [QueueDelete(_rcv._queue)]
+      subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
+      sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
+      bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
     elif type == "queue":
       _rcv._queue = _rcv.name
       if _rcv.options.get("mode", "consume") == "browse":
         acq_mode = acquire_mode.not_acquired
+      bindings = get_bindings(link_opts, queue=_rcv._queue)
 
+    sst.write_cmds(bindings)
     sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
-                                   acquire_mode = acq_mode))
+                                   acquire_mode = acq_mode),
+                  overrides=subscribe)
     sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
 
   def do_unlink(self, sst, rcv, _rcv, action=noop):
-    sst.write_cmd(MessageCancel(_rcv.destination), action)
+    link_opts = _rcv.options.get("link", {})
+    reliability = link_opts.get("reliability")
+    cmds = [MessageCancel(_rcv.destination)]
+    cmds.extend(_rcv.on_unlink)
+    sst.write_cmds(cmds, action)
 
   def del_link(self, sst, rcv, _rcv):
     del sst.destinations[_rcv.destination]
@@ -240,13 +264,16 @@ class LinkOut:
     _snd.closing = False
 
   def do_link(self, sst, snd, _snd, type, subtype, action):
+    link_opts = _snd.options.get("link", {})
     if type == "topic":
       _snd._exchange = _snd.name
       _snd._routing_key = _snd.subject
+      bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
     elif type == "queue":
       _snd._exchange = ""
       _snd._routing_key = _snd.name
-    action()
+      bindings = get_bindings(link_opts, queue=_snd.name)
+    sst.write_cmds(bindings, action)
 
   def do_unlink(self, sst, snd, _snd, action=noop):
     action()
@@ -437,6 +464,17 @@ class Driver:
 
 DEFAULT_DISPOSITION = Disposition(None)
 
+def get_bindings(opts, queue=None, exchange=None, key=None):
+  bindings = opts.get("x-bindings", [])
+  cmds = []
+  for b in bindings:
+    exchange = b.get("exchange", exchange)
+    queue = b.get("queue", queue)
+    key = b.get("key", key)
+    args = b.get("arguments", {})
+    cmds.append(ExchangeBind(queue, exchange, key, args))
+  return cmds
+
 class Engine:
 
   def __init__(self, connection):
@@ -785,12 +823,6 @@ class Engine:
           err = self.declare(sst, lnk, action)
         else:
           err = ("no such queue: %s" % lnk.name,)
-      elif type == "queue":
-        try:
-          cmds = self.bindings(lnk)
-          sst.write_cmds(cmds, lambda: action(type, subtype))
-        except address.ParseError, e:
-          err = (e,)
       else:
         action(type, subtype)
 
@@ -831,23 +863,21 @@ class Engine:
 
   def declare(self, sst, lnk, action):
     name = lnk.name
-    props = lnk.options.get("node-properties", {})
+    props = lnk.options.get("node", {})
     durable = props.get("durable", DURABLE_DEFAULT)
     type = props.get("type", "queue")
-    xprops = props.get("x-properties", {})
+    declare = props.get("x-declare", {})
 
     if type == "topic":
       cmd = ExchangeDeclare(exchange=name, durable=durable)
+      bindings = get_bindings(props, exchange=name)
     elif type == "queue":
       cmd = QueueDeclare(queue=name, durable=durable)
+      bindings = get_bindings(props, queue=name)
     else:
       raise ValueError(type)
 
-    for f in cmd.FIELDS:
-      if f.name != "arguments" and xprops.has_key(f.name):
-        cmd[f.name] = xprops.pop(f.name)
-    if xprops:
-      cmd.arguments = xprops
+    sst.apply_overrides(cmd, declare)
 
     if type == "topic":
       if cmd.type is None:
@@ -857,11 +887,7 @@ class Engine:
       subtype = None
 
     cmds = [cmd]
-    if type == "queue":
-      try:
-        cmds.extend(self.bindings(lnk))
-      except address.ParseError, e:
-        return (e,)
+    cmds.extend(bindings)
 
     def declared():
       self.address_cache[name] = (type, subtype)
@@ -869,16 +895,6 @@ class Engine:
 
     sst.write_cmds(cmds, declared)
 
-  def bindings(self, lnk):
-    props = lnk.options.get("node-properties", {})
-    xprops = props.get("x-properties", {})
-    bindings = xprops.get("bindings", [])
-    cmds = []
-    for b in bindings:
-      n, s, o = address.parse(b)
-      cmds.append(ExchangeBind(lnk.name, n, s, o))
-    return cmds
-
   def delete(self, sst, name, action):
     def deleted():
       del self.address_cache[name]

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 14:57:28 2010
@@ -295,14 +295,29 @@ class Session:
       create: <create-policy>,
       delete: <delete-policy>,
       assert: <assert-policy>,
-      node-properties: {
+      node: {
         type: <node-type>,
         durable: <node-durability>,
-        x-properties: {
-          bindings: ["<exchange>/<key>", ...],
-          <passthrough-key>: <passthrough-value>
-        }
+        x-declare: { ... <queue-declare or exchange-declare overrides> ... },
+        x-bindings: [<binding_1>, ... <binding_n>]
       }
+      link: {
+        name: <link-name>,
+        durable: <link-durability>,
+        reliability: <link-reliability>,
+        x-declare: { ... <queue-declare overrides> ... },
+        x-bindings: [<binding_1>, ... <binding_n>],
+        x-subscribe: { ... <message-subscribe overrides> ... }
+      }
+    }
+
+  Bindings are specified as a map with the following options::
+
+    {
+      exchange: <exchange>,
+      queue: <queue>,
+      key: <key>,
+      arguments: <arguments>
     }
 
   The create, delete, and assert policies specify who should perform
@@ -316,14 +331,12 @@ class Session:
   The node-type is one of:
 
     - I{topic}: a topic node will default to the topic exchange,
-      x-properties may be used to specify other exchange types
+      x-declare may be used to specify other exchange types
     - I{queue}: this is the default node-type
 
-  The x-properties map permits arbitrary additional keys and values to
-  be specified. These keys and values are passed through when creating
-  a node or asserting facts about an existing node. Any passthrough
-  keys and values that do not match a standard field of the underlying
-  exchange or queue declare command will be sent in the arguments map.
+  The x-declare map permits protocol-specific keys and values to be
+  specified. These keys and values are passed through when creating a
+  node or asserting facts about an existing node.
 
   Examples
   --------
@@ -353,18 +366,18 @@ class Session:
 
   You can customize the properties of the queue::
 
-    my-queue; {create: always, node-properties: {durable: True}}
+    my-queue; {create: always, node: {durable: True}}
 
   You can create a topic instead if you want::
 
-    my-queue; {create: always, node-properties: {type: topic}}
+    my-queue; {create: always, node: {type: topic}}
 
   You can assert that the address resolves to a node with particular
   properties::
 
     my-transient-topic; {
       assert: always,
-      node-properties: {
+      node: {
         type: topic,
         durable: False
       }

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Tue Mar 23 14:57:28 2010
@@ -59,8 +59,8 @@ class Base(Test):
     else:
       return "%s[%s, %s]" % (base, count, self.test_id)
 
-  def message(self, base, count = None):
-    return Message(self.content(base, count))
+  def message(self, base, count = None, **kwargs):
+    return Message(content=self.content(base, count), **kwargs)
 
   def ping(self, ssn):
     PING_Q = 'ping-queue; {create: always, delete: always}'

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 14:57:28 2010
@@ -247,9 +247,9 @@ class SessionTests(Base):
       test-reject-queue; {
         create: always,
         delete: always,
-        node-properties: {
-          x-properties: {
-            alternate_exchange: 'amq.topic'
+        node: {
+          x-declare: {
+            alternate-exchange: 'amq.topic'
           }
         }
       }
@@ -515,7 +515,7 @@ class ReceiverTests(Base):
     snd = self.ssn.sender("""test-double-close; {
   create: always,
   delete: sender,
-  node-properties: {
+  node: {
     type: topic
   }
 }
@@ -571,9 +571,9 @@ class AddressTests(Base):
       assert "error in options: %s" % error == str(e), e
 
   def testIllegalKey(self):
-    self.badOption("{create: always, node-properties: "
+    self.badOption("{create: always, node: "
                    "{this-property-does-not-exist: 3}}",
-                   "node-properties: this-property-does-not-exist: "
+                   "node: this-property-does-not-exist: "
                    "illegal key")
 
   def testWrongValue(self):
@@ -581,23 +581,17 @@ class AddressTests(Base):
                    "('always', 'sender', 'receiver', 'never')")
 
   def testWrongType1(self):
-    self.badOption("{node-properties: asdf}",
-                   "node-properties: asdf is not a map")
+    self.badOption("{node: asdf}",
+                   "node: asdf is not a map")
 
   def testWrongType2(self):
-    self.badOption("{node-properties: {durable: []}}",
-                   "node-properties: durable: [] is not a bool")
-
-  def testNonQueueBindings(self):
-    self.badOption("{node-properties: {type: topic, x-properties: "
-                   "{bindings: []}}}",
-                   "node-properties: x-properties: bindings: "
-                   "bindings are only permitted on nodes of type queue")
+    self.badOption("{node: {durable: []}}",
+                   "node: durable: [] is not a bool")
 
   def testCreateQueue(self):
     snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
-                          "node-properties: {type: queue, durable: False, "
-                          "x-properties: {auto_delete: true}}}")
+                          "node: {type: queue, durable: False, "
+                          "x-declare: {auto_delete: true}}}")
     content = self.content("testCreateQueue")
     snd.send(content)
     rcv = self.ssn.receiver("test-create-queue")
@@ -607,10 +601,10 @@ class AddressTests(Base):
     addr = """test-create-exchange; {
                 create: always,
                 delete: always,
-                node-properties: {
+                node: {
                   type: topic,
                   durable: False,
-                  x-properties: {auto_delete: true, %s}
+                  x-declare: {auto_delete: true, %s}
                 }
               }""" % props
     snd = self.ssn.sender(addr)
@@ -677,15 +671,15 @@ class AddressTests(Base):
     # XXX: need to figure out close after error
     self.conn._remove_session(self.ssn)
 
-  def testBindings(self):
+  def testNodeBindingsQueue(self):
     snd = self.ssn.sender("""
-test-bindings-queue; {
+test-node-bindings-queue; {
   create: always,
   delete: always,
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
-    }
+  node: {
+    x-bindings: [{exchange: "amq.topic", key: "a.#"},
+                 {exchange: "amq.direct", key: "b"},
+                 {exchange: "amq.topic", key: "c.*"}]
   }
 }
 """)
@@ -696,49 +690,80 @@ test-bindings-queue; {
     snd_a.send("two")
     snd_b.send("three")
     snd_c.send("four")
-    rcv = self.ssn.receiver("test-bindings-queue")
+    rcv = self.ssn.receiver("test-node-bindings-queue")
     self.drain(rcv, expected=["one", "two", "three", "four"])
 
-  def testBindingsAdditive(self):
-    m1 = self.content("testBindingsAdditive", 1)
-    m2 = self.content("testBindingsAdditive", 2)
-    m3 = self.content("testBindingsAdditive", 3)
-    m4 = self.content("testBindingsAdditive", 4)
-
+  def testNodeBindingsTopic(self):
+    rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+    rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+    rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+    rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
     snd = self.ssn.sender("""
-test-bindings-additive-queue; {
+test-node-bindings-topic; {
   create: always,
   delete: always,
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/a"]
-    }
+  node: {
+    type: topic,
+    x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+                 {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+                 {queue: test-node-bindings-topic-queue-b, key: "b"},
+                 {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
   }
 }
 """)
+    m1 = Message("one")
+    m2 = Message(subject="a.foo", content="two")
+    m3 = Message(subject="b", content="three")
+    m4 = Message(subject="c.bar", content="four")
+    snd.send(m1)
+    snd.send(m2)
+    snd.send(m3)
+    snd.send(m4)
+    self.drain(rcv, expected=[m1, m2, m3, m4])
+    self.drain(rcv_a, expected=[m2])
+    self.drain(rcv_b, expected=[m3])
+    self.drain(rcv_c, expected=[m4])
+
+  def testLinkBindings(self):
+    m_a = self.message("testLinkBindings", 1, subject="a")
+    m_b = self.message("testLinkBindings", 2, subject="b")
 
-    snd_a = self.ssn.sender("amq.topic/a")
-    snd_b = self.ssn.sender("amq.topic/b")
+    self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+    snd = self.ssn.sender("amq.topic")
 
-    snd_a.send(m1)
-    snd_b.send(m2)
+    snd.send(m_a)
+    snd.send(m_b)
+    snd.close()
 
-    rcv = self.ssn.receiver("test-bindings-additive-queue")
-    self.drain(rcv, expected=[m1])
+    rcv = self.ssn.receiver("test-link-bindings-queue")
+    self.assertEmpty(rcv)
 
-    new_snd = self.ssn.sender("""
-test-bindings-additive-queue; {
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/b"]
-    }
+    snd = self.ssn.sender("""
+amq.topic; {
+  link: {
+    x-bindings: [{queue: test-link-bindings-queue, key: a}]
   }
 }
 """)
 
-    new_snd.send(m3)
-    snd_b.send(m4)
-    self.drain(rcv, expected=[m3, m4])
+    snd.send(m_a)
+    snd.send(m_b)
+
+    self.drain(rcv, expected=[m_a])
+    rcv.close()
+
+    rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+  link: {
+    x-bindings: [{exchange: "amq.topic", key: b}]
+  }
+}
+""")
+
+    snd.send(m_a)
+    snd.send(m_b)
+
+    self.drain(rcv, expected=[m_a, m_b])
 
   def testSubjectOverride(self):
     snd = self.ssn.sender("amq.topic/a")
@@ -764,6 +789,32 @@ test-bindings-additive-queue; {
     assert e2.subject == "b", "subject: %s" % e2.subject
     self.assertEmpty(rcv)
 
+  def doReliabilityTest(self, reliability, messages, expected):
+    snd = self.ssn.sender("amq.topic")
+    rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+    for m in messages:
+      snd.send(m)
+    self.conn.disconnect()
+    self.conn.connect()
+    self.drain(rcv, expected=expected)
+
+  def testReliabilityUnreliable(self):
+    msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+    self.doReliabilityTest("unreliable", msgs, [])
+
+  def testReliabilityAtLeastOnce(self):
+    msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+    self.doReliabilityTest("at-least-once", msgs, msgs)
+
+  def testLinkName(self):
+    msgs = [self.message("testLinkName", i) for i in range(3)]
+    snd = self.ssn.sender("amq.topic")
+    trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+    qrcv = self.ssn.receiver("test-link-name")
+    for m in msgs:
+      snd.send(m)
+    self.drain(qrcv, expected=msgs)
+
 NOSUCH_Q = "this-queue-should-not-exist"
 UNPARSEABLE_ADDR = "name/subject; {bad options"
 UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"

Modified: qpid/trunk/qpid/python/qpid/validator.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/validator.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/validator.py (original)
+++ qpid/trunk/qpid/python/qpid/validator.py Tue Mar 23 14:57:28 2010
@@ -54,6 +54,20 @@ class Types:
     else:
       return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types]))
 
+class List:
+
+  def __init__(self, condition):
+    self.condition = condition
+
+  def validate(self, o, ctx):
+    if not isinstance(o, list):
+      return "%s is not a list" % o
+
+    ctx.push(o)
+    for v in o:
+      err = self.condition.validate(v, ctx)
+      if err: return err
+
 class Map:
 
   def __init__(self, map, restricted=True):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org