You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 15:45:52 UTC

svn commit: r910999 - in /qpid/trunk/qpid/python: qpid/driver.py qpid/tests/messaging.py todo.txt

Author: rhs
Date: Wed Feb 17 14:45:52 2010
New Revision: 910999

URL: http://svn.apache.org/viewvc?rev=910999&view=rev
Log:
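Added support for queue browsing: a receiver's address options may now specify `mode: browse` (subscribe without acquiring messages) or `mode: consume` (the default).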

Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py
    qpid/trunk/qpid/python/todo.txt

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Feb 17 14:45:52 2010
@@ -132,10 +132,44 @@
     op.channel = self.channel
     self.driver.write_op(op)
 
+POLICIES = Values("always", "sender", "receiver", "never")
+
+class Bindings:
+
+  def validate(self, o, ctx):
+    t = ctx.containers[1].get("type", "queue")
+    if t != "queue":
+      return "bindings are only permitted on nodes of type queue"
+
+COMMON_OPTS = {
+    "create": POLICIES,
+    "delete": POLICIES,
+    "assert": POLICIES,
+    "node-properties": Map({
+        "type": Values("queue", "topic"),
+        "durable": Types(bool),
+        "x-properties": Map({
+            "type": Types(basestring),
+            "bindings": And(Types(list), Bindings())
+            },
+            restricted=False)
+        })
+    }
+
+RECEIVE_MODES = Values("browse", "consume")
+
+SOURCE_OPTS = COMMON_OPTS.copy()
+SOURCE_OPTS.update({
+    "mode": RECEIVE_MODES
+    })
+
+TARGET_OPTS = COMMON_OPTS.copy()
+
 class LinkIn:
 
   ADDR_NAME = "source"
   DIR_NAME = "receiver"
+  VALIDATOR = Map(SOURCE_OPTS)
 
   def init_link(self, sst, rcv, _rcv):
     _rcv.destination = str(rcv.id)
@@ -143,6 +177,8 @@
     _rcv.draining = False
 
   def do_link(self, sst, rcv, _rcv, type, subtype, action):
+    acq_mode = acquire_mode.pre_acquired
+
     if type == "topic":
       _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
       sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
@@ -161,8 +197,11 @@
       f._bind(sst, _rcv.name, _rcv._queue)
     elif type == "queue":
       _rcv._queue = _rcv.name
+      if _rcv.options.get("mode", "consume") == "browse":
+        acq_mode = acquire_mode.not_acquired
 
-    sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+    sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
+                                   acquire_mode = acq_mode))
     sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
 
   def do_unlink(self, sst, rcv, _rcv, action=noop):
@@ -175,6 +214,7 @@
 
   ADDR_NAME = "target"
   DIR_NAME = "sender"
+  VALIDATOR = Map(TARGET_OPTS)
 
   def init_link(self, sst, snd, _snd):
     _snd.closing = False
@@ -582,7 +622,7 @@
       _lnk.closing = False
       dir.init_link(sst, lnk, _lnk)
 
-      err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+      err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
       if err:
         lnk.error = (err,)
         lnk.closed = True
@@ -626,33 +666,9 @@
       except address.ParseError, e:
         return e
 
-  POLICIES = Values("always", "sender", "receiver", "never")
-
-  class Bindings:
-
-    def validate(self, o, ctx):
-      t = ctx.containers[1].get("type", "queue")
-      if t != "queue":
-        return "bindings are only permitted on nodes of type queue"
-
-  OPTS = Map({
-      "create": POLICIES,
-      "delete": POLICIES,
-      "assert": POLICIES,
-      "node-properties": Map({
-          "type": Values("queue", "topic"),
-          "durable": Types(bool),
-          "x-properties": Map({
-              "type": Types(basestring),
-              "bindings": And(Types(list), Bindings())
-              },
-              restricted=False)
-          })
-        })
-
-  def validate_options(self, lnk):
+  def validate_options(self, lnk, dir):
     ctx = Context()
-    err = Driver.OPTS.validate(lnk.options, ctx)
+    err = dir.VALIDATOR.validate(lnk.options, ctx)
     if err: return "error in options: %s" % err
 
   def resolve_declare(self, sst, lnk, dir, action):

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Wed Feb 17 14:45:52 2010
@@ -580,6 +580,22 @@
 
   # XXX: need testClose
 
+  def testMode(self):
+    msgs = [self.content("testMode", 1),
+            self.content("testMode", 2),
+            self.content("testMode", 3)]
+
+    for m in msgs:
+      self.snd.send(m)
+
+    rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+    rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+    self.drain(rb, expected=msgs)
+    self.drain(rc, expected=msgs)
+    rb2 = self.ssn.receiver(rb.source)
+    self.assertEmpty(rb2)
+    self.drain(self.rcv, expected=[])
+
 class AddressTests(Base):
 
   def setup_connection(self):
@@ -846,6 +862,12 @@
     self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
                            lambda e: "unrecognized characters" in str(e))
 
+  def testInvalidMode(self):
+    # XXX: should have specific exception for this
+    self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+                           ReceiveError,
+                           lambda e: "not in ('browse', 'consume')" in str(e))
+
 SENDER_Q = 'test-sender-q; {create: always, delete: always}'
 
 class SenderTests(Base):

Modified: qpid/trunk/qpid/python/todo.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/todo.txt?rev=910999&r1=910998&r2=910999&view=diff
==============================================================================
--- qpid/trunk/qpid/python/todo.txt (original)
+++ qpid/trunk/qpid/python/todo.txt Wed Feb 17 14:45:52 2010
@@ -167,7 +167,7 @@
     + need to handle cleanup of temp queues/topics: F, NR
     + passthrough options for creating exchanges/queues: F, NR
   - integration with java: NF
-  - queue browsing: NF
+  - queue browsing: F, NR
   - temporary queues: NF
   - xquery: NF
 
@@ -190,3 +190,5 @@
 
 Miscellaneous:
   - standard ping-like (drain/spout) utilities for all clients: NF
+  - caching of resolved addresses
+  - consider using separate session for query/deletion/creation of addresses



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org