You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/02 17:35:43 UTC

svn commit: r810573 - in /qpid/trunk/qpid/python/qpid: concurrency.py driver.py lockable.py messaging.py

Author: rhs
Date: Wed Sep  2 15:35:42 2009
New Revision: 810573

URL: http://svn.apache.org/viewvc?rev=810573&view=rev
Log:
changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports

Added:
    qpid/trunk/qpid/python/qpid/concurrency.py   (contents, props changed)
      - copied, changed from r810508, qpid/trunk/qpid/python/qpid/lockable.py
Removed:
    qpid/trunk/qpid/python/qpid/lockable.py
Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py

Copied: qpid/trunk/qpid/python/qpid/concurrency.py (from r810508, qpid/trunk/qpid/python/qpid/lockable.py)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?p2=qpid/trunk/qpid/python/qpid/concurrency.py&p1=qpid/trunk/qpid/python/qpid/lockable.py&r1=810508&r2=810573&rev=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/lockable.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Wed Sep  2 15:35:42 2009
@@ -26,11 +26,11 @@
   exec """
 def %s%s:
   %s
-  %s.lock()
+  %s._lock.acquire()
   try:
     return meth%s
   finally:
-    %s.unlock()
+    %s._lock.release()
 """ % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs),
        repr(inspect.getdoc(meth)), args[0],
        inspect.formatargspec(args, vargs, kwargs, defs,
@@ -38,13 +38,10 @@
        args[0]) in scope
   return scope[meth.__name__]
 
-class Lockable(object):
+class Waiter(object):
 
-  def lock(self):
-    self._lock.acquire()
-
-  def unlock(self):
-    self._lock.release()
+  def __init__(self, condition):
+    self.condition = condition
 
   def wait(self, predicate, timeout=None):
     passed = 0
@@ -53,16 +50,16 @@
       if timeout is None:
         # using the timed wait prevents keyboard interrupts from being
         # blocked while waiting
-        self._condition.wait(3)
+        self.condition.wait(3)
       elif passed < timeout:
-        self._condition.wait(timeout - passed)
+        self.condition.wait(timeout - passed)
       else:
         return False
       passed = time.time() - start
     return True
 
   def notify(self):
-    self._condition.notify()
+    self.condition.notify()
 
   def notifyAll(self):
-    self._condition.notifyAll()
+    self.condition.notifyAll()

Propchange: qpid/trunk/qpid/python/qpid/concurrency.py
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=810573&r1=810572&r2=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep  2 15:35:42 2009
@@ -18,11 +18,11 @@
 #
 
 import compat, connection, socket, sys, time
+from concurrency import synchronized
 from datatypes import RangedSet, Message as Message010
 from exceptions import Timeout
-from lockable import synchronized, Lockable
 from logging import getLogger
-from messaging import get_codec, Message, Pattern, UNLIMITED
+from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
 from ops import delivery_mode
 from session import Client, INCOMPLETE, SessionDetached
 from threading import Condition, Thread
@@ -63,12 +63,11 @@
       return handler._message_transfer(session, cmd)
   return Delegate
 
-class Driver(Lockable):
+class Driver:
 
   def __init__(self, connection):
     self.connection = connection
     self._lock = self.connection._lock
-    self._condition = self.connection._condition
     self._wakeup_cond = Condition()
     self._socket = None
     self._conn = None
@@ -134,7 +133,7 @@
         self.connection.error = (msg,)
 
     self._modcount = modcount
-    self.notifyAll()
+    self.connection._waiter.notifyAll()
 
   def connect(self):
     if self._conn is not None:
@@ -177,7 +176,7 @@
       _ssn.auto_sync = False
       _ssn.invoke_lock = self._lock
       _ssn.lock = self._lock
-      _ssn.condition = self._condition
+      _ssn.condition = self.connection._condition
       if ssn.transactional:
         # XXX: adding an attribute to qpid.session.Session
         _ssn.acked = []
@@ -422,7 +421,7 @@
     rcv.received += 1
     log.debug("RECV [%s] %s", ssn, msg)
     ssn.incoming.append(msg)
