You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2019/04/29 16:14:11 UTC

[qpid-proton] branch master updated: PROTON-2026: [Python] Try to reconnect if connect initially fails - Also use non blocking connect and listen - Add a test for reconnect connect behaviour

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 713c3aa  PROTON-2026: [Python] Try to reconnect if connect initially fails - Also use non blocking connect and listen - Add a test for reconnect connect behaviour
713c3aa is described below

commit 713c3aa0a2423b34ebffb8a2b2a43f0271966e4b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Apr 24 00:03:29 2019 -0400

    PROTON-2026: [Python] Try to reconnect if connect initially fails
    - Also use non blocking connect and listen
    - Add a test for reconnect connect behaviour
---
 python/proton/__init__.py            | 10 ++++-
 python/proton/_compat.py             |  4 +-
 python/proton/_handlers.py           | 85 ++++++++++++++++++++++++++++++------
 python/proton/_io.py                 | 42 +++++++++++++-----
 python/proton/_reactor.py            |  5 +--
 python/proton/_selectable.py         | 12 +++--
 python/proton/_transport.py          | 14 ++++--
 python/tests/proton_tests/reactor.py | 61 +++++++++++++++++++++++++-
 8 files changed, 192 insertions(+), 41 deletions(-)

diff --git a/python/proton/__init__.py b/python/proton/__init__.py
index 774b97b..e40fac7 100644
--- a/python/proton/__init__.py
+++ b/python/proton/__init__.py
@@ -31,6 +31,8 @@ The proton APIs consist of the following classes:
 from __future__ import absolute_import
 
 import logging
+import logging.config
+import os
 
 from cproton import PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT
 
@@ -127,5 +129,9 @@ class NullHandler(logging.Handler):
 
 handler = NullHandler()
 
-log = logging.getLogger("proton")
-log.addHandler(handler)
+logconfigfile = os.getenv('PNPY_LOGGER_CONFIG', None)
+if logconfigfile:
+    logging.config.fileConfig(logconfigfile, None, False)
+else:
+    log = logging.getLogger("proton")
+    log.addHandler(handler)
diff --git a/python/proton/_compat.py b/python/proton/_compat.py
index 7db8174..7380334 100644
--- a/python/proton/_compat.py
+++ b/python/proton/_compat.py
@@ -52,7 +52,7 @@ if PY3:
     def iteritems(d, **kw):
         return iter(d.items(**kw))
 
-    def select_errno(e):
+    def socket_errno(e):
         return e.errno
 
     unichr = chr
@@ -66,7 +66,7 @@ else:
     def iteritems(d, **kw):
         return d.iteritems(**kw)
 
-    def select_errno(e):
+    def socket_errno(e):
         return e[0]
 
     unichr = unichr
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index c946a3d..566e010 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -19,16 +19,20 @@
 
 from __future__ import absolute_import
 
+import errno
 import logging
+import socket
 import time
 import weakref
 
+from ._condition import Condition
 from ._delivery import Delivery
 from ._endpoints import Endpoint
-from ._events import Handler, _dispatch
+from ._events import Event, Handler, _dispatch
 from ._exceptions import ProtonException
 from ._io import IO
 from ._message import Message
+from ._selectable import Selectable
 from ._transport import Transport
 from ._url import Url
 
@@ -795,8 +799,9 @@ class IOHandler(Handler):
                 else:
                     # EOF handling
                     self.on_selectable_error(event)
-            except:
+            except socket.error as e:
                 # TODO: What's the error handling to be here?
+                log.error("Couldn't recv: %r" % e)
                 t.close_tail()
 
         # Always update as we may have gone to not reading or from
@@ -819,7 +824,8 @@ class IOHandler(Handler):
             try:
                 n = s.send(t.peek(pending))
                 t.pop(n)
-            except:
+            except socket.error as e:
+                log.error("Couldn't send: %r" % e)
                 # TODO: Error? or actually an exception
                 t.close_head()
 
