You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/10/14 16:35:41 UTC
svn commit: r1631772 [3/3] - in /qpid/proton/branches/examples:
examples/engine/ examples/engine/java/ examples/engine/java/src/
examples/engine/java/src/main/ examples/engine/java/src/main/java/
examples/engine/java/src/main/java/org/ examples/engine/...
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Tue Oct 14 14:35:39 2014
@@ -18,44 +18,9 @@
#
import heapq, os, Queue, re, socket, time, types
from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout
-from proton import Message, ProtonException, Transport, TransportException, ConnectionException
+from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
from select import select
-class EventDispatcher(object):
-
- methods = {
- Event.CONNECTION_INIT: "on_connection_init",
- Event.CONNECTION_OPEN: "on_connection_local_open",
- Event.CONNECTION_REMOTE_OPEN: "on_connection_open",
- Event.CONNECTION_CLOSE: "on_connection_local_close",
- Event.CONNECTION_REMOTE_CLOSE: "on_connection_close",
- Event.CONNECTION_FINAL: "on_connection_final",
-
- Event.SESSION_INIT: "on_session_init",
- Event.SESSION_OPEN: "on_session_open",
- Event.SESSION_REMOTE_OPEN: "on_session_open",
- Event.SESSION_CLOSE: "on_session_local_close",
- Event.SESSION_REMOTE_CLOSE: "on_session_close",
- Event.SESSION_FINAL: "on_session_final",
-
- Event.LINK_INIT: "on_link_init",
- Event.LINK_OPEN: "on_link_local_open",
- Event.LINK_REMOTE_OPEN: "on_link_open",
- Event.LINK_CLOSE: "on_link_local_close",
- Event.LINK_REMOTE_CLOSE: "on_link_close",
- Event.LINK_FLOW: "on_link_flow",
- Event.LINK_FINAL: "on_link_final",
-
- Event.TRANSPORT: "on_transport",
- Event.DELIVERY: "on_delivery"
- }
-
- def dispatch(self, event):
- getattr(self, self.methods.get(event.type, "on_%s" % str(event.type)), self.unhandled)(event)
-
- def unhandled(self, event):
- pass
-
class AmqpConnection(object):
def __init__(self, conn, sock, events):
@@ -270,7 +235,7 @@ class Events(object):
def dispatch(self, event):
for d in self.dispatchers:
- d.dispatch(event)
+ event.dispatch(d)
@property
def next_interval(self):
@@ -280,9 +245,14 @@ class Events(object):
def empty(self):
return self.collector.peek() == None
+class ExtendedEventType(object):
+ def __init__(self, name):
+ self.name = name
+ self.method = "on_%s" % name
+
class ApplicationEvent(Event):
- def __init__(self, type, connection=None, session=None, link=None, delivery=None, subject=None):
- self.type = type
+ def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.type = ExtendedEventType(typename)
self.subject = subject
if delivery:
self.context = delivery
@@ -302,7 +272,7 @@ class ApplicationEvent(Event):
def __repr__(self):
objects = [self.context, self.subject]
- return "%s(%s)" % (self.type,
+ return "%s(%s)" % (self.type.name,
", ".join([str(o) for o in objects if o is not None]))
class ScheduledEvents(Events):
@@ -399,41 +369,41 @@ class SelectLoop(object):
return False
-class Handshaker(EventDispatcher):
+class Handshaker(Handler):
- def on_connection_open(self, event):
+ def on_connection_remote_open(self, event):
conn = event.connection
if conn.state & Endpoint.LOCAL_UNINIT:
conn.open()
- def on_session_open(self, event):
+ def on_session_remote_open(self, event):
ssn = event.session
if ssn.state & Endpoint.LOCAL_UNINIT:
ssn.open()
- def on_link_open(self, event):
+ def on_link_remote_open(self, event):
link = event.link
if link.state & Endpoint.LOCAL_UNINIT:
link.source.copy(link.remote_source)
link.target.copy(link.remote_target)
link.open()
- def on_connection_close(self, event):
+ def on_connection_remote_close(self, event):
conn = event.connection
if not (conn.state & Endpoint.LOCAL_CLOSED):
conn.close()
- def on_session_close(self, event):
+ def on_session_remote_close(self, event):
ssn = event.session
if not (ssn.state & Endpoint.LOCAL_CLOSED):
ssn.close()
- def on_link_close(self, event):
+ def on_link_remote_close(self, event):
link = event.link
if not (link.state & Endpoint.LOCAL_CLOSED):
link.close()
-class FlowController(EventDispatcher):
+class FlowController(Handler):
def __init__(self, window=1):
self.window = window
@@ -446,7 +416,7 @@ class FlowController(EventDispatcher):
if event.link.is_receiver:
self.top_up(event.link)
- def on_link_open(self, event):
+ def on_link_remote_open(self, event):
if event.link.is_receiver:
self.top_up(event.link)
@@ -458,7 +428,7 @@ class FlowController(EventDispatcher):
if not event.delivery.released and event.delivery.link.is_receiver:
self.top_up(event.delivery.link)
-class ScopedDispatcher(EventDispatcher):
+class ScopedHandler(Handler):
scopes = {
"pn_connection": ["connection"],
@@ -467,59 +437,17 @@ class ScopedDispatcher(EventDispatcher):
"pn_delivery": ["delivery", "link", "session", "connection"]
}
- def dispatch(self, event):
+ def on_unhandled(self, event):
if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
return
- method = self.methods.get(event.type, "on_%s" % str(event.type))
objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
- handlers = [getattr(t, method) for t in targets if hasattr(t, method)]
+ handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
for h in handlers:
h(event)
-class ErrorHandler(EventDispatcher):
- def was_closed_by_peer(self, endpoint, parent=None):
- if parent:
- return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint)
- else:
- return endpoint.state & Endpoint.LOCAL_ACTIVE and endpoint.state & Endpoint.REMOTE_CLOSED
-
- def treat_as_error(self, endpoint, parent=None):
- return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent)
-
- def print_error(self, endpoint, endpoint_type):
- if endpoint.remote_condition:
- print endpoint.remote_condition.description
- elif self.was_closed_by_peer(endpoint):
- print "%s closed by peer" % endpoint_type
-
- def on_link_close(self, event):
- if self.treat_as_error(event.link, event.connection):
- self.on_link_error(event)
+class OutgoingMessageHandler(Handler):
- def on_session_close(self, event):
- if self.treat_as_error(event.session, event.connection):
- self.on_session_error(event)
-
- def on_connection_close(self, event):
- if self.treat_as_error(event.connection):
- self.on_connection_error(event)
-
- def on_connection_error(self, event):
- self.print_error(event.connection, "connection")
- event.connection.close()
-
- def on_session_error(self, event):
- self.print_error(event.session, "session")
- event.session.close()
- event.connection.close()
-
- def on_link_error(self, event):
- self.print_error(event.link, "link")
- event.link.close()
- event.connection.close()
-
-class OutgoingMessageHandler(EventDispatcher):
def on_link_flow(self, event):
if event.link.is_sender and event.link.credit:
self.on_credit(event)
@@ -561,7 +489,7 @@ class Reject(ProtonException):
"""
pass
-class IncomingMessageHandler(EventDispatcher):
+class IncomingMessageHandler(Handler):
def on_delivery(self, event):
dlv = event.delivery
if dlv.released or not dlv.link.is_receiver: return
@@ -599,9 +527,110 @@ class IncomingMessageHandler(EventDispat
def on_settled(self, event): pass
def auto_accept(self): return True
-class BaseHandler(ErrorHandler, IncomingMessageHandler, OutgoingMessageHandler):
+class ClientEndpointHandler(Handler):
+
+ def is_local_open(self, endpoint):
+ return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+ def is_remote_open(self, endpoint):
+ return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+ def is_remote_closed(self, endpoint):
+ return endpoint.state & Endpoint.REMOTE_CLOSED
+
+ def was_closed_by_peer(self, endpoint, parent=None):
+ if parent:
+ return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint)
+ else:
+ return self.is_local_open(endpoint) and self.is_remote_closed(endpoint)
+
+ def treat_as_error(self, endpoint, parent=None):
+ return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent)
+
+ def print_error(self, endpoint, endpoint_type):
+ if endpoint.remote_condition:
+ print endpoint.remote_condition.description
+ elif self.was_closed_by_peer(endpoint):
+ print "%s closed by peer" % endpoint_type
+
+ def on_link_remote_close(self, event):
+ if self.treat_as_error(event.link, event.connection):
+ self.on_link_error(event)
+ else:
+ self.on_link_closed(event)
+
+ def on_session_remote_close(self, event):
+ if self.treat_as_error(event.session, event.connection):
+ self.on_session_error(event)
+ else:
+ self.on_session_closed(event)
+
+ def on_connection_remote_close(self, event):
+ if self.treat_as_error(event.connection):
+ self.on_connection_error(event)
+ else:
+ self.on_connection_closed(event)
+
+ def on_connection_error(self, event):
+ self.print_error(event.connection, "connection")
+ event.connection.close()
+
+ def on_session_error(self, event):
+ self.print_error(event.session, "session")
+ event.session.close()
+ event.connection.close()
+
+ def on_link_error(self, event):
+ self.print_error(event.link, "link")
+ event.link.close()
+ event.connection.close()
+
+ def on_connection_local_open(self, event):
+ if self.is_remote_open(event.connection):
+ self.on_connection_opened(event)
+
+ def on_connection_remote_open(self, event):
+ if self.is_local_open(event.connection):
+ self.on_connection_opened(event)
+
+ def on_session_local_open(self, event):
+ if self.is_remote_open(event.session):
+ self.on_session_opened(event)
+
+ def on_session_remote_open(self, event):
+ if self.is_local_open(event.session):
+ self.on_session_opened(event)
+
+ def on_link_local_open(self, event):
+ if self.is_remote_open(event.link):
+ self.on_link_opened(event)
+
+ def on_link_remote_open(self, event):
+ if self.is_local_open(event.link):
+ self.on_link_opened(event)
+
+ def on_connection_opened(self, event):
+ pass
+
+ def on_session_opened(self, event):
+ pass
+
+ def on_link_opened(self, event):
+ pass
+
+ def on_connection_closed(self, event):
+ pass
+
+ def on_session_closed(self, event):
+ pass
+
+ def on_link_closed(self, event):
+ pass
+
+class ClientHandler(ClientEndpointHandler, IncomingMessageHandler, OutgoingMessageHandler):
+
def __init__(self):
- super(BaseHandler, self).__init__()
+ super(ClientHandler, self).__init__()
def on_delivery(self, event):
IncomingMessageHandler.on_delivery(self, event)
@@ -689,11 +718,11 @@ class MessagingContext(object):
ssn.open()
return ssn
- def on_session_close(self, event):
+ def on_session_remote_close(self, event):
if self.conn:
self.conn.close()
-class Connector(EventDispatcher):
+class Connector(Handler):
def attach_to(self, loop):
self.loop = loop
@@ -706,7 +735,7 @@ class Connector(EventDispatcher):
if hasattr(event.connection, "address"):
self._connect(event.connection)
- def on_connection_open(self, event):
+ def on_connection_remote_open(self, event):
if hasattr(event.connection, "reconnect"):
event.connection.reconnect.reset()
@@ -816,9 +845,10 @@ class Urls(object):
class EventLoop(object):
def __init__(self, *handlers):
self.connector = Connector()
- l = [ScopedDispatcher(), self.connector]
- if handlers: l += handlers
- else: l.append(FlowController(10))
+ if handlers:
+ l = handlers + (self.connector, ScopedHandler())
+ else:
+ l = [FlowController(10), self.connector, ScopedHandler()]
self.events = ScheduledEvents(*l)
self.loop = SelectLoop(self.events)
self.connector.attach_to(self)
@@ -904,10 +934,10 @@ class BlockingReceiver(BlockingLink):
super(BlockingReceiver, self).__init__(connection, receiver)
if credit: receiver.flow(credit)
-class BlockingConnection(EventDispatcher):
+class BlockingConnection(Handler):
def __init__(self, url, timeout=None):
self.timeout = timeout
- self.events = Events(ScopedDispatcher())
+ self.events = Events(ScopedHandler())
self.loop = SelectLoop(self.events)
self.context = MessagingContext(self.loop.events.connection(), handler=self)
if isinstance(url, basestring):
@@ -951,11 +981,11 @@ class BlockingConnection(EventDispatcher
if msg: txt += ": " + msg
raise Timeout(txt)
- def on_link_close(self, event):
+ def on_link_remote_close(self, event):
if event.link.state & Endpoint.LOCAL_ACTIVE:
self.closed(event.link.remote_condition)
- def on_connection_close(self, event):
+ def on_connection_remote_close(self, event):
if event.connection.state & Endpoint.LOCAL_ACTIVE:
self.closed(event.connection.remote_condition)
Modified: qpid/proton/branches/examples/tutorial/proton_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_tornado.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_tornado.py Tue Oct 14 14:35:39 2014
@@ -18,7 +18,7 @@
# under the License.
#
-from proton_events import ApplicationEvent, Connector, EventLoop, Events, FlowController, MessagingContext, ScopedDispatcher, Url
+from proton_events import ApplicationEvent, EventLoop
import tornado.ioloop
class TornadoLoop(EventLoop):
Modified: qpid/proton/branches/examples/tutorial/recurring_timer.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer.py (original)
+++ qpid/proton/branches/examples/tutorial/recurring_timer.py Tue Oct 14 14:35:39 2014
@@ -19,9 +19,9 @@
#
import time
-from proton_events import EventLoop, EventDispatcher
+from proton_events import EventLoop, Handler
-class Recurring(EventDispatcher):
+class Recurring(Handler):
def __init__(self, period):
self.eventloop = EventLoop(self)
self.period = period
Modified: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (original)
+++ qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py Tue Oct 14 14:35:39 2014
@@ -19,10 +19,10 @@
#
import time
-from proton_events import EventDispatcher
+from proton_events import Handler
from proton_tornado import TornadoLoop
-class Recurring(EventDispatcher):
+class Recurring(Handler):
def __init__(self, period):
self.eventloop = TornadoLoop(self)
self.period = period
Modified: qpid/proton/branches/examples/tutorial/server.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/server.py (original)
+++ qpid/proton/branches/examples/tutorial/server.py Tue Oct 14 14:35:39 2014
@@ -19,9 +19,9 @@
#
from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+from proton_events import EventLoop, ClientHandler
-class Server(IncomingMessageHandler):
+class Server(ClientHandler):
def __init__(self, eventloop, host, address):
self.eventloop = eventloop
self.conn = eventloop.connect(host, handler=self)
@@ -38,14 +38,10 @@ class Server(IncomingMessageHandler):
self.senders[event.message.reply_to] = sender
sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
- def on_connection_open(self, event):
+ def on_connection_opened(self, event):
if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
self.relay = self.conn.create_sender(None)
- def on_connection_close(self, endpoint, error):
- if error: print "Closed due to %s" % error
- self.conn.close()
-
def run(self):
self.eventloop.run()
Modified: qpid/proton/branches/examples/tutorial/simple_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_recv.py Tue Oct 14 14:35:39 2014
@@ -20,7 +20,7 @@
import proton_events
-class Recv(proton_events.BaseHandler):
+class Recv(proton_events.ClientHandler):
def on_message(self, event):
print event.message.body
Modified: qpid/proton/branches/examples/tutorial/simple_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_send.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_send.py Tue Oct 14 14:35:39 2014
@@ -21,7 +21,7 @@
from proton import Message
import proton_events
-class Send(proton_events.BaseHandler):
+class Send(proton_events.ClientHandler):
def __init__(self, messages):
self.sent = 0
self.confirmed = 0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org