You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/10/01 19:37:09 UTC
svn commit: r1628781 - in /qpid/proton/branches/examples/tutorial:
db_recv.py db_send.py helloworld.py helloworld_alt.py helloworld_direct.py
helloworld_direct_alt.py helloworld_simple.py proton_events.py
simple_recv.py simple_send.py
Author: gsim
Date: Wed Oct 1 17:37:09 2014
New Revision: 1628781
URL: http://svn.apache.org/r1628781
Log:
Some simplifications
Modified:
qpid/proton/branches/examples/tutorial/db_recv.py
qpid/proton/branches/examples/tutorial/db_send.py
qpid/proton/branches/examples/tutorial/helloworld.py
qpid/proton/branches/examples/tutorial/helloworld_alt.py
qpid/proton/branches/examples/tutorial/helloworld_direct.py
qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py
qpid/proton/branches/examples/tutorial/helloworld_simple.py
qpid/proton/branches/examples/tutorial/proton_events.py
qpid/proton/branches/examples/tutorial/simple_recv.py
qpid/proton/branches/examples/tutorial/simple_send.py
Modified: qpid/proton/branches/examples/tutorial/db_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_recv.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/db_recv.py Wed Oct 1 17:37:09 2014
@@ -18,23 +18,20 @@
# under the License.
#
-import time
-from proton_events import ApplicationEvent, IncomingMessageHandler, EventLoop, FlowController
+from proton_events import ApplicationEvent, BaseHandler, EventLoop
from db_common import Db
-class Recv(IncomingMessageHandler):
+class Recv(BaseHandler):
def __init__(self, host, address):
- self.eventloop = EventLoop()#self, FlowController(10))
+ self.eventloop = EventLoop()
self.host = host
self.address = address
self.delay = 0
self.db = Db("dst_db", self.eventloop.get_event_trigger())
# TODO: load last tag from db
self.last_id = None
- self.connect()
-
- def connect(self):
self.conn = self.eventloop.connect(self.host, handler=self)
+ self.conn.receiver(self.address)
def auto_accept(self): return False
@@ -50,35 +47,6 @@ class Recv(IncomingMessageHandler):
else:
self.accept(event.delivery)
- def on_connection_remote_open(self, event):
- self.delay = 0
- self.conn.receiver(self.address)
-
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
-
- def on_disconnected(self, conn):
- if self.delay == 0:
- self.delay = 0.1
- print "Disconnected, reconnecting..."
- self.connect()
- else:
- print "Disconnected will try to reconnect after %d seconds" % self.delay
- self.eventloop.schedule(time.time() + self.delay, connection=conn)
- self.delay = min(10, 2*self.delay)
-
- def on_timer(self, event):
- print "Reconnecting..."
- self.connect()
-
def run(self):
self.eventloop.run()
Modified: qpid/proton/branches/examples/tutorial/db_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_send.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_send.py (original)
+++ qpid/proton/branches/examples/tutorial/db_send.py Wed Oct 1 17:37:09 2014
@@ -21,10 +21,10 @@
import Queue
import time
from proton import Message
-from proton_events import ApplicationEvent, EventLoop, OutgoingMessageHandler
+from proton_events import ApplicationEvent, BaseHandler, EventLoop
from db_common import Db
-class Send(OutgoingMessageHandler):
+class Send(BaseHandler):
def __init__(self, host, address):
self.eventloop = EventLoop()
self.address = address
@@ -33,10 +33,8 @@ class Send(OutgoingMessageHandler):
self.sent = 0
self.records = Queue.Queue(maxsize=50)
self.db = Db("src_db", self.eventloop.get_event_trigger())
- self.connect()
-
- def connect(self):
self.conn = self.eventloop.connect(self.host, handler=self)
+ self.sender = self.conn.sender(self.address)
def on_records_loaded(self, event):
if self.records.empty() and event.subject == self.sent:
@@ -50,7 +48,7 @@ class Send(OutgoingMessageHandler):
if not self.records.full():
self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent))
- def on_link_flow(self, event):
+ def on_credit(self, event):
self.send()
def send(self):
@@ -67,37 +65,11 @@ class Send(OutgoingMessageHandler):
self.db.delete(id)
print "settled message %s" % id
- def on_connection_remote_open(self, event):
+ def on_disconnected(self, event):
self.db.reset()
- self.sender = self.conn.sender(self.address)
- self.delay = 0
-
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
-
- def on_disconnected(self, conn):
- if self.delay == 0:
- self.delay = 0.1
- print "Disconnected, reconnecting..."
- self.connect()
- else:
- print "Disconnected will try to reconnect after %d seconds" % self.delay
- self.eventloop.schedule(time.time() + self.delay, connection=conn, subject="reconnect")
- self.delay = min(10, 2*self.delay)
def on_timer(self, event):
- if event.subject == "reconnect":
- print "Reconnecting..."
- self.connect()
- elif event.subject == "data":
+ if event.subject == "data":
print "Rechecking for data..."
self.request_records()
Modified: qpid/proton/branches/examples/tutorial/helloworld.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld.py (original)
+++ qpid/proton/branches/examples/tutorial/helloworld.py Wed Oct 1 17:37:09 2014
@@ -19,44 +19,30 @@
#
from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+from proton_events import ErrorHandler, EventLoop, IncomingMessageHandler, OutgoingMessageHandler
class HelloWorldReceiver(IncomingMessageHandler):
def on_message(self, event):
print event.message.body
event.connection.close()
-class HelloWorldSender(object):
- def on_link_flow(self, event):
+class HelloWorldSender(OutgoingMessageHandler):
+ def on_credit(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
-class HelloWorld(object):
- def __init__(self, eventloop, url, address):
- self.eventloop = eventloop
- self.conn = eventloop.connect(url, handler=self)
+class HelloWorld(ErrorHandler):
+ def __init__(self, url, address):
+ self.eventloop = EventLoop()
+ self.conn = self.eventloop.connect(url, handler=self)
self.address = address
def on_connection_remote_open(self, event):
self.conn.receiver(self.address, handler=HelloWorldReceiver())
-
- def on_link_remote_open(self, event):
- if event.link.is_receiver:
- self.conn.sender(self.address, handler=HelloWorldSender())
-
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
+ self.conn.sender(self.address, handler=HelloWorldSender())
def run(self):
self.eventloop.run()
-HelloWorld(EventLoop(), "localhost:5672", "examples").run()
+HelloWorld("localhost:5672", "examples").run()
Modified: qpid/proton/branches/examples/tutorial/helloworld_alt.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_alt.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_alt.py (original)
+++ qpid/proton/branches/examples/tutorial/helloworld_alt.py Wed Oct 1 17:37:09 2014
@@ -19,19 +19,18 @@
#
from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+import proton_events
-class HelloWorld(IncomingMessageHandler):
- def __init__(self, eventloop, url, address):
- self.eventloop = eventloop
- self.conn = eventloop.connect(url, handler=self)
+class HelloWorld(proton_events.BaseHandler):
+ def __init__(self, conn, address):
+ self.conn = conn
self.address = address
def on_connection_remote_open(self, event):
self.conn.receiver(self.address)
self.conn.sender(self.address)
- def on_link_flow(self, event):
+ def on_credit(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
@@ -39,19 +38,7 @@ class HelloWorld(IncomingMessageHandler)
print event.message.body
event.connection.close()
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
-
- def run(self):
- self.eventloop.run()
-
-HelloWorld(EventLoop(), "localhost:5672", "examples").run()
+conn = proton_events.connect("localhost:5672")
+conn.handler=HelloWorld(conn, "examples")
+proton_events.run()
Modified: qpid/proton/branches/examples/tutorial/helloworld_direct.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_direct.py (original)
+++ qpid/proton/branches/examples/tutorial/helloworld_direct.py Wed Oct 1 17:37:09 2014
@@ -19,19 +19,19 @@
#
from proton import Message
-from proton_events import EventLoop, FlowController, Handshaker, IncomingMessageHandler
+from proton_events import ErrorHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler, OutgoingMessageHandler
class HelloWorldReceiver(IncomingMessageHandler):
def on_message(self, event):
print event.message.body
event.connection.close()
-class HelloWorldSender(object):
- def on_link_flow(self, event):
+class HelloWorldSender(OutgoingMessageHandler):
+ def on_credit(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
-class HelloWorld(object):
+class HelloWorld(ErrorHandler):
def __init__(self, eventloop, url, address):
self.eventloop = eventloop
self.acceptor = eventloop.listen(url)
@@ -41,15 +41,7 @@ class HelloWorld(object):
def on_connection_remote_open(self, event):
self.conn.sender(self.address, handler=HelloWorldSender())
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
self.conn.close()
self.acceptor.close()
Modified: qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py (original)
+++ qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py Wed Oct 1 17:37:09 2014
@@ -19,14 +19,14 @@
#
from proton import Message
-from proton_events import EventLoop, FlowController, Handshaker, IncomingMessageHandler
+from proton_events import ErrorHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler, OutgoingMessageHandler
class HelloWorldReceiver(IncomingMessageHandler):
def on_message(self, event):
print event.message.body
event.connection.close()
-class HelloWorld(object):
+class HelloWorld(ErrorHandler, OutgoingMessageHandler):
def __init__(self, eventloop, url, address):
self.eventloop = eventloop
self.acceptor = eventloop.listen(url)
@@ -40,15 +40,7 @@ class HelloWorld(object):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
self.conn.close()
self.acceptor.close()
Modified: qpid/proton/branches/examples/tutorial/helloworld_simple.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_simple.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_simple.py (original)
+++ qpid/proton/branches/examples/tutorial/helloworld_simple.py Wed Oct 1 17:37:09 2014
@@ -19,39 +19,20 @@
#
from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+import proton_events
-class HelloWorldReceiver(IncomingMessageHandler):
- def on_message(self, event):
- print event.message.body
- event.connection.close()
-
-class HelloWorldSender(object):
- def on_link_flow(self, event):
+class HelloWorld(proton_events.BaseHandler):
+ def on_credit(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
-class HelloWorld(object):
- def __init__(self, eventloop, url, address):
- self.eventloop = eventloop
- self.conn = eventloop.connect(url, handler=self)
- self.address = address
- self.conn.receiver(self.address, handler=HelloWorldReceiver())
- self.conn.sender(self.address, handler=HelloWorldSender())
-
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
- def run(self):
- self.eventloop.run()
+conn = proton_events.connect("localhost:5672", handler=HelloWorld())
+conn.receiver("examples")
+conn.sender("examples")
+proton_events.run()
-HelloWorld(EventLoop(), "localhost:5672", "examples").run()
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 1 17:37:09 2014
@@ -404,14 +404,17 @@ class SelectLoop(object):
timeout = 0
if self.events.next_interval and (timeout is None or self.events.next_interval < timeout):
timeout = self.events.next_interval
- readable, writable, _ = select(reading, writing, [], timeout)
+ if reading or writing or timeout:
+ readable, writable, _ = select(reading, writing, [], timeout)
- for s in readable:
- s.readable()
- for s in writable:
- s.writable()
+ for s in readable:
+ s.readable()
+ for s in writable:
+ s.writable()
- return bool(readable or writable)
+ return bool(readable or writable)
+ else:
+ return False
class Handshaker(EventDispatcher):
@@ -490,11 +493,54 @@ class ScopedDispatcher(EventDispatcher):
for h in handlers:
h(event)
+class ErrorHandler(EventDispatcher):
+ def was_closed_by_peer(self, endpoint):
+ return endpoint.state & Endpoint.LOCAL_ACTIVE and endpoint.state & Endpoint.REMOTE_CLOSED
+
+ def treat_as_error(self, endpoint):
+ return endpoint.remote_condition or self.was_closed_by_peer(endpoint)
+
+ def print_error(self, endpoint, endpoint_type):
+ if endpoint.remote_condition:
+ print endpoint.remote_condition.description
+ elif self.was_closed_by_peer(endpoint):
+ print "%s closed by peer" % endpoint_type
+
+ def on_link_remote_close(self, event):
+ if self.treat_as_error(event.link):
+ self.on_link_error(event)
+
+ def on_session_remote_close(self, event):
+ if self.treat_as_error(event.session):
+ self.on_session_error(event)
+
+ def on_connection_remote_close(self, event):
+ if self.treat_as_error(event.connection):
+ self.on_connection_error(event)
+
+ def on_connection_error(self, event):
+ self.print_error(event.connection, "connection")
+ event.connection.close()
+
+ def on_session_error(self, event):
+ self.print_error(event.session, "session")
+ event.session.close()
+ event.connection.close()
+
+ def on_link_error(self, event):
+ self.print_error(event.link, "link")
+ event.link.close()
+ event.connection.close()
+
class OutgoingMessageHandler(EventDispatcher):
+ def on_link_flow(self, event):
+ if event.link.is_sender and event.link.credit:
+ self.on_credit(event)
+
def on_delivery(self, event):
dlv = event.delivery
link = dlv.link
- if dlv.updated and not hasattr(dlv, "_been_settled"):
+ if link.is_sender and dlv.updated and not hasattr(dlv, "_been_settled"):
if dlv.remote_state == Delivery.ACCEPTED:
self.on_accepted(event)
elif dlv.remote_state == Delivery.REJECTED:
@@ -509,6 +555,7 @@ class OutgoingMessageHandler(EventDispat
dlv._been_settled = True
dlv.settle()
+ def on_credit(self, event): pass
def on_accepted(self, event): pass
def on_rejected(self, event): pass
def on_released(self, event): pass
@@ -566,6 +613,14 @@ class IncomingMessageHandler(EventDispat
def on_settled(self, event): pass
def auto_accept(self): return True
+class BaseHandler(ErrorHandler, IncomingMessageHandler, OutgoingMessageHandler):
+ def __init__(self):
+ super(BaseHandler, self).__init__()
+
+ def on_delivery(self, event):
+ IncomingMessageHandler.on_delivery(self, event)
+ OutgoingMessageHandler.on_delivery(self, event)
+
def delivery_tags():
count = 1
while True:
@@ -591,6 +646,14 @@ class MessagingContext(object):
self.conn._mc = self
self.ssn = ssn
+ def _get_handler(self):
+ return self.conn.context
+
+ def _set_handler(self, value):
+ self.conn.context = value
+
+ handler = property(_get_handler, _set_handler)
+
def sender(self, target, source=None, name=None, handler=None, tags=None):
snd = self._get_ssn().sender(name or self._get_id(target, source))
if source:
@@ -670,6 +733,8 @@ class Connector(EventDispatcher):
else:
print "Disconnected will try to reconnect after %s seconds" % delay
self.loop.schedule(time.time() + delay, connection=event.connection, subject=self)
+ else:
+ print "Disconnected"
def on_timer(self, event):
if event.subject == self and event.connection:
@@ -781,6 +846,8 @@ class EventLoop(object):
else: raise ValueError("One of url, urls or address required")
if reconnect:
context.conn.reconnect = reconnect
+ elif reconnect is None:
+ context.conn.reconnect = Backoff()
context.conn.open()
return context
@@ -812,6 +879,18 @@ class EventLoop(object):
def do_work(self, timeout=None):
return self.loop.do_work(timeout)
+EventLoop.DEFAULT = EventLoop()
+
+def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None):
+ if not eventloop:
+ eventloop = EventLoop.DEFAULT
+ return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect)
+
+def run(eventloop=None):
+ if not eventloop:
+ eventloop = EventLoop.DEFAULT
+ eventloop.run()
+
class BlockingLink(object):
def __init__(self, connection, link):
self.connection = connection
Modified: qpid/proton/branches/examples/tutorial/simple_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_recv.py Wed Oct 1 17:37:09 2014
@@ -18,34 +18,16 @@
# under the License.
#
-import time
-from proton_events import Backoff, EventLoop, IncomingMessageHandler
-
-class Recv(IncomingMessageHandler):
- def __init__(self, eventloop, host, address):
- self.eventloop = eventloop
- self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
- self.conn.receiver(address)
+import proton_events
+class Recv(proton_events.BaseHandler):
def on_message(self, event):
print event.message.body
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
-
- def run(self):
- self.eventloop.run()
-
try:
- Recv(EventLoop(), "localhost:5672", "examples").run()
+ conn = proton_events.connect("localhost:5672", handler=Recv())
+ conn.receiver("examples")
+ proton_events.run()
except KeyboardInterrupt: pass
Modified: qpid/proton/branches/examples/tutorial/simple_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1628781&r1=1628780&r2=1628781&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_send.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_send.py Wed Oct 1 17:37:09 2014
@@ -18,54 +18,32 @@
# under the License.
#
-import time
from proton import Message
-from proton_events import Backoff, EventLoop, OutgoingMessageHandler
+import proton_events
-class Send(OutgoingMessageHandler):
- def __init__(self, eventloop, host, address, messages):
- self.eventloop = eventloop
+class Send(proton_events.BaseHandler):
+ def __init__(self, messages):
self.sent = 0
self.confirmed = 0
self.total = messages
- self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff())
- self.sender = self.conn.sender(address)
- def on_link_flow(self, event):
- for i in range(self.sender.credit):
- if self.sent == self.total:
- self.sender.drained()
- break
- msg = Message(body={'sequence':self.sent})
- self.sender.send_msg(msg, handler=self)
+ def on_credit(self, event):
+ while event.link.credit and self.sent < self.total:
+ msg = Message(body={'sequence':(self.sent+1)})
+ event.link.send_msg(msg)
self.sent += 1
def on_accepted(self, event):
- """
- Stop the application once all of the messages are sent and acknowledged,
- """
self.confirmed += 1
if self.confirmed == self.total:
- self.sender.close()
- self.conn.close()
+ print "all messages confirmed"
+ event.connection.close()
- def on_connection_remote_open(self, event):
+ def on_disconnected(self, event):
self.sent = self.confirmed
- self.sender.offered(self.total - self.sent)
-
- def on_link_remote_close(self, event):
- self.closed(event.link.remote_condition)
-
- def on_connection_remote_close(self, event):
- self.closed(event.connection.remote_condition)
-
- def closed(self, error=None):
- if error:
- print "Closed due to %s" % error
- self.conn.close()
-
- def run(self):
- self.eventloop.run()
-
-Send(EventLoop(), "localhost:5672", "examples", 10000).run()
+try:
+ conn = proton_events.connect("localhost:5672", handler=Send(10000))
+ conn.sender("examples")
+ proton_events.run()
+except KeyboardInterrupt: pass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org