You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/23 21:34:58 UTC
svn commit: r926766 - in /qpid/trunk/qpid/python/qpid: compat.py
concurrency.py messaging/driver.py messaging/endpoints.py
tests/messaging/endpoints.py
Author: rhs
Date: Tue Mar 23 20:34:58 2010
New Revision: 926766
URL: http://svn.apache.org/viewvc?rev=926766&view=rev
Log:
fixed resource leakage on repeated connection open/close
Modified:
qpid/trunk/qpid/python/qpid/compat.py
qpid/trunk/qpid/python/qpid/concurrency.py
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/compat.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/compat.py (original)
+++ qpid/trunk/qpid/python/qpid/compat.py Tue Mar 23 20:34:58 2010
@@ -84,6 +84,16 @@ if sys.platform in ('win32', 'cygwin'):
def fileno(self):
return self.read_sock.fileno()
+ def close(self):
+ if self.write_sock is not None:
+ self.write_sock.close()
+ self.write_sock = None
+ self.read_sock.close()
+ self.read_sock = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
@@ -102,9 +112,8 @@ else:
class PipeWaiter(BaseWaiter):
- def __init__(self, read_fd, write_fd):
- self.read_fd = read_fd
- self.write_fd = write_fd
+ def __init__(self):
+ self.read_fd, self.write_fd = os.pipe()
def _do_write(self):
os.write(self.write_fd, "\0")
@@ -115,8 +124,18 @@ else:
def fileno(self):
return self.read_fd
+ def close(self):
+ if self.write_fd is not None:
+ os.close(self.write_fd)
+ self.write_fd = None
+ os.close(self.read_fd)
+ self.read_fd = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
def selectable_waiter():
- return PipeWaiter(*os.pipe())
+ return PipeWaiter()
Modified: qpid/trunk/qpid/python/qpid/concurrency.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/concurrency.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Tue Mar 23 20:34:58 2010
@@ -98,3 +98,9 @@ class Condition:
self.lock._acquire_restore(st)
self.waiting.remove(sw)
self.waiters.append(sw)
+
+ def gc(self):
+ assert self.lock._is_owned()
+ while self.waiters:
+ sw = self.waiters.pop(0)
+ sw.close()
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Mar 23 20:34:58 2010
@@ -342,6 +342,9 @@ class Driver:
def start(self):
self._selector.register(self)
+ def stop(self):
+ self._selector.unregister(self)
+
def fileno(self):
return self._socket.fileno()
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 20:34:58 2010
@@ -102,7 +102,6 @@ class Connection:
self.error = None
from driver import Driver
self._driver = Driver(self)
- self._driver.start()
def _wait(self, predicate, timeout=None):
return self._waiter.wait(predicate, timeout=timeout)
@@ -157,6 +156,7 @@ class Connection:
Connect to the remote endpoint.
"""
self._connected = True
+ self._driver.start()
self._wakeup()
self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
@@ -175,6 +175,8 @@ class Connection:
self._connected = False
self._wakeup()
self._ewait(lambda: not self._transport_connected)
+ self._driver.stop()
+ self._condition.gc()
@synchronized
def connected(self):
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 20:34:58 2010
@@ -20,7 +20,7 @@
# setup, usage, teardown, errors(sync), errors(async), stress, soak,
# boundary-conditions, config
-import time
+import errno, os, time
from qpid import compat
from qpid.messaging import *
from qpid.tests.messaging import Base
@@ -48,6 +48,29 @@ class SetupTests(Base):
# XXX: should verify that e includes appropriate diagnostic info
pass
+ def use_fds(self):
+ fds = []
+ try:
+ while True:
+ fds.append(os.open("/dev/null", os.O_RDONLY))
+ except OSError, e:
+ if e.errno != errno.EMFILE:
+ raise e
+ else:
+ return fds
+
+ def testOpenCloseResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection.open(self.broker.host, self.broker.port)
+ conn.close()
+ finally:
+ while fds:
+ os.close(fds.pop())
+
class ConnectionTests(Base):
def setup_connection(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org