You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:03:31 UTC

[03/50] qpid-proton git commit: PROTON-1394: Python client resource cleanup, circular references and underlying C objects

PROTON-1394: Python client resource cleanup, circular references and underlying C objects


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b79759d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b79759d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b79759d6

Branch: refs/heads/go1
Commit: b79759d6dbd2f542086bb6bcb3c94806ed818b5a
Parents: b17671e
Author: Clifford Jansen <cl...@apache.org>
Authored: Fri Jul 28 16:44:18 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Fri Jul 28 16:44:18 2017 -0700

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py |  9 ++++---
 proton-c/bindings/python/proton/handlers.py |  8 +++---
 proton-c/bindings/python/proton/reactor.py  | 31 +++++++++++++++++-------
 proton-c/bindings/python/proton/utils.py    | 21 ++++++++++------
 4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 2b354df..dca600b 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2324,7 +2324,7 @@ class Endpoint(object):
     from . import reactor
     ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
     if ractor:
-      on_error = ractor.on_error
+      on_error = ractor.on_error_delegate()
     else:
       on_error = None
     record = self._get_attachments()
@@ -2334,7 +2334,7 @@ class Endpoint(object):
     from . import reactor
     ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
     if ractor:
-      on_error = ractor.on_error
+      on_error = ractor.on_error_delegate()
     else:
       on_error = None
     impl = _chandler(handler, on_error)
@@ -4110,9 +4110,10 @@ class WrappedHandler(Wrapper):
     else:
       on_error(info)
 
-  def add(self, handler):
+  def add(self, handler, on_error=None):
     if handler is None: return
-    impl = _chandler(handler, self._on_error)
+    if on_error is None: on_error = self._on_error
+    impl = _chandler(handler, on_error)
     pn_handler_add(self._impl, impl)
     pn_decref(impl)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 6d580b7..6d3cce5 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import heapq, logging, os, re, socket, time, types
+import heapq, logging, os, re, socket, time, types, weakref
 
 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
@@ -390,9 +390,9 @@ class MessagingHandler(Handler, Acking):
         self.handlers = []
         if prefetch:
             self.handlers.append(CFlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
+        self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
+        self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
         self.fatal_conditions = ["amqp:unauthorized-access"]
 
     def on_transport_error(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index bffeea1..5f6d8cb 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -84,20 +84,33 @@ class Reactor(Wrapper):
     def __init__(self, *handlers, **kwargs):
         Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
         for h in handlers:
-            self.handler.add(h)
+            self.handler.add(h, on_error=self.on_error_delegate())
 
     def _init(self):
         self.errors = []
 
+    # on_error relay handler tied to underlying C reactor.  Use when the
+    # error will always be generated from a callback from this reactor.
+    # Needed to prevent reference cycles and be compatible with wrappers.
+    class ErrorDelegate(object):
+      def __init__(self, reactor):
+        self.reactor_impl = reactor._impl
+      def on_error(self, info):
+        ractor = Reactor.wrap(self.reactor_impl)
+        ractor.on_error(info)
+
+    def on_error_delegate(self):
+        return Reactor.ErrorDelegate(self).on_error
+
     def on_error(self, info):
         self.errors.append(info)
         self.yield_()
 
     def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
+        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
 
     def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         pn_reactor_set_global_handler(self._impl, impl)
         pn_decref(impl)
 
@@ -118,10 +131,10 @@ class Reactor(Wrapper):
         return pn_reactor_mark(self._impl)
 
     def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
+        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
 
     def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         pn_reactor_set_handler(self._impl, impl)
         pn_decref(impl)
 
@@ -164,13 +177,13 @@ class Reactor(Wrapper):
         self._check_errors()
 
     def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error)
+        impl = _chandler(task, self.on_error_delegate())
         task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
         pn_decref(impl)
         return task
 
     def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
         pn_decref(impl)
         if aimpl:
@@ -181,7 +194,7 @@ class Reactor(Wrapper):
     def connection(self, handler=None):
         """Deprecated: use connection_to_host() instead
         """
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         result = Connection.wrap(pn_reactor_connection(self._impl, impl))
         if impl: pn_decref(impl)
         return result
@@ -215,7 +228,7 @@ class Reactor(Wrapper):
         return utf82unicode(_url)
 
     def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         result = Selectable.wrap(pn_reactor_selectable(self._impl))
         if impl:
             record = pn_selectable_attachments(result._impl)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 05ef80d..d0679ae 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -137,10 +137,10 @@ class BlockingReceiver(BlockingLink):
     def __del__(self):
         self.fetcher = None
         # The next line causes a core dump if the Proton-C reactor finalizes
-        # first.  The self.container reference prevents reactor finalization
-        # until after it is set to None.
-        self.link.handler = None
-        self.container = None
+        # first.  The self.container reference prevents out of order reactor
+        # finalization. It may not be set if exception in BlockingLink.__init__
+        if hasattr(self, "container"):
+            self.link.handler = None  # implicit call to reactor
 
     def receive(self, timeout=False):
         if not self.fetcher:
@@ -208,9 +208,16 @@ class BlockingConnection(Handler):
         self.container.timeout = self.timeout
         self.container.start()
         self.url = Url(url).defaults()
-        self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
-        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
-                  msg="Opening connection")
+        self.conn = None
+        failed = True
+        try:
+            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
+            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+                      msg="Opening connection")
+            failed = False
+        finally:
+            if failed and self.conn:
+                self.close()
 
     def create_sender(self, address, handler=None, name=None, options=None):
         return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org