You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 18:50:29 UTC

svn commit: r911116 - /qpid/trunk/qpid/python/qpid/driver.py

Author: rhs
Date: Wed Feb 17 17:50:29 2010
New Revision: 911116

URL: http://svn.apache.org/viewvc?rev=911116&view=rev
Log:
added caching for resolved addresses

Modified:
    qpid/trunk/qpid/python/qpid/driver.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=911116&r1=911115&r2=911116&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Feb 17 17:50:29 2010
@@ -234,6 +234,26 @@
   def del_link(self, sst, snd, _snd):
     pass
 
+class Cache:
+
+  def __init__(self, ttl):
+    self.ttl = ttl
+    self.entries = {}
+
+  def __setitem__(self, key, value):
+    self.entries[key] = time.time(), value
+
+  def __getitem__(self, key):
+    tstamp, value = self.entries[key]
+    if time.time() - tstamp >= self.ttl:
+      del self.entries[key]
+      raise KeyError(key)
+    else:
+      return value
+
+  def __delitem__(self, key):
+    del self.entries[key]
+
 # XXX
 HEADER="!4s4B"
 
@@ -259,6 +279,7 @@
         self.connection.backups
     self._host = 0
     self._retrying = False
+
     self.reset()
 
   def reset(self):
@@ -271,6 +292,10 @@
     self._channels = 0
     self._sessions = {}
 
+    options = self.connection.options
+
+    self.address_cache = Cache(options.get("address_ttl", 60))
+
     self._socket = None
     self._buf = ""
     self._hdr = ""
@@ -289,7 +314,6 @@
       self._sasl.setAttr("password", self.connection.password)
     if self.connection.host:
       self._sasl.setAttr("host", self.connection.host)
-    options = self.connection.options
     self._sasl.setAttr("service", options.get("service", "qpidd"))
     if "min_ssf" in options:
       self._sasl.setAttr("minssf", options["min_ssf"])
@@ -672,21 +696,22 @@
     if err: return "error in options: %s" % err
 
   def resolve_declare(self, sst, lnk, dir, action):
-    def do_resolved(er, qr):
+    declare = lnk.options.get("create") in ("always", dir)
+    def do_resolved(type, subtype):
       err = None
-      if er.not_found and not qr.queue:
-        if lnk.options.get("create") in ("always", dir):
+      if type is None:
+        if declare:
           err = self.declare(sst, lnk, action)
         else:
           err = ("no such queue: %s" % lnk.name,)
-      elif qr.queue:
+      elif type == "queue":
         try:
           cmds = self.bindings(lnk)
-          sst.write_cmds(cmds, lambda: action("queue", None))
+          sst.write_cmds(cmds, lambda: action(type, subtype))
         except address.ParseError, e:
           err = (e,)
       else:
-        action("topic", er.type)
+        action(type, subtype)
 
       if err:
         tgt = lnk.target
@@ -694,15 +719,31 @@
         del self._attachments[tgt]
         tgt.closed = True
         return
-    self.resolve(sst, lnk.name, do_resolved)
+    self.resolve(sst, lnk.name, do_resolved, force=declare)
+
+  def resolve(self, sst, name, action, force=False):
+    if not force:
+      try:
+        type, subtype = self.address_cache[name]
+        action(type, subtype)
+        return
+      except KeyError:
+        pass
 
-  def resolve(self, sst, name, action):
     args = []
     def do_result(r):
       args.append(r)
     def do_action(r):
       do_result(r)
-      action(*args)
+      er, qr = args
+      if er.not_found and not qr.queue:
+        type, subtype = None, None
+      elif qr.queue:
+        type, subtype = "queue", None
+      else:
+        type, subtype = "topic", er.type
+      self.address_cache[name] = (type, subtype)
+      action(type, subtype)
     sst.write_query(ExchangeQuery(name), do_result)
     sst.write_query(QueueQuery(name), do_action)
 
@@ -740,7 +781,11 @@
       except address.ParseError, e:
         return (e,)
 
-    sst.write_cmds(cmds, lambda: action(type, subtype))
+    def declared():
+      self.address_cache[name] = (type, subtype)
+      action(type, subtype)
+
+    sst.write_cmds(cmds, declared)
 
   def bindings(self, lnk):
     props = lnk.options.get("node-properties", {})
@@ -753,14 +798,20 @@
     return cmds
 
   def delete(self, sst, name, action):
-    def do_delete(er, qr):
-      if not er.not_found:
-        sst.write_cmd(ExchangeDelete(name), action)
-      elif qr.queue:
-        sst.write_cmd(QueueDelete(name), action)
-      else:
+    def deleted():
+      del self.address_cache[name]
+      action()
+
+    def do_delete(type, subtype):
+      if type == "topic":
+        sst.write_cmd(ExchangeDelete(name), deleted)
+      elif type == "queue":
+        sst.write_cmd(QueueDelete(name), deleted)
+      elif type is None:
         action()
-    self.resolve(sst, name, do_delete)
+      else:
+        raise ValueError(type)
+    self.resolve(sst, name, do_delete, force=True)
 
   def process(self, ssn):
     if ssn.closed or ssn.closing: return



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org