@@ -873,7 +879,7 @@ class IOHandler(Handler):
         url.defaults()
 
         host = url.host
-        port = url.port
+        port = int(url.port)
 
         if not c.user:
             user = url.username
@@ -883,14 +889,20 @@ class IOHandler(Handler):
             if password:
                 c.password = password
 
-        # TODO Currently this is synch and will throw if it cannot connect
-        # do we want to handle errors differently? or do it asynch?
-        sock = IO.connect(host, int(port))
+        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
 
-        s = reactor.selectable(delegate=sock)
-        s._transport = t
-        t._selectable = s
-        self.update(t, s, reactor.now)
+        # Try first possible address
+        log.debug("Connect trying first transport address: %s", addrs[0])
+        sock = IO.connect(addrs[0])
+
+        # At this point we need to arrange to be called back when the socket is writable
+        connector = ConnectSelectable(sock, reactor, addrs[1:], t, self)
+        connector.collect(reactor._collector)
+        connector.writing = True
+        connector.push_event(connector, Event.SELECTABLE_INIT)
+
+        # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds?
+        t._selectable = None
 
     @staticmethod
     def update(transport, selectable, now):
@@ -920,6 +932,53 @@ class IOHandler(Handler):
         t = event.transport
         r = t._reactor
         s = t._selectable
-        s.terminate()
-        r.update(s)
+        if s and not s.is_terminal:
+            s.terminate()
+            r.update(s)
         t.unbind()
+
+
+class ConnectSelectable(Selectable):
+    def __init__(self, sock, reactor, addrs, transport, iohandler):
+        super(ConnectSelectable, self).__init__(sock, reactor)
+        self._addrs = addrs
+        self._transport = transport
+        self._iohandler = iohandler
+
+    def readable(self):
+        pass
+
+    def writable(self):
+        e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+        t = self._transport
+        if e == 0:
+            log.debug("Connection succeeded")
+            s = self._reactor.selectable(delegate=self._delegate)
+            s._transport = t
+            t._selectable = s
+            self._iohandler.update(t, s, t._reactor.now)
+
+            # Disassociate from the socket (which has been passed on)
+            self._delegate = None
+            self.terminate()
+            self.update()
+            return
+        elif e == errno.ECONNREFUSED:
+            if len(self._addrs) > 0:
+                log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
+                sock = IO.connect(self._addrs[0])
+                self._addrs = self._addrs[1:]
+                self._delegate.close()
+                self._delegate = sock
+                return
+            else:
+                log.debug("Connection refused, but tried all transport addresses")
+                t.condition = Condition("proton.pythonio", "Connection refused to all addresses")
+        else:
+            log.error("Couldn't connect: %s", e)
+            t.condition = Condition("proton.pythonio", "Connection error: %s" % e)
+
+        t.close_tail()
+        t.close_head()
+        self.terminate()
+        self.update()
diff --git a/python/proton/_io.py b/python/proton/_io.py
index 97eba50..ac15b20 100644
--- a/python/proton/_io.py
+++ b/python/proton/_io.py
@@ -23,30 +23,46 @@ import errno
 import socket
 import select
 import time
-from ._compat import select_errno
+
+from ._compat import socket_errno
 
 PN_INVALID_SOCKET = -1
 
 class IO(object):
 
     @staticmethod
+    def _setupsocket(s):
+        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
+        s.setblocking(False)
+
+    @staticmethod
     def close(s):
         s.close()
 
     @staticmethod
     def listen(host, port):
         s = socket.socket()
+        IO._setupsocket(s)
         s.bind((host, port))
         s.listen(10)
         return s
 
     @staticmethod
     def accept(s):
-        return s.accept()
+        n = s.accept()
+        IO._setupsocket(n[0])
+        return n
 
     @staticmethod
