You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/17 16:08:30 UTC

svn commit: r1625598 - in /qpid/proton/branches/examples/tutorial: proton_events.py simple_recv.py simple_send.py

Author: gsim
Date: Wed Sep 17 14:08:30 2014
New Revision: 1625598

URL: http://svn.apache.org/r1625598
Log:
improved failover support

Modified:
    qpid/proton/branches/examples/tutorial/proton_events.py
    qpid/proton/branches/examples/tutorial/simple_recv.py
    qpid/proton/branches/examples/tutorial/simple_send.py

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Sep 17 14:08:30 2014
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import heapq, os, Queue, socket, time, types
+import heapq, os, Queue, re, socket, time, types
 from proton import Collector, Connection, Delivery, Endpoint, Event
 from proton import Message, ProtonException, Transport, TransportException
 from select import select
@@ -58,7 +58,8 @@ class EventDispatcher(object):
 
 class Selectable(object):
 
-    def __init__(self, conn, sock):
+    def __init__(self, conn, sock, events):
+        self.events = events
         self.conn = conn
         self.transport = Transport()
         self.transport.bind(self.conn)
@@ -131,7 +132,8 @@ class Selectable(object):
                     if not self._closed_cleanly():
                         self.read_done = True
                         self.write_done = True
-                    self.transport.close_tail()
+                    else:
+                        self.transport.close_tail()
             except TransportException, e:
                 print "Error on read: %s" % e
                 self.read_done = True
@@ -159,13 +161,9 @@ class Selectable(object):
             self.write_done = True
 
     def removed(self):
-        if not self._closed_cleanly(): self.disconnected()
-
-    def disconnected(self):
-        if hasattr(self.conn, "context") and hasattr(self.conn.context, "on_disconnected"):
-            self.conn.context.on_disconnected(self.conn)
-        else:
-            print "connection %s disconnected" % self.conn
+        if not self._closed_cleanly():
+            self.transport.unbind()
+            self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
 
 class Acceptor:
 
@@ -202,7 +200,7 @@ class Acceptor:
     def readable(self):
         sock, addr = self.socket.accept()
         if sock:
-            self.selectables.append(Selectable(self.events.connection(), sock).accept())
+            self.selectables.append(Selectable(self.events.connection(), sock, self.events).accept())
 
     def removed(self): pass
 
@@ -624,29 +622,147 @@ class MessagingContext(object):
         if self.conn:
             self.conn.close()
 
