You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 18:50:29 UTC
svn commit: r911116 - /qpid/trunk/qpid/python/qpid/driver.py
Author: rhs
Date: Wed Feb 17 17:50:29 2010
New Revision: 911116
URL: http://svn.apache.org/viewvc?rev=911116&view=rev
Log:
added caching for resolved addresses
Modified:
qpid/trunk/qpid/python/qpid/driver.py
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=911116&r1=911115&r2=911116&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Feb 17 17:50:29 2010
@@ -234,6 +234,26 @@
def del_link(self, sst, snd, _snd):
pass
+class Cache:
+
+ def __init__(self, ttl):
+ self.ttl = ttl
+ self.entries = {}
+
+ def __setitem__(self, key, value):
+ self.entries[key] = time.time(), value
+
+ def __getitem__(self, key):
+ tstamp, value = self.entries[key]
+ if time.time() - tstamp >= self.ttl:
+ del self.entries[key]
+ raise KeyError(key)
+ else:
+ return value
+
+ def __delitem__(self, key):
+ del self.entries[key]
+
# XXX
HEADER="!4s4B"
@@ -259,6 +279,7 @@
self.connection.backups
self._host = 0
self._retrying = False
+
self.reset()
def reset(self):
@@ -271,6 +292,10 @@
self._channels = 0
self._sessions = {}
+ options = self.connection.options
+
+ self.address_cache = Cache(options.get("address_ttl", 60))
+
self._socket = None
self._buf = ""
self._hdr = ""
@@ -289,7 +314,6 @@
self._sasl.setAttr("password", self.connection.password)
if self.connection.host:
self._sasl.setAttr("host", self.connection.host)
- options = self.connection.options
self._sasl.setAttr("service", options.get("service", "qpidd"))
if "min_ssf" in options:
self._sasl.setAttr("minssf", options["min_ssf"])
@@ -672,21 +696,22 @@
if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):
- def do_resolved(er, qr):
+ declare = lnk.options.get("create") in ("always", dir)
+ def do_resolved(type, subtype):
err = None
- if er.not_found and not qr.queue:
- if lnk.options.get("create") in ("always", dir):
+ if type is None:
+ if declare:
err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
- elif qr.queue:
+ elif type == "queue":
try:
cmds = self.bindings(lnk)
- sst.write_cmds(cmds, lambda: action("queue", None))
+ sst.write_cmds(cmds, lambda: action(type, subtype))
except address.ParseError, e:
err = (e,)
else:
- action("topic", er.type)
+ action(type, subtype)
if err:
tgt = lnk.target
@@ -694,15 +719,31 @@
del self._attachments[tgt]
tgt.closed = True
return
- self.resolve(sst, lnk.name, do_resolved)
+ self.resolve(sst, lnk.name, do_resolved, force=declare)
+
+ def resolve(self, sst, name, action, force=False):
+ if not force:
+ try:
+ type, subtype = self.address_cache[name]
+ action(type, subtype)
+ return
+ except KeyError:
+ pass
- def resolve(self, sst, name, action):
args = []
def do_result(r):
args.append(r)
def do_action(r):
do_result(r)
- action(*args)
+ er, qr = args
+ if er.not_found and not qr.queue:
+ type, subtype = None, None
+ elif qr.queue:
+ type, subtype = "queue", None
+ else:
+ type, subtype = "topic", er.type
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
sst.write_query(ExchangeQuery(name), do_result)
sst.write_query(QueueQuery(name), do_action)
@@ -740,7 +781,11 @@
except address.ParseError, e:
return (e,)
- sst.write_cmds(cmds, lambda: action(type, subtype))
+ def declared():
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
+
+ sst.write_cmds(cmds, declared)
def bindings(self, lnk):
props = lnk.options.get("node-properties", {})
@@ -753,14 +798,20 @@
return cmds
def delete(self, sst, name, action):
- def do_delete(er, qr):
- if not er.not_found:
- sst.write_cmd(ExchangeDelete(name), action)
- elif qr.queue:
- sst.write_cmd(QueueDelete(name), action)
- else:
+ def deleted():
+ del self.address_cache[name]
+ action()
+
+ def do_delete(type, subtype):
+ if type == "topic":
+ sst.write_cmd(ExchangeDelete(name), deleted)
+ elif type == "queue":
+ sst.write_cmd(QueueDelete(name), deleted)
+ elif type is None:
action()
- self.resolve(sst, name, do_delete)
+ else:
+ raise ValueError(type)
+ self.resolve(sst, name, do_delete, force=True)
def process(self, ssn):
if ssn.closed or ssn.closing: return
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org