-    def connect(host, port):
-        return socket.create_connection((host, port))
+    def connect(addr):
+        s = socket.socket(addr[0], addr[1], addr[2])
+        IO._setupsocket(s)
+        try:
+            s.connect(addr[4])
+        except socket.error as e:
+            if socket_errno(e) not in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EAGAIN):
+                raise
+        return s
 
     @staticmethod
     def select(*args, **kwargs):
@@ -107,6 +123,9 @@ class IO(object):
         def select(self, timeout):
 
             def select_inner(timeout):
+                # This inner select adds the writing fds to the exception fd set
+                # because Windows returns connected fds in the exception set not the
+                # writable set
                 r = self._reading
                 w = self._writing
 
@@ -114,32 +133,35 @@ class IO(object):
 
                 # No timeout or deadline
                 if timeout is None and self._deadline is None:
-                    return IO.select(r, w, [])
+                    return IO.select(r, w, w)
 
                 if timeout is None:
                     t = max(0, self._deadline - now)
-                    return IO.select(r, w, [], t)
+                    return IO.select(r, w, w, t)
 
                 if self._deadline is None:
-                    return IO.select(r, w, [], timeout)
+                    return IO.select(r, w, w, timeout)
 
                 t = max(0, min(timeout, self._deadline - now))
                 if len(r)==0 and len(w)==0:
                     if t > 0: IO.sleep(t)
                     return ([],[],[])
 
-                return IO.select(r, w, [], t)
+                return IO.select(r, w, w, t)
 
             # Need to allow for signals interrupting us on Python 2
             # In this case the signal handler could have messed up our internal state
             # so don't retry just return with no handles.
             try:
-                r, w, _ = select_inner(timeout)
+                r, w, ex = select_inner(timeout)
             except select.error as e:
-                if select_errno(e) != errno.EINTR:
+                if socket_errno(e) != errno.EINTR:
                     raise
                 r, w = ([], [])
 
+            # On Windows a non-blocking connect is reported in the exception set rather than the writable set, so add the exception fds to the writable list
+            w += ex
+
             # Calculate timed out selectables
             now = time.time()
             t = [s for s in self._selectables if s.deadline and now > s.deadline]
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 60a1a2c..628cdd5 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -349,7 +349,7 @@ class Reactor(object):
         result = Selectable(delegate, self)
         result.collect(self._collector)
         result.handler = handler
-        self.push_event(result, Event.SELECTABLE_INIT)
+        result.push_event(result, Event.SELECTABLE_INIT)
         return result
 
     def update(self, selectable):
@@ -773,10 +773,9 @@ class Connector(Handler):
         _logger.debug("connected to %s" % event.connection.hostname)
         if self.reconnect:
             self.reconnect.reset()
-            self.transport = None
 
     def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
+        event.transport.close_head()
 
     def on_transport_closed(self, event):
         if self.connection is None: return
diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py
index 2125f7d..1839953 100644
--- a/python/proton/_selectable.py
+++ b/python/proton/_selectable.py
@@ -63,7 +63,8 @@ class Selectable(object):
         self._collector = collector
 
     def push_event(self, context, type):
-        self._collector.put(context, type)
+        if self._collector:
+            self._collector.put(context, type)
 
     def update(self):
         if not self._terminated:
@@ -74,16 +75,13 @@ class Selectable(object):
                 self.push_event(self, Event.SELECTABLE_UPDATED)
 
     def readable(self):
-        if self._collector:
-            self.push_event(self, Event.SELECTABLE_READABLE)
+        self.push_event(self, Event.SELECTABLE_READABLE)
 
     def writable(self):
-        if self._collector:
-            self.push_event(self, Event.SELECTABLE_WRITABLE)
+        self.push_event(self, Event.SELECTABLE_WRITABLE)
 
     def expired(self):
