You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/17 16:08:30 UTC
svn commit: r1625598 - in /qpid/proton/branches/examples/tutorial:
proton_events.py simple_recv.py simple_send.py
Author: gsim
Date: Wed Sep 17 14:08:30 2014
New Revision: 1625598
URL: http://svn.apache.org/r1625598
Log:
improved failover support
Modified:
qpid/proton/branches/examples/tutorial/proton_events.py
qpid/proton/branches/examples/tutorial/simple_recv.py
qpid/proton/branches/examples/tutorial/simple_send.py
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Sep 17 14:08:30 2014
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-import heapq, os, Queue, socket, time, types
+import heapq, os, Queue, re, socket, time, types
from proton import Collector, Connection, Delivery, Endpoint, Event
from proton import Message, ProtonException, Transport, TransportException
from select import select
@@ -58,7 +58,8 @@ class EventDispatcher(object):
class Selectable(object):
- def __init__(self, conn, sock):
+ def __init__(self, conn, sock, events):
+ self.events = events
self.conn = conn
self.transport = Transport()
self.transport.bind(self.conn)
@@ -131,7 +132,8 @@ class Selectable(object):
if not self._closed_cleanly():
self.read_done = True
self.write_done = True
- self.transport.close_tail()
+ else:
+ self.transport.close_tail()
except TransportException, e:
print "Error on read: %s" % e
self.read_done = True
@@ -159,13 +161,9 @@ class Selectable(object):
self.write_done = True
def removed(self):
- if not self._closed_cleanly(): self.disconnected()
-
- def disconnected(self):
- if hasattr(self.conn, "context") and hasattr(self.conn.context, "on_disconnected"):
- self.conn.context.on_disconnected(self.conn)
- else:
- print "connection %s disconnected" % self.conn
+ if not self._closed_cleanly():
+ self.transport.unbind()
+ self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
class Acceptor:
@@ -202,7 +200,7 @@ class Acceptor:
def readable(self):
sock, addr = self.socket.accept()
if sock:
- self.selectables.append(Selectable(self.events.connection(), sock).accept())
+ self.selectables.append(Selectable(self.events.connection(), sock, self.events).accept())
def removed(self): pass
@@ -624,29 +622,147 @@ class MessagingContext(object):
if self.conn:
self.conn.close()
+class Connector(EventDispatcher):
+ # Event handler that opens the socket for a connection and re-opens it
+ # after a "disconnected" event. It acts only on connections carrying the
+ # optional attributes 'address' (an object whose next() returns a
+ # (host, port) pair) and 'reconnect' (an object with next() and reset()).
+ def attach_to(self, loop):
+ # Keep the loop: it is used to add selectables and to schedule timers.
+ self.loop = loop
+
+ def _connect(self, connection):
+ # Ask the address for the next (host, port) to try and start a socket
+ # connection to it, registering the resulting selectable with the loop.
+ host, port = connection.address.next()
+ # NOTE(review): %i needs a number, but Url.next() returns port None when
+ # the URL has no ":<port>" part -- confirm whether a default port is intended.
+ print "connecting to %s:%i" % (host, port)
+ self.loop.add(Selectable(connection, socket.socket(), self.loop.events).connect(host, port))
+
+ def on_connection_open(self, event):
+ # Only connections that were given an 'address' are connected here.
+ if hasattr(event.connection, "address"):
+ self._connect(event.connection)
+
+ def on_connection_remote_open(self, event):
+ # The peer opened the connection: reset the reconnect state, if any.
+ if hasattr(event.connection, "reconnect"):
+ event.connection.reconnect.reset()
+
+ def on_disconnected(self, event):
+ # Connections without a 'reconnect' attribute are left disconnected.
+ if hasattr(event.connection, "reconnect"):
+ delay = event.connection.reconnect.next()
+ # A delay of 0 means retry immediately; otherwise schedule a timer event
+ # for this connection, which is handled by on_timer below.
+ if delay == 0:
+ print "Disconnected, reconnecting..."
+ self._connect(event.connection)
+ else:
+ print "Disconnected will try to reconnect after %s seconds" % delay
+ self.loop.schedule(time.time() + delay, connection=event.connection)
+
+ def on_timer(self, event):
+ # NOTE(review): no attribute check here -- every timer event that reaches
+ # this handler triggers a connect for event.connection.
+ self._connect(event.connection)
+
+class Backoff(object):
+ # Produces successive retry delays: 0, 0.1, 0.2, 0.4, ... doubling each
+ # time and capped at 10. reset() starts the sequence again from 0.
+ def __init__(self):
+ self.delay = 0
+
+ def reset(self):
+ self.delay = 0
+
+ def next(self):
+ # Return the delay to use now, and store the one for the following call.
+ current = self.delay
+ if current == 0:
+ self.delay = 0.1
+ else:
+ self.delay = min(10, 2*current)
+ return current
+
+class Url(object):
+ # Parsed form of [<scheme>://][<user>[/<password>]@]<host>[:<port>], where
+ # <host> is a plain host name/IPv4 address or a bracketed IPv6 address.
+ # Attributes: scheme, user, password, host (strings or None) and port
+ # (int or None). Raises ValueError(value) when the text does not match.
+ RE = re.compile(r"""
+ # [ <scheme>:// ] [ <user> [ / <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ]
+ ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$
+""", re.X | re.I)
+
+ AMQPS = "amqps"
+ AMQP = "amqp"
+
+ def __init__(self, value):
+ match = Url.RE.match(value)
+ if match is None:
+ raise ValueError(value)
+ self.scheme, self.user, self.password, host4, host6, port = match.groups()
+ # Exactly one of the two host alternatives matched; keep whichever did.
+ self.host = host4 or host6
+ if port is None:
+ self.port = None
+ else:
+ self.port = int(port)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ # Always returns the same (host, port) pair; never raises StopIteration.
+ return (self.host, self.port)
+
+ def __repr__(self):
+ return "URL(%r)" % str(self)
+
+ def __str__(self):
+ # Rebuild the textual form from the parsed parts, re-adding the square
+ # brackets around hosts that contain ':' (IPv6 literals).
+ s = ""
+ if self.scheme:
+ s += "%s://" % self.scheme
+ if self.user:
+ s += self.user
+ if self.password:
+ s += "/%s" % self.password
+ s += "@"
+ if ':' not in self.host:
+ s += self.host
+ else:
+ s += "[%s]" % self.host
+ if self.port:
+ s += ":%s" % self.port
+ return s
+
+ def __eq__(self, url):
+ # Strings are parsed first so they compare component by component.
+ # The class is named Url; no name "URL" is defined in this diff, so the
+ # string case raised NameError before.
+ if isinstance(url, basestring):
+ url = Url(url)
+ return \
+ self.scheme==url.scheme and \
+ self.user==url.user and self.password==url.password and \
+ self.host==url.host and self.port==url.port
+
+ def __ne__(self, url):
+ return not self.__eq__(url)
+
+class Urls(object):
+ # Iterates over a list of Url objects, restarting from the first one
+ # once the end of the list has been reached.
+ def __init__(self, values):
+ # Each entry is parsed with Url, so an entry that does not match raises ValueError.
+ self.values = [Url(v) for v in values]
+ self.i = iter(self.values)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ try:
+ return self.i.next()
+ except StopIteration:
+ # End of the list: start again from the beginning. With an empty list
+ # this second next() raises StopIteration as well.
+ self.i = iter(self.values)
+ return self.i.next()
class EventLoop(object):
def __init__(self, *handlers):
- l = [ScopedDispatcher()]
+ self.connector = Connector()
+ l = [ScopedDispatcher(), self.connector]
if handlers: l += handlers
else: l.append(FlowController(10))
self.events = ScheduledEvents(*l)
self.loop = SelectLoop(self.events)
+ self.connector.attach_to(self)
self.trigger = None
- def connect(self, url, name=None, handler=None):
- identifier = name or url
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
context = MessagingContext(self.loop.events.connection(), handler=handler)
- host, port = url.split(":")
- if port: port = int(port)
- context.conn.hostname = host
- self.loop.add(Selectable(context.conn, socket.socket()).connect(host, port))
+ if url: context.conn.address = Url(url)
+ elif urls: context.conn.address = Urls(urls)
+ elif address: context.conn.address = address
+ else: raise ValueError("One of url, urls or address required")
+ if reconnect:
+ context.conn.reconnect = reconnect
context.conn.open()
return context
def listen(self, url):
- host, port = url.split(":")
- if port: port = int(port)
+ host, port = Url(url).next()
return Acceptor(self.loop.events, self.loop.selectables, host, port)
def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
@@ -658,6 +774,12 @@ class EventLoop(object):
self.loop.selectables.append(self.trigger)
return self.trigger
+ def add(self, selectable):
+ self.loop.add(selectable)
+
+ def remove(self, selectable):
+ self.loop.remove(selectable)
+
def run(self):
self.loop.run()
@@ -694,7 +816,7 @@ class BlockingConnection(EventDispatcher
self.context = MessagingContext(self.loop.events.connection(), handler=self)
host, port = url.split(":")
if port: port = int(port)
- self.loop.add(Selectable(self.context.conn, socket.socket()).connect(host, port))
+ self.loop.add(Selectable(self.context.conn, socket.socket(), self.events).connect(host, port))
self.context.conn.open()
while self.context.conn.state & Endpoint.REMOTE_UNINIT:
self.loop.do_work()
@@ -721,7 +843,7 @@ class BlockingConnection(EventDispatcher
if event.connection.state & Endpoint.LOCAL_ACTIVE:
self.closed(event.connection.remote_condition)
- def on_disconnected(self, connection):
+ def on_disconnected(self, event):
raise Exception("Disconnected");
def closed(self, error=None):
Modified: qpid/proton/branches/examples/tutorial/simple_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_recv.py Wed Sep 17 14:08:30 2014
@@ -19,26 +19,17 @@
#
import time
-from proton_events import IncomingMessageHandler, EventLoop
+from proton_events import Backoff, EventLoop, IncomingMessageHandler
class Recv(IncomingMessageHandler):
def __init__(self, eventloop, host, address):
self.eventloop = eventloop
- self.host = host
- self.address = address
- self.delay = 0
- self.connect()
-
- def connect(self):
- self.conn = self.eventloop.connect(self.host, handler=self)
+ self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
+ self.conn.receiver(address)
def on_message(self, event):
print event.message.body
- def on_connection_remote_open(self, event):
- self.delay = 0
- self.conn.receiver(self.address)
-
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
@@ -50,20 +41,6 @@ class Recv(IncomingMessageHandler):
print "Closed due to %s" % error
self.conn.close()
- def on_disconnected(self, conn):
- if self.delay == 0:
- self.delay = 0.1
- print "Disconnected, reconnecting..."
- self.connect()
- else:
- print "Disconnected will try to reconnect after %d seconds" % self.delay
- self.eventloop.schedule(time.time() + self.delay, connection=conn)
- self.delay = min(10, 2*self.delay)
-
- def on_timer(self, event):
- print "Reconnecting..."
- self.connect()
-
def run(self):
self.eventloop.run()
Modified: qpid/proton/branches/examples/tutorial/simple_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1625598&r1=1625597&r2=1625598&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_send.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_send.py Wed Sep 17 14:08:30 2014
@@ -20,7 +20,7 @@
import time
from proton import Message
-from proton_events import OutgoingMessageHandler, EventLoop
+from proton_events import Backoff, EventLoop, OutgoingMessageHandler
class Send(OutgoingMessageHandler):
def __init__(self, eventloop, host, address, messages):
@@ -28,13 +28,8 @@ class Send(OutgoingMessageHandler):
self.sent = 0
self.confirmed = 0
self.total = messages
- self.address = address
- self.host = host
- self.delay = 0
- self.connect()
-
- def connect(self):
- self.conn = self.eventloop.connect(self.host, handler=self)
+ self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
+ self.sender = self.conn.sender(address)
def on_link_flow(self, event):
for i in range(self.sender.credit):
@@ -56,9 +51,7 @@ class Send(OutgoingMessageHandler):
def on_connection_remote_open(self, event):
self.sent = self.confirmed
- self.sender = self.conn.sender(self.address)
- self.sender.offered(self.total)
- self.delay = 0
+ self.sender.offered(self.total - self.sent)
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
@@ -71,20 +64,6 @@ class Send(OutgoingMessageHandler):
print "Closed due to %s" % error
self.conn.close()
- def on_disconnected(self, conn):
- if self.delay == 0:
- self.delay = 0.1
- print "Disconnected, reconnecting..."
- self.connect()
- else:
- print "Disconnected will try to reconnect after %d seconds" % self.delay
- self.eventloop.schedule(time.time() + self.delay, connection=conn)
- self.delay = min(10, 2*self.delay)
-
- def on_timer(self, event):
- print "Reconnecting..."
- self.connect()
-
def run(self):
self.eventloop.run()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org