+class Connector(EventDispatcher):  # opens (and re-opens) sockets for connections that carry an 'address'
+    def attach_to(self, loop):
+        self.loop = loop  # the EventLoop used to add selectables and to schedule retries
+
+    def _connect(self, connection):
+        host, port = connection.address.next()
+        print "connecting to %s:%i" % (host, port)  # NOTE(review): %i raises TypeError when port is None
+        self.loop.add(Selectable(connection, socket.socket(), self.loop.events).connect(host, port))
+
+    def on_connection_open(self, event):
+        if hasattr(event.connection, "address"):
+            self._connect(event.connection)
+
+    def on_connection_remote_open(self, event):
+        if hasattr(event.connection, "reconnect"):
+            event.connection.reconnect.reset()  # with Backoff this makes the next retry immediate
+
+    def on_disconnected(self, event):
+        if hasattr(event.connection, "reconnect"):
+            delay = event.connection.reconnect.next()
+            if delay == 0:
+                print "Disconnected, reconnecting..."
+                self._connect(event.connection)
+            else:
+                print "Disconnected will try to reconnect after %s seconds" % delay
+                self.loop.schedule(time.time() + delay, connection=event.connection)
+
+    def on_timer(self, event):  # timers can also be scheduled by the application; skip any without an address
+        if hasattr(getattr(event, "connection", None), "address"): self._connect(event.connection)
+
+class Backoff(object):
+    """Reconnect delays: 0 first, then 0.1s, doubling on each call up to a 10s cap."""
+    def __init__(self):
+        self.reset()
+
+    def reset(self):
+        self.delay = 0
+
+    def next(self):
+        """Return the delay to use now and advance the stored delay one step."""
+        wait = self.delay
+        # a zero delay is followed by 0.1; any other delay is doubled, capped at 10
+        self.delay = min(10, 2*wait) if wait else 0.1
+        return wait
+
+class Url(object):
+    """Parsed form of [scheme://][user[/password]@]host[:port]; iterating yields (host, port)."""
+    RE = re.compile(r"""
+        # [   <scheme>://  ] [    <user>   [   / <password>   ] @]    ( <host4>     | \[    <host6>    \] )  [   :<port>   ]
+        ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+)   )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$
+""", re.X | re.I)
+
+    AMQPS = "amqps"
+    AMQP = "amqp"
+
+    def __init__(self, value):
+        """Parse value; raises ValueError(value) when it does not match Url.RE."""
+        match = Url.RE.match(value)
+        if match is None:
+            raise ValueError(value)
+        self.scheme, self.user, self.password, host4, host6, port = match.groups()
+        self.host = host4 or host6
+        self.port = None if port is None else int(port)  # stays None when no port was given
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        # always the same pair; this iterator never raises StopIteration
+        return (self.host, self.port)
+
+    def __repr__(self):
+        return "URL(%r)" % str(self)
+
+    def __str__(self):
+        s = ""
+        if self.scheme:
+            s += "%s://" % self.scheme
+        if self.user:
+            s += self.user
+            if self.password:
+                s += "/%s" % self.password
+            s += "@"  # needed after the user even without a password, so str() round-trips through Url()
+        if ':' not in self.host:
+            s += self.host
+        else:
+            s += "[%s]" % self.host
+        if self.port:
+            s += ":%s" % self.port
+        return s
+
+    def __eq__(self, url):
+        if isinstance(url, basestring):
+            url = Url(url)  # parse strings so a Url can be compared with its text form
+        return \
+            self.scheme==url.scheme and \
+            self.user==url.user and self.password==url.password and \
+            self.host==url.host and self.port==url.port
+
+    def __ne__(self, url):
+        return not self.__eq__(url)
+
+class Urls(object):
+    """Round-robin iterator over parsed Url objects; restarts from the first when exhausted."""
+    def __init__(self, values):
+        self.values = [Url(v) for v in values]
+        self.i = iter(self.values)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        for candidate in self.i:
+            return candidate
+        self.i = iter(self.values)  # exhausted: start again from the first address
+        return self.i.next()
 
 class EventLoop(object):
     def __init__(self, *handlers):
-        l = [ScopedDispatcher()]
+        self.connector = Connector()
+        l = [ScopedDispatcher(), self.connector]
         if handlers: l += handlers
         else: l.append(FlowController(10))
         self.events = ScheduledEvents(*l)
         self.loop = SelectLoop(self.events)
+        self.connector.attach_to(self)
         self.trigger = None
 
-    def connect(self, url, name=None, handler=None):
-        identifier = name or url
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
         context = MessagingContext(self.loop.events.connection(), handler=handler)
-        host, port = url.split(":")
-        if port: port = int(port)
-        context.conn.hostname = host
-        self.loop.add(Selectable(context.conn, socket.socket()).connect(host, port))
+        if url: context.conn.address = Url(url)
+        elif urls: context.conn.address = Urls(urls)
+        elif address: context.conn.address = address
+        else: raise ValueError("One of url, urls or address required")
+        if reconnect:
+            context.conn.reconnect = reconnect
         context.conn.open()
         return context
 
     def listen(self, url):
-        host, port = url.split(":")
-        if port: port = int(port)
+        host, port = Url(url).next()
         return Acceptor(self.loop.events, self.loop.selectables, host, port)
 
     def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
@@ -658,6 +774,12 @@ class EventLoop(object):
             self.loop.selectables.append(self.trigger)
         return self.trigger
 
+    def add(self, selectable):  # register a selectable with the wrapped SelectLoop
+        self.loop.add(selectable)
+
+    def remove(self, selectable):  # unregister a selectable from the wrapped SelectLoop
+        self.loop.remove(selectable)
+
     def run(self):
         self.loop.run()
 
