You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/26 16:49:03 UTC
svn commit: r829837 - in /qpid/trunk/qpid/python/qpid: compat.py concurrency.py driver.py messaging.py
Author: rhs
Date: Mon Oct 26 15:49:02 2009
New Revision: 829837
URL: http://svn.apache.org/viewvc?rev=829837&view=rev
Log:
Improved request/response performance by using an I/O-based condition instead of the default condition from the Python threading module.
Modified:
qpid/trunk/qpid/python/qpid/compat.py
qpid/trunk/qpid/python/qpid/concurrency.py
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
Modified: qpid/trunk/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/compat.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/compat.py (original)
+++ qpid/trunk/qpid/python/qpid/compat.py Mon Oct 26 15:49:02 2009
@@ -43,27 +43,47 @@
else:
from select import select
+class BaseWaiter:
+
+ def wakeup(self):
+ self._do_write()
+
+ def wait(self, timeout=None):
+ if timeout is not None:
+ ready, _, _ = select([self], [], [], timeout)
+ else:
+ ready = True
+
+ if ready:
+ self._do_read()
+ return True
+ else:
+ return False
+
+ def reading(self):
+ return True
+
+ def readable(self):
+ self._do_read()
+
if sys.platform in ('win32', 'cygwin'):
import socket
- class SockWaiter:
+ class SockWaiter(BaseWaiter):
def __init__(self, read_sock, write_sock):
self.read_sock = read_sock
self.write_sock = write_sock
- def wakeup(self):
+ def _do_write(self):
self.write_sock.send("\0")
+ def _do_read(self):
+ self.read_sock.recv(65536)
+
def fileno(self):
return self.read_sock.fileno()
- def reading(self):
- return True
-
- def readable(self):
- self.read_sock.recv(65536)
-
def __repr__(self):
return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
@@ -80,24 +100,21 @@
else:
import os
- class PipeWaiter:
+ class PipeWaiter(BaseWaiter):
def __init__(self, read_fd, write_fd):
self.read_fd = read_fd
self.write_fd = write_fd
- def wakeup(self):
+ def _do_write(self):
os.write(self.write_fd, "\0")
+ def _do_read(self):
+ os.read(self.read_fd, 65536)
+
def fileno(self):
return self.read_fd
- def reading(self):
- return True
-
- def readable(self):
- os.read(self.read_fd, 65536)
-
def __repr__(self):
return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
Modified: qpid/trunk/qpid/python/qpid/concurrency.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/concurrency.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Mon Oct 26 15:49:02 2009
@@ -17,7 +17,7 @@
# under the License.
#
-import inspect, time
+import compat, inspect, time
def synchronized(meth):
args, vargs, kwargs, defs = inspect.getargspec(meth)
@@ -48,13 +48,17 @@
start = time.time()
while not predicate():
if timeout is None:
+ # XXX: this timed wait thing is not necessary for the fast
+ # condition from this module, only for the condition impl from
+ # the threading module
+
# using the timed wait prevents keyboard interrupts from being
# blocked while waiting
self.condition.wait(3)
elif passed < timeout:
self.condition.wait(timeout - passed)
else:
- return False
+ return bool(predicate())
passed = time.time() - start
return True
@@ -63,3 +67,34 @@
def notifyAll(self):
self.condition.notifyAll()
+
+class Condition:
+
+ def __init__(self, lock):
+ self.lock = lock
+ self.waiters = []
+ self.waiting = []
+
+ def notify(self):
+ assert self.lock._is_owned()
+ if self.waiting:
+ self.waiting[0].wakeup()
+
+ def notifyAll(self):
+ assert self.lock._is_owned()
+ for w in self.waiting:
+ w.wakeup()
+
+ def wait(self, timeout=None):
+ assert self.lock._is_owned()
+ if not self.waiters:
+ self.waiters.append(compat.selectable_waiter())
+ sw = self.waiters.pop(0)
+ self.waiting.append(sw)
+ try:
+ st = self.lock._release_save()
+ sw.wait(timeout)
+ finally:
+ self.lock._acquire_restore(st)
+ self.waiting.remove(sw)
+ self.waiters.append(sw)
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Mon Oct 26 15:49:02 2009
@@ -755,7 +755,7 @@
sst.outgoing_idx -= 1
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body, sync=True), msg_acked)
+ payload=body, sync=True), msg_acked)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Mon Oct 26 15:49:02 2009
@@ -31,11 +31,11 @@
"""
from codec010 import StringCodec
-from concurrency import synchronized, Waiter
+from concurrency import synchronized, Waiter, Condition
from datatypes import timestamp, uuid4, Serial
from logging import getLogger
from ops import PRIMITIVE
-from threading import Thread, RLock, Condition
+from threading import Thread, RLock
from util import default
log = getLogger("qpid.messaging")
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org