-        if self._collector:
-            self.push_event(self, Event.SELECTABLE_EXPIRED)
+        self.push_event(self, Event.SELECTABLE_EXPIRED)
 
     @property
     def is_terminal(self):
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index ddd08c7..eb61a22 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -43,7 +43,7 @@ from cproton import PN_EOS, PN_OK, PN_SASL_AUTH, PN_SASL_NONE, PN_SASL_OK, PN_SA
     pn_transport_tick, pn_transport_trace, pn_transport_unbind
 
 from ._common import millis2secs, secs2millis, unicode2utf8, utf82unicode
-from ._condition import cond2obj
+from ._condition import cond2obj, obj2cond
 from ._exceptions import EXCEPTIONS, SSLException, SSLUnavailable, SessionException, TransportException
 from ._wrapper import Wrapper
 
@@ -256,10 +256,18 @@ The idle timeout of the connection (float, in seconds).
             self._ssl = SSL(self, domain, session_details)
         return self._ssl
 
-    @property
-    def condition(self):
+    def _get_condition(self):
         return cond2obj(pn_transport_condition(self._impl))
 
+    def _set_condition(self, cond):
+        pn_cond = pn_transport_condition(self._impl)
+        obj2cond(cond, pn_cond)
+
+    condition = property(_get_condition, _set_condition,
+                         doc="""
+The error condition (if any) of the transport.
+""")
+
     @property
     def connection(self):
         from . import _endpoints
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 1586d45..99ea996 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -423,7 +423,6 @@ class ContainerTest(Test):
         def __init__(self, host):
             super(ContainerTest._ServerHandler, self).__init__()
             self.host = host
-            port = free_tcp_port()
             self.port = free_tcp_port()
             self.client_addr = None
             self.peer_hostname = None
@@ -500,6 +499,66 @@ class ContainerTest(Test):
         container.run()
         assert server_handler.peer_hostname is None, server_handler.peer_hostname
 
+    class _ReconnectServerHandler(MessagingHandler):
+        def __init__(self, host, listen_on_error=True):
+            super(ContainerTest._ReconnectServerHandler, self).__init__()
+            self.host = host
+            self.port = free_tcp_port()
+            self.client_addr = None
+            self.peer_hostname = None
+            self.listen_on_error = listen_on_error
+
+        def on_connection_opened(self, event):
+            self.client_addr = event.connected_address
+            self.peer_hostname = event.connection.remote_hostname
+            self.listener.close()
+
+        def on_connection_closing(self, event):
+            event.connection.close()
+
+        def listen(self, container):
+            if self.listen_on_error:
+                self.listener = container.listen("%s:%s" % (self.host, self.port))
+
+    class _ReconnectClientHandler(MessagingHandler):
+        def __init__(self, server_handler):
+            super(ContainerTest._ReconnectClientHandler, self).__init__()
+            self.connect_failed = False
+            self.server_addr = None
+            self.server_handler = server_handler
+
+        def on_connection_opened(self, event):
+            self.server_addr = event.connected_address
+            event.connection.close()
+
+        def on_transport_error(self, event):
+            assert self.connect_failed == False
+            self.connect_failed = True
+            self.server_handler.listen(event.container)
+
+    def test_reconnect(self):
+        server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True)
+        client_handler = ContainerTest._ReconnectClientHandler(server_handler)
+        container = Container(server_handler)
+        container.connect(url=Url(host="localhost", port=server_handler.port),
+                          handler=client_handler)
+        container.run()
+        assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
+        assert client_handler.connect_failed
+        assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr
+
+    def test_not_reconnecting(self):
+        server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False)
+        client_handler = ContainerTest._ReconnectClientHandler(server_handler)
+        container = Container(server_handler)
+        container.connect(url=Url(host="localhost", port=server_handler.port),
+                          handler=client_handler, reconnect=False)
+        container.run()
+        assert server_handler.peer_hostname == None, server_handler.peer_hostname
+        assert client_handler.connect_failed
+        assert client_handler.server_addr == None, client_handler.server_addr
+
+
 class SelectorTest(Test):
     """Test the Selector"""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org