You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 15:45:52 UTC
svn commit: r910999 - in /qpid/trunk/qpid/python: qpid/driver.py qpid/tests/messaging.py todo.txt
Author: rhs
Date: Wed Feb 17 14:45:52 2010
New Revision: 910999
URL: http://svn.apache.org/viewvc?rev=910999&view=rev
Log:
Added support for queue browsing: receivers accept a "mode" option ("browse" or "consume").
Modified:
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
qpid/trunk/qpid/python/todo.txt
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Feb 17 14:45:52 2010
@@ -132,10 +132,44 @@
op.channel = self.channel
self.driver.write_op(op)
+POLICIES = Values("always", "sender", "receiver", "never")
+
+class Bindings:
+
+ def validate(self, o, ctx):
+ t = ctx.containers[1].get("type", "queue")
+ if t != "queue":
+ return "bindings are only permitted on nodes of type queue"
+
+COMMON_OPTS = {
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node-properties": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-properties": Map({
+ "type": Types(basestring),
+ "bindings": And(Types(list), Bindings())
+ },
+ restricted=False)
+ })
+ }
+
+RECEIVE_MODES = Values("browse", "consume")
+
+SOURCE_OPTS = COMMON_OPTS.copy()
+SOURCE_OPTS.update({
+ "mode": RECEIVE_MODES
+ })
+
+TARGET_OPTS = COMMON_OPTS.copy()
+
class LinkIn:
ADDR_NAME = "source"
DIR_NAME = "receiver"
+ VALIDATOR = Map(SOURCE_OPTS)
def init_link(self, sst, rcv, _rcv):
_rcv.destination = str(rcv.id)
@@ -143,6 +177,8 @@
_rcv.draining = False
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ acq_mode = acquire_mode.pre_acquired
+
if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
@@ -161,8 +197,11 @@
f._bind(sst, _rcv.name, _rcv._queue)
elif type == "queue":
_rcv._queue = _rcv.name
+ if _rcv.options.get("mode", "consume") == "browse":
+ acq_mode = acquire_mode.not_acquired
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
+ acquire_mode = acq_mode))
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
@@ -175,6 +214,7 @@
ADDR_NAME = "target"
DIR_NAME = "sender"
+ VALIDATOR = Map(TARGET_OPTS)
def init_link(self, sst, snd, _snd):
_snd.closing = False
@@ -582,7 +622,7 @@
_lnk.closing = False
dir.init_link(sst, lnk, _lnk)
- err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+ err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
if err:
lnk.error = (err,)
lnk.closed = True
@@ -626,33 +666,9 @@
except address.ParseError, e:
return e
- POLICIES = Values("always", "sender", "receiver", "never")
-
- class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
-
- OPTS = Map({
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- })
-
- def validate_options(self, lnk):
+ def validate_options(self, lnk, dir):
ctx = Context()
- err = Driver.OPTS.validate(lnk.options, ctx)
+ err = dir.VALIDATOR.validate(lnk.options, ctx)
if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Wed Feb 17 14:45:52 2010
@@ -580,6 +580,22 @@
# XXX: need testClose
+ def testMode(self):
+ msgs = [self.content("testMode", 1),
+ self.content("testMode", 2),
+ self.content("testMode", 3)]
+
+ for m in msgs:
+ self.snd.send(m)
+
+ rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+ rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+ self.drain(rb, expected=msgs)
+ self.drain(rc, expected=msgs)
+ rb2 = self.ssn.receiver(rb.source)
+ self.assertEmpty(rb2)
+ self.drain(self.rcv, expected=[])
+
class AddressTests(Base):
def setup_connection(self):
@@ -846,6 +862,12 @@
self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
lambda e: "unrecognized characters" in str(e))
+ def testInvalidMode(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+ ReceiveError,
+ lambda e: "not in ('browse', 'consume')" in str(e))
+
SENDER_Q = 'test-sender-q; {create: always, delete: always}'
class SenderTests(Base):
Modified: qpid/trunk/qpid/python/todo.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/todo.txt?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/todo.txt (original)
+++ qpid/trunk/qpid/python/todo.txt Wed Feb 17 14:45:52 2010
@@ -167,7 +167,7 @@
+ need to handle cleanup of temp queues/topics: F, NR
+ passthrough options for creating exchanges/queues: F, NR
- integration with java: NF
- - queue browsing: NF
+ - queue browsing: F, NR
- temporary queues: NF
- xquery: NF
@@ -190,3 +190,5 @@
Miscellaneous:
- standard ping-like (drain/spout) utilities for all clients: NF
+ - caching of resolved addresses
+ - consider using separate session for query/deletion/creation of addresses
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org