You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:03:31 UTC
[03/50] qpid-proton git commit: PROTON-1394: Python client resource cleanup, circular references and underlying C objects
PROTON-1394: Python client resource cleanup, circular references and underlying C objects
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b79759d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b79759d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b79759d6
Branch: refs/heads/go1
Commit: b79759d6dbd2f542086bb6bcb3c94806ed818b5a
Parents: b17671e
Author: Clifford Jansen <cl...@apache.org>
Authored: Fri Jul 28 16:44:18 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Fri Jul 28 16:44:18 2017 -0700
----------------------------------------------------------------------
proton-c/bindings/python/proton/__init__.py | 9 ++++---
proton-c/bindings/python/proton/handlers.py | 8 +++---
proton-c/bindings/python/proton/reactor.py | 31 +++++++++++++++++-------
proton-c/bindings/python/proton/utils.py | 21 ++++++++++------
4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 2b354df..dca600b 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2324,7 +2324,7 @@ class Endpoint(object):
from . import reactor
ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
- on_error = ractor.on_error
+ on_error = ractor.on_error_delegate()
else:
on_error = None
record = self._get_attachments()
@@ -2334,7 +2334,7 @@ class Endpoint(object):
from . import reactor
ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
- on_error = ractor.on_error
+ on_error = ractor.on_error_delegate()
else:
on_error = None
impl = _chandler(handler, on_error)
@@ -4110,9 +4110,10 @@ class WrappedHandler(Wrapper):
else:
on_error(info)
- def add(self, handler):
+ def add(self, handler, on_error=None):
if handler is None: return
- impl = _chandler(handler, self._on_error)
+ if on_error is None: on_error = self._on_error
+ impl = _chandler(handler, on_error)
pn_handler_add(self._impl, impl)
pn_decref(impl)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 6d580b7..6d3cce5 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-import heapq, logging, os, re, socket, time, types
+import heapq, logging, os, re, socket, time, types, weakref
from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
@@ -390,9 +390,9 @@ class MessagingHandler(Handler, Acking):
self.handlers = []
if prefetch:
self.handlers.append(CFlowController(prefetch))
- self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
- self.handlers.append(IncomingMessageHandler(auto_accept, self))
- self.handlers.append(OutgoingMessageHandler(auto_settle, self))
+ self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
+ self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
+ self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
self.fatal_conditions = ["amqp:unauthorized-access"]
def on_transport_error(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index bffeea1..5f6d8cb 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -84,20 +84,33 @@ class Reactor(Wrapper):
def __init__(self, *handlers, **kwargs):
Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
for h in handlers:
- self.handler.add(h)
+ self.handler.add(h, on_error=self.on_error_delegate())
def _init(self):
self.errors = []
+ # on_error relay handler tied to underlying C reactor. Use when the
+ # error will always be generated from a callback from this reactor.
+ # Needed to prevent reference cycles and be compatible with wrappers.
+ class ErrorDelegate(object):
+ def __init__(self, reactor):
+ self.reactor_impl = reactor._impl
+ def on_error(self, info):
+ ractor = Reactor.wrap(self.reactor_impl)
+ ractor.on_error(info)
+
+ def on_error_delegate(self):
+ return Reactor.ErrorDelegate(self).on_error
+
def on_error(self, info):
self.errors.append(info)
self.yield_()
def _get_global(self):
- return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
+ return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
def _set_global(self, handler):
- impl = _chandler(handler, self.on_error)
+ impl = _chandler(handler, self.on_error_delegate())
pn_reactor_set_global_handler(self._impl, impl)
pn_decref(impl)
@@ -118,10 +131,10 @@ class Reactor(Wrapper):
return pn_reactor_mark(self._impl)
def _get_handler(self):
- return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
+ return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
def _set_handler(self, handler):
- impl = _chandler(handler, self.on_error)
+ impl = _chandler(handler, self.on_error_delegate())
pn_reactor_set_handler(self._impl, impl)
pn_decref(impl)
@@ -164,13 +177,13 @@ class Reactor(Wrapper):
self._check_errors()
def schedule(self, delay, task):
- impl = _chandler(task, self.on_error)
+ impl = _chandler(task, self.on_error_delegate())
task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
pn_decref(impl)
return task
def acceptor(self, host, port, handler=None):
- impl = _chandler(handler, self.on_error)
+ impl = _chandler(handler, self.on_error_delegate())
aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
pn_decref(impl)
if aimpl:
@@ -181,7 +194,7 @@ class Reactor(Wrapper):
def connection(self, handler=None):
"""Deprecated: use connection_to_host() instead
"""
- impl = _chandler(handler, self.on_error)
+ impl = _chandler(handler, self.on_error_delegate())
result = Connection.wrap(pn_reactor_connection(self._impl, impl))
if impl: pn_decref(impl)
return result
@@ -215,7 +228,7 @@ class Reactor(Wrapper):
return utf82unicode(_url)
def selectable(self, handler=None):
- impl = _chandler(handler, self.on_error)
+ impl = _chandler(handler, self.on_error_delegate())
result = Selectable.wrap(pn_reactor_selectable(self._impl))
if impl:
record = pn_selectable_attachments(result._impl)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 05ef80d..d0679ae 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -137,10 +137,10 @@ class BlockingReceiver(BlockingLink):
def __del__(self):
self.fetcher = None
# The next line causes a core dump if the Proton-C reactor finalizes
- # first. The self.container reference prevents reactor finalization
- # until after it is set to None.
- self.link.handler = None
- self.container = None
+ # first. The self.container reference prevents out-of-order reactor
+ # finalization. It may not be set if an exception was raised in BlockingLink.__init__.
+ if hasattr(self, "container"):
+ self.link.handler = None # implicit call to reactor
def receive(self, timeout=False):
if not self.fetcher:
@@ -208,9 +208,16 @@ class BlockingConnection(Handler):
self.container.timeout = self.timeout
self.container.start()
self.url = Url(url).defaults()
- self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
- self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
- msg="Opening connection")
+ self.conn = None
+ failed = True
+ try:
+ self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+ msg="Opening connection")
+ failed = False
+ finally:
+ if failed and self.conn:
+ self.close()
def create_sender(self, address, handler=None, name=None, options=None):
return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org