You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/10/12 21:06:19 UTC

qpid-proton git commit: PROTON-1534: BlockingConnection proper cleanup after LinkDetached exception

Repository: qpid-proton
Updated Branches:
  refs/heads/master b9d64d66c -> 47c9ae01d


PROTON-1534: BlockingConnection proper cleanup after LinkDetached exception


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/47c9ae01
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/47c9ae01
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/47c9ae01

Branch: refs/heads/master
Commit: 47c9ae01dcbdbfffc0a4e78b4648171896d0f2a7
Parents: b9d64d6
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu Oct 12 14:04:15 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu Oct 12 14:05:50 2017 -0700

----------------------------------------------------------------------
 proton-c/bindings/python/proton/utils.py | 35 ++++++++++++++++++---------
 1 file changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/47c9ae01/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 13f29a7..c9a3654 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -22,6 +22,7 @@ from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkE
 from proton import ProtonException, Timeout, Url
 from proton.reactor import Container
 from proton.handlers import MessagingHandler, IncomingMessageHandler
+from cproton import pn_reactor_collector, pn_collector_release
 
 
 class BlockingLink(object):
@@ -43,7 +44,8 @@ class BlockingLink(object):
     def _checkClosed(self):
         if self.link.state & Endpoint.REMOTE_CLOSED:
             self.link.close()
-            raise LinkDetached(self.link)
+            if not self.connection.closing:
+                raise LinkDetached(self.link)
 
     def close(self):
         self.link.close()
@@ -99,10 +101,12 @@ class Fetcher(MessagingHandler):
     def on_link_error(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
             event.link.close()
-            raise LinkDetached(event.link)
+            if not self.connection.closing:
+                raise LinkDetached(event.link)
 
     def on_connection_error(self, event):
-        raise ConnectionClosed(event.connection)
+        if not self.connection.closing:
+            raise ConnectionClosed(event.connection)
 
     @property
     def has_message(self):
@@ -214,6 +218,7 @@ class BlockingConnection(Handler):
         self.container.start()
         self.url = Url(url).defaults()
         self.conn = None
+        self.closing = False
         failed = True
         try:
             self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
@@ -239,19 +244,23 @@ class BlockingConnection(Handler):
             self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
 
     def close(self):
-        if not self.conn:
+        # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
+        if self.closing:
             return
-        self.conn.close()
+        self.closing = True
+        self.container.errors = []
         try:
-            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
-                      msg="Closing connection")
+            if self.conn:
+                self.conn.close()
+                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+                          msg="Closing connection")
         finally:
             self.conn.free()
-            # For cleanup, reactor needs to process PN_CONNECTION_FINAL
-            # and all events with embedded contexts must be drained.
-            self.run() # will not block any more
+            # Nothing left to block on.  Allow reactor to clean up.
+            self.run()
             self.conn = None
             self.container.global_handler = None # break circular ref: container to cadapter.on_error
+            pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
             self.container = None
 
     def _is_closed(self):
@@ -293,12 +302,14 @@ class BlockingConnection(Handler):
     def on_link_remote_close(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
             event.link.close()
-            raise LinkDetached(event.link)
+            if not self.closing:
+                raise LinkDetached(event.link)
 
     def on_connection_remote_close(self, event):
         if event.connection.state & Endpoint.LOCAL_ACTIVE:
             event.connection.close()
-            raise ConnectionClosed(event.connection)
+            if not self.closing:
+                raise ConnectionClosed(event.connection)
 
     def on_transport_tail_closed(self, event):
         self.on_transport_closed(event)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org