You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/10/12 21:06:19 UTC
qpid-proton git commit: PROTON-1534: BlockingConnection proper
cleanup after LinkDetached exception
Repository: qpid-proton
Updated Branches:
refs/heads/master b9d64d66c -> 47c9ae01d
PROTON-1534: BlockingConnection proper cleanup after LinkDetached exception
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/47c9ae01
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/47c9ae01
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/47c9ae01
Branch: refs/heads/master
Commit: 47c9ae01dcbdbfffc0a4e78b4648171896d0f2a7
Parents: b9d64d6
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu Oct 12 14:04:15 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu Oct 12 14:05:50 2017 -0700
----------------------------------------------------------------------
proton-c/bindings/python/proton/utils.py | 35 ++++++++++++++++++---------
1 file changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/47c9ae01/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 13f29a7..c9a3654 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -22,6 +22,7 @@ from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkE
from proton import ProtonException, Timeout, Url
from proton.reactor import Container
from proton.handlers import MessagingHandler, IncomingMessageHandler
+from cproton import pn_reactor_collector, pn_collector_release
class BlockingLink(object):
@@ -43,7 +44,8 @@ class BlockingLink(object):
def _checkClosed(self):
if self.link.state & Endpoint.REMOTE_CLOSED:
self.link.close()
- raise LinkDetached(self.link)
+ if not self.connection.closing:
+ raise LinkDetached(self.link)
def close(self):
self.link.close()
@@ -99,10 +101,12 @@ class Fetcher(MessagingHandler):
def on_link_error(self, event):
if event.link.state & Endpoint.LOCAL_ACTIVE:
event.link.close()
- raise LinkDetached(event.link)
+ if not self.connection.closing:
+ raise LinkDetached(event.link)
def on_connection_error(self, event):
- raise ConnectionClosed(event.connection)
+ if not self.connection.closing:
+ raise ConnectionClosed(event.connection)
@property
def has_message(self):
@@ -214,6 +218,7 @@ class BlockingConnection(Handler):
self.container.start()
self.url = Url(url).defaults()
self.conn = None
+ self.closing = False
failed = True
try:
self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
@@ -239,19 +244,23 @@ class BlockingConnection(Handler):
self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
def close(self):
- if not self.conn:
+ # TODO: provide stronger interrupt protection on cleanup. See PEP 419
+ if self.closing:
return
- self.conn.close()
+ self.closing = True
+ self.container.errors = []
try:
- self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
- msg="Closing connection")
+ if self.conn:
+ self.conn.close()
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+ msg="Closing connection")
finally:
self.conn.free()
- # For cleanup, reactor needs to process PN_CONNECTION_FINAL
- # and all events with embedded contexts must be drained.
- self.run() # will not block any more
+ # Nothing left to block on. Allow reactor to clean up.
+ self.run()
self.conn = None
self.container.global_handler = None # break circular ref: container to cadapter.on_error
+ pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
self.container = None
def _is_closed(self):
@@ -293,12 +302,14 @@ class BlockingConnection(Handler):
def on_link_remote_close(self, event):
if event.link.state & Endpoint.LOCAL_ACTIVE:
event.link.close()
- raise LinkDetached(event.link)
+ if not self.closing:
+ raise LinkDetached(event.link)
def on_connection_remote_close(self, event):
if event.connection.state & Endpoint.LOCAL_ACTIVE:
event.connection.close()
- raise ConnectionClosed(event.connection)
+ if not self.closing:
+ raise ConnectionClosed(event.connection)
def on_transport_tail_closed(self, event):
self.on_transport_closed(event)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org