-    self.notifyAll()
+    self.connection._waiter.notifyAll()
     return INCOMPLETE
 
   def _decode(self, message):

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=810573&r1=810572&r2=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep  2 15:35:42 2009
@@ -31,8 +31,8 @@
 """
 
 from codec010 import StringCodec
+from concurrency import synchronized, Waiter
 from datatypes import timestamp, uuid4, Serial
-from lockable import synchronized, Lockable
 from logging import getLogger
 from ops import PRIMITIVE
 from threading import Thread, RLock, Condition
@@ -69,7 +69,7 @@
   """
   pass
 
-class Connection(Lockable):
+class Connection:
 
   """
   A Connection manages a group of L{Sessions<Session>} and connects
@@ -114,19 +114,23 @@
     self._connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
+    self._waiter = Waiter(self._condition)
     self._modcount = Serial(0)
     self.error = None
     from driver import Driver
     self._driver = Driver(self)
     self._driver.start()
 
+  def _wait(self, predicate, timeout=None):
+    return self._waiter.wait(predicate, timeout=timeout)
+
   def _wakeup(self):
     self._modcount += 1
     self._driver.wakeup()
 
   def _catchup(self, exc=ConnectionError):
     mc = self._modcount
-    self.wait(lambda: not self._driver._modcount < mc)
+    self._wait(lambda: not self._driver._modcount < mc)
     self._check_error(exc)
 
   def _check_error(self, exc=ConnectionError):
@@ -134,7 +138,7 @@
       raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=ConnectionError):
-    result = self.wait(lambda: self.error or predicate(), timeout)
+    result = self._wait(lambda: self.error or predicate(), timeout)
     self._check_error(exc)
     return result
 
@@ -255,7 +259,7 @@
 class TransactionAborted(SessionError):
   pass
 
-class Session(Lockable):
+class Session:
 
   """
   Sessions provide a linear context for sending and receiving
@@ -287,7 +291,6 @@
     self.closed = False
 
     self._lock = connection._lock
-    self._condition = connection._condition
     self.thread = Thread(target = self.run)
     self.thread.setDaemon(True)
     self.thread.start()
@@ -295,6 +298,9 @@
   def __repr__(self):
     return "<Session %s>" % self.name
 
+  def _wait(self, predicate, timeout=None):
+    return self.connection._wait(predicate, timeout=timeout)
+
   def _wakeup(self):
     self.connection._wakeup()
 
@@ -369,8 +375,8 @@
 
   @synchronized
   def _get(self, predicate, timeout=None):
-    if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
-                 timeout):
+    if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+                  timeout):
       msg = self._pop(predicate)
       if msg is not None:
         msg._receiver.returned += 1
@@ -454,7 +460,7 @@
     for rcv in self.receivers:
       rcv.stop()
     # TODO: think about stopping individual receivers in listen mode
-    self.wait(lambda: self._peek(self._pred) is None)
+    self._wait(lambda: self._peek(self._pred) is None)
     self.started = False
 
   def _pred(self, m):
@@ -470,10 +476,10 @@
         else:
           msg._receiver.listener(msg)
           if self._peek(self._pred) is None:
-            self.notifyAll()
+            self.connection._waiter.notifyAll()
     finally:
       self.closed = True
-      self.notifyAll()
+      self.connection._waiter.notifyAll()
 
   @synchronized
   def close(self):
@@ -486,7 +492,7 @@
     self.closing = True
     self._wakeup()
     self._catchup()
-    self.wait(lambda: self.closed)
+    self._wait(lambda: self.closed)
     while self.thread.isAlive():
       self.thread.join(3)
     self.thread = None
@@ -500,7 +506,7 @@
 class InsufficientCapacity(SendError):
   pass
 
-class Sender(Lockable):
+class Sender:
 
   """
   Sends outgoing messages.
@@ -515,7 +521,6 @@
     self.acked = Serial(0)
     self.closed = False
     self._lock = self.session._lock
-    self._condition = self.session._condition
 
   def _wakeup(self):
     self.session._wakeup()
@@ -598,7 +603,7 @@
   """
   pass
 
-class Receiver(Lockable):
+class Receiver:
 
   """
   Receives incoming messages from a remote source. Messages may be
@@ -625,7 +630,6 @@
     self.closed = False
     self.listener = None
     self._lock = self.session._lock
-    self._condition = self.session._condition
 
   def _wakeup(self):
     self.session._wakeup()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org