You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2019/04/29 16:14:11 UTC
[qpid-proton] branch master updated: PROTON-2026: [Python] Try to
reconnect if connect initially fails - Also use non blocking connect and
listen - Add a test for reconnect connect behaviour
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 713c3aa PROTON-2026: [Python] Try to reconnect if connect initially fails - Also use non blocking connect and listen - Add a test for reconnect connect behaviour
713c3aa is described below
commit 713c3aa0a2423b34ebffb8a2b2a43f0271966e4b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Apr 24 00:03:29 2019 -0400
PROTON-2026: [Python] Try to reconnect if connect initially fails
- Also use non blocking connect and listen
- Add a test for reconnect connect behaviour
---
python/proton/__init__.py | 10 ++++-
python/proton/_compat.py | 4 +-
python/proton/_handlers.py | 85 ++++++++++++++++++++++++++++++------
python/proton/_io.py | 42 +++++++++++++-----
python/proton/_reactor.py | 5 +--
python/proton/_selectable.py | 12 +++--
python/proton/_transport.py | 14 ++++--
python/tests/proton_tests/reactor.py | 61 +++++++++++++++++++++++++-
8 files changed, 192 insertions(+), 41 deletions(-)
diff --git a/python/proton/__init__.py b/python/proton/__init__.py
index 774b97b..e40fac7 100644
--- a/python/proton/__init__.py
+++ b/python/proton/__init__.py
@@ -31,6 +31,8 @@ The proton APIs consist of the following classes:
from __future__ import absolute_import
import logging
+import logging.config
+import os
from cproton import PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT
@@ -127,5 +129,9 @@ class NullHandler(logging.Handler):
handler = NullHandler()
-log = logging.getLogger("proton")
-log.addHandler(handler)
+logconfigfile = os.getenv('PNPY_LOGGER_CONFIG', None)
+if logconfigfile:
+ logging.config.fileConfig(logconfigfile, None, False)
+else:
+ log = logging.getLogger("proton")
+ log.addHandler(handler)
diff --git a/python/proton/_compat.py b/python/proton/_compat.py
index 7db8174..7380334 100644
--- a/python/proton/_compat.py
+++ b/python/proton/_compat.py
@@ -52,7 +52,7 @@ if PY3:
def iteritems(d, **kw):
return iter(d.items(**kw))
- def select_errno(e):
+ def socket_errno(e):
return e.errno
unichr = chr
@@ -66,7 +66,7 @@ else:
def iteritems(d, **kw):
return d.iteritems(**kw)
- def select_errno(e):
+ def socket_errno(e):
return e[0]
unichr = unichr
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index c946a3d..566e010 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -19,16 +19,20 @@
from __future__ import absolute_import
+import errno
import logging
+import socket
import time
import weakref
+from ._condition import Condition
from ._delivery import Delivery
from ._endpoints import Endpoint
-from ._events import Handler, _dispatch
+from ._events import Event, Handler, _dispatch
from ._exceptions import ProtonException
from ._io import IO
from ._message import Message
+from ._selectable import Selectable
from ._transport import Transport
from ._url import Url
@@ -795,8 +799,9 @@ class IOHandler(Handler):
else:
# EOF handling
self.on_selectable_error(event)
- except:
+ except socket.error as e:
# TODO: What's the error handling to be here?
+ log.error("Couldn't recv: %r" % e)
t.close_tail()
# Always update as we may have gone to not reading or from
@@ -819,7 +824,8 @@ class IOHandler(Handler):
try:
n = s.send(t.peek(pending))
t.pop(n)
- except:
+ except socket.error as e:
+ log.error("Couldn't send: %r" % e)
# TODO: Error? or actually an exception
t.close_head()
@@ -873,7 +879,7 @@ class IOHandler(Handler):
url.defaults()
host = url.host
- port = url.port
+ port = int(url.port)
if not c.user:
user = url.username
@@ -883,14 +889,20 @@ class IOHandler(Handler):
if password:
c.password = password
- # TODO Currently this is synch and will throw if it cannot connect
- # do we want to handle errors differently? or do it asynch?
- sock = IO.connect(host, int(port))
+ addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
- s = reactor.selectable(delegate=sock)
- s._transport = t
- t._selectable = s
- self.update(t, s, reactor.now)
+ # Try first possible address
+ log.debug("Connect trying first transport address: %s", addrs[0])
+ sock = IO.connect(addrs[0])
+
+ # At this point we need to arrange to be called back when the socket is writable
+ connector = ConnectSelectable(sock, reactor, addrs[1:], t, self)
+ connector.collect(reactor._collector)
+ connector.writing = True
+ connector.push_event(connector, Event.SELECTABLE_INIT)
+
+ # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds?
+ t._selectable = None
@staticmethod
def update(transport, selectable, now):
@@ -920,6 +932,53 @@ class IOHandler(Handler):
t = event.transport
r = t._reactor
s = t._selectable
- s.terminate()
- r.update(s)
+ if s and not s.is_terminal:
+ s.terminate()
+ r.update(s)
t.unbind()
+
+
+class ConnectSelectable(Selectable):
+ def __init__(self, sock, reactor, addrs, transport, iohandler):
+ super(ConnectSelectable, self).__init__(sock, reactor)
+ self._addrs = addrs
+ self._transport = transport
+ self._iohandler = iohandler
+
+ def readable(self):
+ pass
+
+ def writable(self):
+ e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ t = self._transport
+ if e == 0:
+ log.debug("Connection succeeded")
+ s = self._reactor.selectable(delegate=self._delegate)
+ s._transport = t
+ t._selectable = s
+ self._iohandler.update(t, s, t._reactor.now)
+
+ # Disassociate from the socket (which has been passed on)
+ self._delegate = None
+ self.terminate()
+ self.update()
+ return
+ elif e == errno.ECONNREFUSED:
+ if len(self._addrs) > 0:
+ log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
+ sock = IO.connect(self._addrs[0])
+ self._addrs = self._addrs[1:]
+ self._delegate.close()
+ self._delegate = sock
+ return
+ else:
+ log.debug("Connection refused, but tried all transport addresses")
+ t.condition = Condition("proton.pythonio", "Connection refused to all addresses")
+ else:
+ log.error("Couldn't connect: %s", e)
+ t.condition = Condition("proton.pythonio", "Connection error: %s" % e)
+
+ t.close_tail()
+ t.close_head()
+ self.terminate()
+ self.update()
diff --git a/python/proton/_io.py b/python/proton/_io.py
index 97eba50..ac15b20 100644
--- a/python/proton/_io.py
+++ b/python/proton/_io.py
@@ -23,30 +23,46 @@ import errno
import socket
import select
import time
-from ._compat import select_errno
+
+from ._compat import socket_errno
PN_INVALID_SOCKET = -1
class IO(object):
@staticmethod
+ def _setupsocket(s):
+ s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
+ s.setblocking(False)
+
+ @staticmethod
def close(s):
s.close()
@staticmethod
def listen(host, port):
s = socket.socket()
+ IO._setupsocket(s)
s.bind((host, port))
s.listen(10)
return s
@staticmethod
def accept(s):
- return s.accept()
+ n = s.accept()
+ IO._setupsocket(n[0])
+ return n
@staticmethod
- def connect(host, port):
- return socket.create_connection((host, port))
+ def connect(addr):
+ s = socket.socket(addr[0], addr[1], addr[2])
+ IO._setupsocket(s)
+ try:
+ s.connect(addr[4])
+ except socket.error as e:
+ if socket_errno(e) not in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EAGAIN):
+ raise
+ return s
@staticmethod
def select(*args, **kwargs):
@@ -107,6 +123,9 @@ class IO(object):
def select(self, timeout):
def select_inner(timeout):
+ # This inner select adds the writing fds to the exception fd set
+ # because Windows returns connected fds in the exception set not the
+ # writable set
r = self._reading
w = self._writing
@@ -114,32 +133,35 @@ class IO(object):
# No timeout or deadline
if timeout is None and self._deadline is None:
- return IO.select(r, w, [])
+ return IO.select(r, w, w)
if timeout is None:
t = max(0, self._deadline - now)
- return IO.select(r, w, [], t)
+ return IO.select(r, w, w, t)
if self._deadline is None:
- return IO.select(r, w, [], timeout)
+ return IO.select(r, w, w, timeout)
t = max(0, min(timeout, self._deadline - now))
if len(r)==0 and len(w)==0:
if t > 0: IO.sleep(t)
return ([],[],[])
- return IO.select(r, w, [], t)
+ return IO.select(r, w, w, t)
# Need to allow for signals interrupting us on Python 2
# In this case the signal handler could have messed up our internal state
# so don't retry just return with no handles.
try:
- r, w, _ = select_inner(timeout)
+ r, w, ex = select_inner(timeout)
except select.error as e:
- if select_errno(e) != errno.EINTR:
+ if socket_errno(e) != errno.EINTR:
raise
r, w = ([], [])
+ # For windows non blocking connect we get exception not writable so add exceptions to writable
+ w += ex
+
# Calculate timed out selectables
now = time.time()
t = [s for s in self._selectables if s.deadline and now > s.deadline]
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 60a1a2c..628cdd5 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -349,7 +349,7 @@ class Reactor(object):
result = Selectable(delegate, self)
result.collect(self._collector)
result.handler = handler
- self.push_event(result, Event.SELECTABLE_INIT)
+ result.push_event(result, Event.SELECTABLE_INIT)
return result
def update(self, selectable):
@@ -773,10 +773,9 @@ class Connector(Handler):
_logger.debug("connected to %s" % event.connection.hostname)
if self.reconnect:
self.reconnect.reset()
- self.transport = None
def on_transport_tail_closed(self, event):
- self.on_transport_closed(event)
+ event.transport.close_head()
def on_transport_closed(self, event):
if self.connection is None: return
diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py
index 2125f7d..1839953 100644
--- a/python/proton/_selectable.py
+++ b/python/proton/_selectable.py
@@ -63,7 +63,8 @@ class Selectable(object):
self._collector = collector
def push_event(self, context, type):
- self._collector.put(context, type)
+ if self._collector:
+ self._collector.put(context, type)
def update(self):
if not self._terminated:
@@ -74,16 +75,13 @@ class Selectable(object):
self.push_event(self, Event.SELECTABLE_UPDATED)
def readable(self):
- if self._collector:
- self.push_event(self, Event.SELECTABLE_READABLE)
+ self.push_event(self, Event.SELECTABLE_READABLE)
def writable(self):
- if self._collector:
- self.push_event(self, Event.SELECTABLE_WRITABLE)
+ self.push_event(self, Event.SELECTABLE_WRITABLE)
def expired(self):
- if self._collector:
- self.push_event(self, Event.SELECTABLE_EXPIRED)
+ self.push_event(self, Event.SELECTABLE_EXPIRED)
@property
def is_terminal(self):
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index ddd08c7..eb61a22 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -43,7 +43,7 @@ from cproton import PN_EOS, PN_OK, PN_SASL_AUTH, PN_SASL_NONE, PN_SASL_OK, PN_SA
pn_transport_tick, pn_transport_trace, pn_transport_unbind
from ._common import millis2secs, secs2millis, unicode2utf8, utf82unicode
-from ._condition import cond2obj
+from ._condition import cond2obj, obj2cond
from ._exceptions import EXCEPTIONS, SSLException, SSLUnavailable, SessionException, TransportException
from ._wrapper import Wrapper
@@ -256,10 +256,18 @@ The idle timeout of the connection (float, in seconds).
self._ssl = SSL(self, domain, session_details)
return self._ssl
- @property
- def condition(self):
+ def _get_condition(self):
return cond2obj(pn_transport_condition(self._impl))
+ def _set_condition(self, cond):
+ pn_cond = pn_transport_condition(self._impl)
+ obj2cond(cond, pn_cond)
+
+ condition = property(_get_condition, _set_condition,
+ doc="""
+The error condition (if any) of the transport.
+""")
+
@property
def connection(self):
from . import _endpoints
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 1586d45..99ea996 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -423,7 +423,6 @@ class ContainerTest(Test):
def __init__(self, host):
super(ContainerTest._ServerHandler, self).__init__()
self.host = host
- port = free_tcp_port()
self.port = free_tcp_port()
self.client_addr = None
self.peer_hostname = None
@@ -500,6 +499,66 @@ class ContainerTest(Test):
container.run()
assert server_handler.peer_hostname is None, server_handler.peer_hostname
+ class _ReconnectServerHandler(MessagingHandler):
+ def __init__(self, host, listen_on_error=True):
+ super(ContainerTest._ReconnectServerHandler, self).__init__()
+ self.host = host
+ self.port = free_tcp_port()
+ self.client_addr = None
+ self.peer_hostname = None
+ self.listen_on_error = listen_on_error
+
+ def on_connection_opened(self, event):
+ self.client_addr = event.connected_address
+ self.peer_hostname = event.connection.remote_hostname
+ self.listener.close()
+
+ def on_connection_closing(self, event):
+ event.connection.close()
+
+ def listen(self, container):
+ if self.listen_on_error:
+ self.listener = container.listen("%s:%s" % (self.host, self.port))
+
+ class _ReconnectClientHandler(MessagingHandler):
+ def __init__(self, server_handler):
+ super(ContainerTest._ReconnectClientHandler, self).__init__()
+ self.connect_failed = False
+ self.server_addr = None
+ self.server_handler = server_handler
+
+ def on_connection_opened(self, event):
+ self.server_addr = event.connected_address
+ event.connection.close()
+
+ def on_transport_error(self, event):
+ assert self.connect_failed == False
+ self.connect_failed = True
+ self.server_handler.listen(event.container)
+
+ def test_reconnect(self):
+ server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True)
+ client_handler = ContainerTest._ReconnectClientHandler(server_handler)
+ container = Container(server_handler)
+ container.connect(url=Url(host="localhost", port=server_handler.port),
+ handler=client_handler)
+ container.run()
+ assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
+ assert client_handler.connect_failed
+ assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr
+
+ def test_not_reconnecting(self):
+ server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False)
+ client_handler = ContainerTest._ReconnectClientHandler(server_handler)
+ container = Container(server_handler)
+ container.connect(url=Url(host="localhost", port=server_handler.port),
+ handler=client_handler, reconnect=False)
+ container.run()
+ assert server_handler.peer_hostname == None, server_handler.peer_hostname
+ assert client_handler.connect_failed
+ assert client_handler.server_addr == None, client_handler.server_addr
+
+
class SelectorTest(Test):
"""Test the Selector"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org