You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/23 15:57:29 UTC
svn commit: r926604 - in /qpid/trunk/qpid/python/qpid: brokertest.py
messaging/driver.py messaging/endpoints.py tests/messaging/__init__.py
tests/messaging/endpoints.py validator.py
Author: rhs
Date: Tue Mar 23 14:57:28 2010
New Revision: 926604
URL: http://svn.apache.org/viewvc?rev=926604&view=rev
Log:
Several updates to address options including:
- renamed node-properties to node
- added link to permit durable links (with names)
- split x-properties into x-declare, x-subscribe, and x-bindings
- removed automatic passthrough of unrecognized options (as this was confusing)
Modified:
qpid/trunk/qpid/python/qpid/brokertest.py
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/validator.py
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Mar 23 14:57:28 2010
@@ -303,8 +303,8 @@ class Broker(Popen):
c.close()
def _prep_sender(self, queue, durable, xprops):
- s = queue + "; {create:always, node-properties:{durable:" + str(durable)
- if xprops != None: s += ", x-properties:{" + xprops + "}"
+ s = queue + "; {create:always, node:{durable:" + str(durable)
+ if xprops != None: s += ", x-declare:{" + xprops + "}"
return s + "}}"
def send_message(self, queue, message, durable=True, xprops=None, session=None):
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Mar 23 14:57:28 2010
@@ -33,7 +33,7 @@ from qpid.messaging.message import get_c
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
-from qpid.validator import And, Context, Map, Types, Values
+from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
log = getLogger("qpid.messaging")
@@ -78,9 +78,8 @@ class Pattern:
sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#")))
-FILTER_DEFAULTS = {
- "topic": Pattern("*"),
- "amq.failover": Pattern("DUMMY")
+SUBJECT_DEFAULTS = {
+ "topic": "#"
}
# XXX
@@ -130,7 +129,14 @@ class SessionState:
id = self.sent
self.write_cmd(query, lambda: handler(self.results.pop(id)))
- def write_cmd(self, cmd, action=noop):
+ def apply_overrides(self, cmd, overrides):
+ for k, v in overrides.items():
+ cmd[k.replace('-', '_')] = v
+
+ def write_cmd(self, cmd, action=noop, overrides=None):
+ if overrides:
+ self.apply_overrides(cmd, overrides)
+
if action != noop:
cmd.sync = True
if self.detached:
@@ -154,28 +160,36 @@ class SessionState:
self.driver.write_op(op)
POLICIES = Values("always", "sender", "receiver", "never")
+RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
+ "exactly-once")
-class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
+DECLARE = Map({}, restricted=False)
+BINDINGS = List(Map({
+ "exchange": Types(basestring),
+ "queue": Types(basestring),
+ "key": Types(basestring),
+ "arguments": Map({}, restricted=False)
+ }))
COMMON_OPTS = {
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- }
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS
+ }),
+ "link": Map({
+ "name": Types(basestring),
+ "durable": Types(bool),
+ "reliability": RELIABILITY,
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS,
+ "x-subscribe": Map({}, restricted=False)
+ })
+ }
RECEIVE_MODES = Values("browse", "consume")
@@ -196,36 +210,46 @@ class LinkIn:
_rcv.destination = str(rcv.id)
sst.destinations[_rcv.destination] = _rcv
_rcv.draining = False
+ _rcv.on_unlink = []
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ link_opts = _rcv.options.get("link", {})
+ # XXX: default?
+ reliability = link_opts.get("reliability", "unreliable")
+ declare = link_opts.get("x-declare", {})
+ subscribe = link_opts.get("x-subscribe", {})
acq_mode = acquire_mode.pre_acquired
if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
- filter = _rcv.options.get("filter")
- if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[subtype]
- elif _rcv.subject and filter:
- # XXX
- raise Exception("can't supply both subject and filter")
- elif _rcv.subject:
- # XXX
- f = Pattern(_rcv.subject)
- else:
- f = filter
- f._bind(sst, _rcv.name, _rcv._queue)
+ default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
+ _rcv._queue = link_opts.get("name", default_name)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue,
+ durable=link_opts.get("durable", False),
+ exclusive=True,
+ auto_delete=(reliability == "unreliable")),
+ overrides=declare)
+ _rcv.on_unlink = [QueueDelete(_rcv._queue)]
+ subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
+ sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
+ bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
elif type == "queue":
_rcv._queue = _rcv.name
if _rcv.options.get("mode", "consume") == "browse":
acq_mode = acquire_mode.not_acquired
+ bindings = get_bindings(link_opts, queue=_rcv._queue)
+ sst.write_cmds(bindings)
sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
- acquire_mode = acq_mode))
+ acquire_mode = acq_mode),
+ overrides=subscribe)
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
- sst.write_cmd(MessageCancel(_rcv.destination), action)
+ link_opts = _rcv.options.get("link", {})
+ reliability = link_opts.get("reliability")
+ cmds = [MessageCancel(_rcv.destination)]
+ cmds.extend(_rcv.on_unlink)
+ sst.write_cmds(cmds, action)
def del_link(self, sst, rcv, _rcv):
del sst.destinations[_rcv.destination]
@@ -240,13 +264,16 @@ class LinkOut:
_snd.closing = False
def do_link(self, sst, snd, _snd, type, subtype, action):
+ link_opts = _snd.options.get("link", {})
if type == "topic":
_snd._exchange = _snd.name
_snd._routing_key = _snd.subject
+ bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
- action()
+ bindings = get_bindings(link_opts, queue=_snd.name)
+ sst.write_cmds(bindings, action)
def do_unlink(self, sst, snd, _snd, action=noop):
action()
@@ -437,6 +464,17 @@ class Driver:
DEFAULT_DISPOSITION = Disposition(None)
+def get_bindings(opts, queue=None, exchange=None, key=None):
+ bindings = opts.get("x-bindings", [])
+ cmds = []
+ for b in bindings:
+ exchange = b.get("exchange", exchange)
+ queue = b.get("queue", queue)
+ key = b.get("key", key)
+ args = b.get("arguments", {})
+ cmds.append(ExchangeBind(queue, exchange, key, args))
+ return cmds
+
class Engine:
def __init__(self, connection):
@@ -785,12 +823,6 @@ class Engine:
err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
- elif type == "queue":
- try:
- cmds = self.bindings(lnk)
- sst.write_cmds(cmds, lambda: action(type, subtype))
- except address.ParseError, e:
- err = (e,)
else:
action(type, subtype)
@@ -831,23 +863,21 @@ class Engine:
def declare(self, sst, lnk, action):
name = lnk.name
- props = lnk.options.get("node-properties", {})
+ props = lnk.options.get("node", {})
durable = props.get("durable", DURABLE_DEFAULT)
type = props.get("type", "queue")
- xprops = props.get("x-properties", {})
+ declare = props.get("x-declare", {})
if type == "topic":
cmd = ExchangeDeclare(exchange=name, durable=durable)
+ bindings = get_bindings(props, exchange=name)
elif type == "queue":
cmd = QueueDeclare(queue=name, durable=durable)
+ bindings = get_bindings(props, queue=name)
else:
raise ValueError(type)
- for f in cmd.FIELDS:
- if f.name != "arguments" and xprops.has_key(f.name):
- cmd[f.name] = xprops.pop(f.name)
- if xprops:
- cmd.arguments = xprops
+ sst.apply_overrides(cmd, declare)
if type == "topic":
if cmd.type is None:
@@ -857,11 +887,7 @@ class Engine:
subtype = None
cmds = [cmd]
- if type == "queue":
- try:
- cmds.extend(self.bindings(lnk))
- except address.ParseError, e:
- return (e,)
+ cmds.extend(bindings)
def declared():
self.address_cache[name] = (type, subtype)
@@ -869,16 +895,6 @@ class Engine:
sst.write_cmds(cmds, declared)
- def bindings(self, lnk):
- props = lnk.options.get("node-properties", {})
- xprops = props.get("x-properties", {})
- bindings = xprops.get("bindings", [])
- cmds = []
- for b in bindings:
- n, s, o = address.parse(b)
- cmds.append(ExchangeBind(lnk.name, n, s, o))
- return cmds
-
def delete(self, sst, name, action):
def deleted():
del self.address_cache[name]
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 14:57:28 2010
@@ -295,14 +295,29 @@ class Session:
create: <create-policy>,
delete: <delete-policy>,
assert: <assert-policy>,
- node-properties: {
+ node: {
type: <node-type>,
durable: <node-durability>,
- x-properties: {
- bindings: ["<exchange>/<key>", ...],
- <passthrough-key>: <passthrough-value>
- }
+ x-declare: { ... <queue-declare overrides> ... }
+ x-bindings: [<binding_1>, ... <binding_n>]
}
+ link: {
+ name: <link-name>,
+ durable: <link-durability>,
+ reliability: <link-reliability>,
+ x-declare: { ... <queue-declare overrides> ... }
+ x-bindings: [<binding_1>, ... <binding_n>]
+ x-subscribe: { ... <message-subscribe overrides> ... }
+ }
+ }
+
+ Bindings are specified as a map with the following options::
+
+ {
+ exchange: <exchange>,
+ queue: <queue>,
+ key: <key>,
+ arguments: <arguments>
}
The create, delete, and assert policies specify who should perform
@@ -316,14 +331,12 @@ class Session:
The node-type is one of:
- I{topic}: a topic node will default to the topic exchange,
- x-properties may be used to specify other exchange types
+ x-declare may be used to specify other exchange types
- I{queue}: this is the default node-type
- The x-properties map permits arbitrary additional keys and values to
- be specified. These keys and values are passed through when creating
- a node or asserting facts about an existing node. Any passthrough
- keys and values that do not match a standard field of the underlying
- exchange or queue declare command will be sent in the arguments map.
+ The x-declare map permits protocol-specific keys and values to be
+ specified. These keys and values are passed through when creating a
+ node or asserting facts about an existing node.
Examples
--------
@@ -353,18 +366,18 @@ class Session:
You can customize the properties of the queue::
- my-queue; {create: always, node-properties: {durable: True}}
+ my-queue; {create: always, node: {durable: True}}
You can create a topic instead if you want::
- my-queue; {create: always, node-properties: {type: topic}}
+ my-queue; {create: always, node: {type: topic}}
You can assert that the address resolves to a node with particular
properties::
my-transient-topic; {
assert: always,
- node-properties: {
+ node: {
type: topic,
durable: False
}
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Tue Mar 23 14:57:28 2010
@@ -59,8 +59,8 @@ class Base(Test):
else:
return "%s[%s, %s]" % (base, count, self.test_id)
- def message(self, base, count = None):
- return Message(self.content(base, count))
+ def message(self, base, count = None, **kwargs):
+ return Message(content=self.content(base, count), **kwargs)
def ping(self, ssn):
PING_Q = 'ping-queue; {create: always, delete: always}'
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 14:57:28 2010
@@ -247,9 +247,9 @@ class SessionTests(Base):
test-reject-queue; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- alternate_exchange: 'amq.topic'
+ node: {
+ x-declare: {
+ alternate-exchange: 'amq.topic'
}
}
}
@@ -515,7 +515,7 @@ class ReceiverTests(Base):
snd = self.ssn.sender("""test-double-close; {
create: always,
delete: sender,
- node-properties: {
+ node: {
type: topic
}
}
@@ -571,9 +571,9 @@ class AddressTests(Base):
assert "error in options: %s" % error == str(e), e
def testIllegalKey(self):
- self.badOption("{create: always, node-properties: "
+ self.badOption("{create: always, node: "
"{this-property-does-not-exist: 3}}",
- "node-properties: this-property-does-not-exist: "
+ "node: this-property-does-not-exist: "
"illegal key")
def testWrongValue(self):
@@ -581,23 +581,17 @@ class AddressTests(Base):
"('always', 'sender', 'receiver', 'never')")
def testWrongType1(self):
- self.badOption("{node-properties: asdf}",
- "node-properties: asdf is not a map")
+ self.badOption("{node: asdf}",
+ "node: asdf is not a map")
def testWrongType2(self):
- self.badOption("{node-properties: {durable: []}}",
- "node-properties: durable: [] is not a bool")
-
- def testNonQueueBindings(self):
- self.badOption("{node-properties: {type: topic, x-properties: "
- "{bindings: []}}}",
- "node-properties: x-properties: bindings: "
- "bindings are only permitted on nodes of type queue")
+ self.badOption("{node: {durable: []}}",
+ "node: durable: [] is not a bool")
def testCreateQueue(self):
snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node-properties: {type: queue, durable: False, "
- "x-properties: {auto_delete: true}}}")
+ "node: {type: queue, durable: False, "
+ "x-declare: {auto_delete: true}}}")
content = self.content("testCreateQueue")
snd.send(content)
rcv = self.ssn.receiver("test-create-queue")
@@ -607,10 +601,10 @@ class AddressTests(Base):
addr = """test-create-exchange; {
create: always,
delete: always,
- node-properties: {
+ node: {
type: topic,
durable: False,
- x-properties: {auto_delete: true, %s}
+ x-declare: {auto_delete: true, %s}
}
}""" % props
snd = self.ssn.sender(addr)
@@ -677,15 +671,15 @@ class AddressTests(Base):
# XXX: need to figure out close after error
self.conn._remove_session(self.ssn)
- def testBindings(self):
+ def testNodeBindingsQueue(self):
snd = self.ssn.sender("""
-test-bindings-queue; {
+test-node-bindings-queue; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
- }
+ node: {
+ x-bindings: [{exchange: "amq.topic", key: "a.#"},
+ {exchange: "amq.direct", key: "b"},
+ {exchange: "amq.topic", key: "c.*"}]
}
}
""")
@@ -696,49 +690,80 @@ test-bindings-queue; {
snd_a.send("two")
snd_b.send("three")
snd_c.send("four")
- rcv = self.ssn.receiver("test-bindings-queue")
+ rcv = self.ssn.receiver("test-node-bindings-queue")
self.drain(rcv, expected=["one", "two", "three", "four"])
- def testBindingsAdditive(self):
- m1 = self.content("testBindingsAdditive", 1)
- m2 = self.content("testBindingsAdditive", 2)
- m3 = self.content("testBindingsAdditive", 3)
- m4 = self.content("testBindingsAdditive", 4)
-
+ def testNodeBindingsTopic(self):
+ rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+ rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+ rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+ rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
snd = self.ssn.sender("""
-test-bindings-additive-queue; {
+test-node-bindings-topic; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a"]
- }
+ node: {
+ type: topic,
+ x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+ {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+ {queue: test-node-bindings-topic-queue-b, key: "b"},
+ {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
}
}
""")
+ m1 = Message("one")
+ m2 = Message(subject="a.foo", content="two")
+ m3 = Message(subject="b", content="three")
+ m4 = Message(subject="c.bar", content="four")
+ snd.send(m1)
+ snd.send(m2)
+ snd.send(m3)
+ snd.send(m4)
+ self.drain(rcv, expected=[m1, m2, m3, m4])
+ self.drain(rcv_a, expected=[m2])
+ self.drain(rcv_b, expected=[m3])
+ self.drain(rcv_c, expected=[m4])
+
+ def testLinkBindings(self):
+ m_a = self.message("testLinkBindings", 1, subject="a")
+ m_b = self.message("testLinkBindings", 2, subject="b")
- snd_a = self.ssn.sender("amq.topic/a")
- snd_b = self.ssn.sender("amq.topic/b")
+ self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+ snd = self.ssn.sender("amq.topic")
- snd_a.send(m1)
- snd_b.send(m2)
+ snd.send(m_a)
+ snd.send(m_b)
+ snd.close()
- rcv = self.ssn.receiver("test-bindings-additive-queue")
- self.drain(rcv, expected=[m1])
+ rcv = self.ssn.receiver("test-link-bindings-queue")
+ self.assertEmpty(rcv)
- new_snd = self.ssn.sender("""
-test-bindings-additive-queue; {
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/b"]
- }
+ snd = self.ssn.sender("""
+amq.topic; {
+ link: {
+ x-bindings: [{queue: test-link-bindings-queue, key: a}]
}
}
""")
- new_snd.send(m3)
- snd_b.send(m4)
- self.drain(rcv, expected=[m3, m4])
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a])
+ rcv.close()
+
+ rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+ link: {
+ x-bindings: [{exchange: "amq.topic", key: b}]
+ }
+}
+""")
+
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a, m_b])
def testSubjectOverride(self):
snd = self.ssn.sender("amq.topic/a")
@@ -764,6 +789,32 @@ test-bindings-additive-queue; {
assert e2.subject == "b", "subject: %s" % e2.subject
self.assertEmpty(rcv)
+ def doReliabilityTest(self, reliability, messages, expected):
+ snd = self.ssn.sender("amq.topic")
+ rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+ for m in messages:
+ snd.send(m)
+ self.conn.disconnect()
+ self.conn.connect()
+ self.drain(rcv, expected=expected)
+
+ def testReliabilityUnreliable(self):
+ msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+ self.doReliabilityTest("unreliable", msgs, [])
+
+ def testReliabilityAtLeastOnce(self):
+ msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+ self.doReliabilityTest("at-least-once", msgs, msgs)
+
+ def testLinkName(self):
+ msgs = [self.message("testLinkName", i) for i in range(3)]
+ snd = self.ssn.sender("amq.topic")
+ trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+ qrcv = self.ssn.receiver("test-link-name")
+ for m in msgs:
+ snd.send(m)
+ self.drain(qrcv, expected=msgs)
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
Modified: qpid/trunk/qpid/python/qpid/validator.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/validator.py?rev=926604&r1=926603&r2=926604&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/validator.py (original)
+++ qpid/trunk/qpid/python/qpid/validator.py Tue Mar 23 14:57:28 2010
@@ -54,6 +54,20 @@ class Types:
else:
return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types]))
+class List:
+
+ def __init__(self, condition):
+ self.condition = condition
+
+ def validate(self, o, ctx):
+ if not isinstance(o, list):
+ return "%s is not a list" % o
+
+ ctx.push(o)
+ for v in o:
+ err = self.condition.validate(v, ctx)
+ if err: return err
+
class Map:
def __init__(self, map, restricted=True):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org