@@ -694,7 +816,7 @@ class BlockingConnection(EventDispatcher
         self.context = MessagingContext(self.loop.events.connection(), handler=self)
         host, port = url.split(":")
         if port: port = int(port)
-        self.loop.add(Selectable(self.context.conn, socket.socket()).connect(host, port))
+        self.loop.add(Selectable(self.context.conn, socket.socket(), self.events).connect(host, port))
         self.context.conn.open()
         while self.context.conn.state & Endpoint.REMOTE_UNINIT:
             self.loop.do_work()
@@ -721,7 +843,7 @@ class BlockingConnection(EventDispatcher
         if event.connection.state & Endpoint.LOCAL_ACTIVE:
             self.closed(event.connection.remote_condition)
 
-    def on_disconnected(self, connection):
+    def on_disconnected(self, event):
         raise Exception("Disconnected");
 
     def closed(self, error=None):

Modified: qpid/proton/branches/examples/tutorial/simple_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_recv.py Wed Sep 17 14:08:30 2014
@@ -19,26 +19,17 @@
 #
 
 import time
-from proton_events import IncomingMessageHandler, EventLoop
+from proton_events import Backoff, EventLoop, IncomingMessageHandler
 
 class Recv(IncomingMessageHandler):
     def __init__(self, eventloop, host, address):
         self.eventloop = eventloop
-        self.host = host
-        self.address = address
-        self.delay = 0
-        self.connect()
-
-    def connect(self):
-        self.conn = self.eventloop.connect(self.host, handler=self)
+        self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
+        self.conn.receiver(address)
 
     def on_message(self, event):
         print event.message.body
 
-    def on_connection_remote_open(self, event):
-        self.delay = 0
-        self.conn.receiver(self.address)
-
     def on_link_remote_close(self, event):
         self.closed(event.link.remote_condition)
 
@@ -50,20 +41,6 @@ class Recv(IncomingMessageHandler):
             print "Closed due to %s" % error
         self.conn.close()
 
-    def on_disconnected(self, conn):
-        if self.delay == 0:
-            self.delay = 0.1
-            print "Disconnected, reconnecting..."
-            self.connect()
-        else:
-            print "Disconnected will try to reconnect after %d seconds" % self.delay
-            self.eventloop.schedule(time.time() + self.delay, connection=conn)
-            self.delay = min(10, 2*self.delay)
-
-    def on_timer(self, event):
-        print "Reconnecting..."
-        self.connect()
-
     def run(self):
         self.eventloop.run()
 

Modified: qpid/proton/branches/examples/tutorial/simple_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_send.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_send.py Wed Sep 17 14:08:30 2014
@@ -20,7 +20,7 @@
 
 import time
 from proton import Message
-from proton_events import OutgoingMessageHandler, EventLoop
+from proton_events import Backoff, EventLoop, OutgoingMessageHandler
 
 class Send(OutgoingMessageHandler):
     def __init__(self, eventloop, host, address, messages):
@@ -28,13 +28,8 @@ class Send(OutgoingMessageHandler):
         self.sent = 0
         self.confirmed = 0
         self.total = messages
-        self.address = address
-        self.host = host
-        self.delay = 0
-        self.connect()
-
-    def connect(self):
-        self.conn = self.eventloop.connect(self.host, handler=self)
+        self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
+        self.sender = self.conn.sender(address)
 
     def on_link_flow(self, event):
         for i in range(self.sender.credit):
@@ -56,9 +51,7 @@ class Send(OutgoingMessageHandler):
 
     def on_connection_remote_open(self, event):
         self.sent = self.confirmed
-        self.sender = self.conn.sender(self.address)
-        self.sender.offered(self.total)
-        self.delay = 0
+        self.sender.offered(self.total - self.sent)
 
     def on_link_remote_close(self, event):
         self.closed(event.link.remote_condition)
@@ -71,20 +64,6 @@ class Send(OutgoingMessageHandler):
             print "Closed due to %s" % error
         self.conn.close()
 
-    def on_disconnected(self, conn):
-        if self.delay == 0:
-            self.delay = 0.1
-            print "Disconnected, reconnecting..."
-            self.connect()
-        else:
-            print "Disconnected will try to reconnect after %d seconds" % self.delay
-            self.eventloop.schedule(time.time() + self.delay, connection=conn)
-            self.delay = min(10, 2*self.delay)
-
-    def on_timer(self, event):
-        print "Reconnecting..."
-        self.connect()
-
     def run(self):
         self.eventloop.run()
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org