You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/02 17:35:43 UTC
svn commit: r810573 - in /qpid/trunk/qpid/python/qpid: concurrency.py
driver.py lockable.py messaging.py
Author: rhs
Date: Wed Sep 2 15:35:42 2009
New Revision: 810573
URL: http://svn.apache.org/viewvc?rev=810573&view=rev
Log:
changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports
Added:
qpid/trunk/qpid/python/qpid/concurrency.py (contents, props changed)
- copied, changed from r810508, qpid/trunk/qpid/python/qpid/lockable.py
Removed:
qpid/trunk/qpid/python/qpid/lockable.py
Modified:
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
Copied: qpid/trunk/qpid/python/qpid/concurrency.py (from r810508, qpid/trunk/qpid/python/qpid/lockable.py)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?p2=qpid/trunk/qpid/python/qpid/concurrency.py&p1=qpid/trunk/qpid/python/qpid/lockable.py&r1=810508&r2=810573&rev=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/lockable.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Wed Sep 2 15:35:42 2009
@@ -26,11 +26,11 @@
exec """
def %s%s:
%s
- %s.lock()
+ %s._lock.acquire()
try:
return meth%s
finally:
- %s.unlock()
+ %s._lock.release()
""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs),
repr(inspect.getdoc(meth)), args[0],
inspect.formatargspec(args, vargs, kwargs, defs,
@@ -38,13 +38,10 @@
args[0]) in scope
return scope[meth.__name__]
-class Lockable(object):
+class Waiter(object):
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
+ def __init__(self, condition):
+ self.condition = condition
def wait(self, predicate, timeout=None):
passed = 0
@@ -53,16 +50,16 @@
if timeout is None:
# using the timed wait prevents keyboard interrupts from being
# blocked while waiting
- self._condition.wait(3)
+ self.condition.wait(3)
elif passed < timeout:
- self._condition.wait(timeout - passed)
+ self.condition.wait(timeout - passed)
else:
return False
passed = time.time() - start
return True
def notify(self):
- self._condition.notify()
+ self.condition.notify()
def notifyAll(self):
- self._condition.notifyAll()
+ self.condition.notifyAll()
Propchange: qpid/trunk/qpid/python/qpid/concurrency.py
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=810573&r1=810572&r2=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep 2 15:35:42 2009
@@ -18,11 +18,11 @@
#
import compat, connection, socket, sys, time
+from concurrency import synchronized
from datatypes import RangedSet, Message as Message010
from exceptions import Timeout
-from lockable import synchronized, Lockable
from logging import getLogger
-from messaging import get_codec, Message, Pattern, UNLIMITED
+from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
from ops import delivery_mode
from session import Client, INCOMPLETE, SessionDetached
from threading import Condition, Thread
@@ -63,12 +63,11 @@
return handler._message_transfer(session, cmd)
return Delegate
-class Driver(Lockable):
+class Driver:
def __init__(self, connection):
self.connection = connection
self._lock = self.connection._lock
- self._condition = self.connection._condition
self._wakeup_cond = Condition()
self._socket = None
self._conn = None
@@ -134,7 +133,7 @@
self.connection.error = (msg,)
self._modcount = modcount
- self.notifyAll()
+ self.connection._waiter.notifyAll()
def connect(self):
if self._conn is not None:
@@ -177,7 +176,7 @@
_ssn.auto_sync = False
_ssn.invoke_lock = self._lock
_ssn.lock = self._lock
- _ssn.condition = self._condition
+ _ssn.condition = self.connection._condition
if ssn.transactional:
# XXX: adding an attribute to qpid.session.Session
_ssn.acked = []
@@ -422,7 +421,7 @@
rcv.received += 1
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
- self.notifyAll()
+ self.connection._waiter.notifyAll()
return INCOMPLETE
def _decode(self, message):
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=810573&r1=810572&r2=810573&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 2 15:35:42 2009
@@ -31,8 +31,8 @@
"""
from codec010 import StringCodec
+from concurrency import synchronized, Waiter
from datatypes import timestamp, uuid4, Serial
-from lockable import synchronized, Lockable
from logging import getLogger
from ops import PRIMITIVE
from threading import Thread, RLock, Condition
@@ -69,7 +69,7 @@
"""
pass
-class Connection(Lockable):
+class Connection:
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -114,19 +114,23 @@
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
self._modcount = Serial(0)
self.error = None
from driver import Driver
self._driver = Driver(self)
self._driver.start()
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
def _wakeup(self):
self._modcount += 1
self._driver.wakeup()
def _catchup(self, exc=ConnectionError):
mc = self._modcount
- self.wait(lambda: not self._driver._modcount < mc)
+ self._wait(lambda: not self._driver._modcount < mc)
self._check_error(exc)
def _check_error(self, exc=ConnectionError):
@@ -134,7 +138,7 @@
raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ConnectionError):
- result = self.wait(lambda: self.error or predicate(), timeout)
+ result = self._wait(lambda: self.error or predicate(), timeout)
self._check_error(exc)
return result
@@ -255,7 +259,7 @@
class TransactionAborted(SessionError):
pass
-class Session(Lockable):
+class Session:
"""
Sessions provide a linear context for sending and receiving
@@ -287,7 +291,6 @@
self.closed = False
self._lock = connection._lock
- self._condition = connection._condition
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -295,6 +298,9 @@
def __repr__(self):
return "<Session %s>" % self.name
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
+
def _wakeup(self):
self.connection._wakeup()
@@ -369,8 +375,8 @@
@synchronized
def _get(self, predicate, timeout=None):
- if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -454,7 +460,7 @@
for rcv in self.receivers:
rcv.stop()
# TODO: think about stopping individual receivers in listen mode
- self.wait(lambda: self._peek(self._pred) is None)
+ self._wait(lambda: self._peek(self._pred) is None)
self.started = False
def _pred(self, m):
@@ -470,10 +476,10 @@
else:
msg._receiver.listener(msg)
if self._peek(self._pred) is None:
- self.notifyAll()
+ self.connection._waiter.notifyAll()
finally:
self.closed = True
- self.notifyAll()
+ self.connection._waiter.notifyAll()
@synchronized
def close(self):
@@ -486,7 +492,7 @@
self.closing = True
self._wakeup()
self._catchup()
- self.wait(lambda: self.closed)
+ self._wait(lambda: self.closed)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
@@ -500,7 +506,7 @@
class InsufficientCapacity(SendError):
pass
-class Sender(Lockable):
+class Sender:
"""
Sends outgoing messages.
@@ -515,7 +521,6 @@
self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()
@@ -598,7 +603,7 @@
"""
pass
-class Receiver(Lockable):
+class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
@@ -625,7 +630,6 @@
self.closed = False
self.listener = None
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org