You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/10/26 16:49:03 UTC

svn commit: r829837 - in /qpid/trunk/qpid/python/qpid: compat.py concurrency.py driver.py messaging.py

Author: rhs
Date: Mon Oct 26 15:49:02 2009
New Revision: 829837

URL: http://svn.apache.org/viewvc?rev=829837&view=rev
Log:
Improved request/response performance by using an I/O-based condition instead of the default condition from the Python threading module.

Modified:
    qpid/trunk/qpid/python/qpid/compat.py
    qpid/trunk/qpid/python/qpid/concurrency.py
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py

Modified: qpid/trunk/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/compat.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/compat.py (original)
+++ qpid/trunk/qpid/python/qpid/compat.py Mon Oct 26 15:49:02 2009
@@ -43,27 +43,47 @@
 else:
   from select import select
 
+class BaseWaiter:
+
+  def wakeup(self):
+    self._do_write()
+
+  def wait(self, timeout=None):
+    if timeout is not None:
+      ready, _, _ = select([self], [], [], timeout)
+    else:
+      ready = True
+
+    if ready:
+      self._do_read()
+      return True
+    else:
+      return False
+
+  def reading(self):
+    return True
+
+  def readable(self):
+    self._do_read()
+
 if sys.platform in ('win32', 'cygwin'):
   import socket
 
-  class SockWaiter:
+  class SockWaiter(BaseWaiter):
 
     def __init__(self, read_sock, write_sock):
       self.read_sock = read_sock
       self.write_sock = write_sock
 
-    def wakeup(self):
+    def _do_write(self):
       self.write_sock.send("\0")
 
+    def _do_read(self):
+      self.read_sock.recv(65536)
+
     def fileno(self):
       return self.read_sock.fileno()
 
-    def reading(self):
-      return True
-
-    def readable(self):
-      self.read_sock.recv(65536)
-
   def __repr__(self):
     return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
 
@@ -80,24 +100,21 @@
 else:
   import os
 
-  class PipeWaiter:
+  class PipeWaiter(BaseWaiter):
 
     def __init__(self, read_fd, write_fd):
       self.read_fd = read_fd
       self.write_fd = write_fd
 
-    def wakeup(self):
+    def _do_write(self):
       os.write(self.write_fd, "\0")
 
+    def _do_read(self):
+      os.read(self.read_fd, 65536)
+
     def fileno(self):
       return self.read_fd
 
-    def reading(self):
-      return True
-
-    def readable(self):
-      os.read(self.read_fd, 65536)
-
     def __repr__(self):
       return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
 

Modified: qpid/trunk/qpid/python/qpid/concurrency.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/concurrency.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Mon Oct 26 15:49:02 2009
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import inspect, time
+import compat, inspect, time
 
 def synchronized(meth):
   args, vargs, kwargs, defs = inspect.getargspec(meth)
@@ -48,13 +48,17 @@
     start = time.time()
     while not predicate():
       if timeout is None:
+        # XXX: this timed wait thing is not necessary for the fast
+        # condition from this module, only for the condition impl from
+        # the threading module
+
         # using the timed wait prevents keyboard interrupts from being
         # blocked while waiting
         self.condition.wait(3)
       elif passed < timeout:
         self.condition.wait(timeout - passed)
       else:
-        return False
+        return bool(predicate())
       passed = time.time() - start
     return True
 
@@ -63,3 +67,34 @@
 
   def notifyAll(self):
     self.condition.notifyAll()
+
+class Condition:
+
+  def __init__(self, lock):
+    self.lock = lock
+    self.waiters = []
+    self.waiting = []
+
+  def notify(self):
+    assert self.lock._is_owned()
+    if self.waiting:
+      self.waiting[0].wakeup()
+
+  def notifyAll(self):
+    assert self.lock._is_owned()
+    for w in self.waiting:
+      w.wakeup()
+
+  def wait(self, timeout=None):
+    assert self.lock._is_owned()
+    if not self.waiters:
+      self.waiters.append(compat.selectable_waiter())
+    sw = self.waiters.pop(0)
+    self.waiting.append(sw)
+    try:
+      st = self.lock._release_save()
+      sw.wait(timeout)
+    finally:
+      self.lock._acquire_restore(st)
+      self.waiting.remove(sw)
+      self.waiters.append(sw)

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Mon Oct 26 15:49:02 2009
@@ -755,7 +755,7 @@
       sst.outgoing_idx -= 1
       assert msg == m
     sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
-                                   payload=body, sync=True), msg_acked)
+                                  payload=body, sync=True), msg_acked)
 
   def do_message_transfer(self, xfr):
     sst = self.get_sst(xfr)

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=829837&r1=829836&r2=829837&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Mon Oct 26 15:49:02 2009
@@ -31,11 +31,11 @@
 """
 
 from codec010 import StringCodec
-from concurrency import synchronized, Waiter
+from concurrency import synchronized, Waiter, Condition
 from datatypes import timestamp, uuid4, Serial
 from logging import getLogger
 from ops import PRIMITIVE
-from threading import Thread, RLock, Condition
+from threading import Thread, RLock
 from util import default
 
 log = getLogger("qpid.messaging")



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org