You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/10/14 16:35:41 UTC

svn commit: r1631772 [3/3] - in /qpid/proton/branches/examples: examples/engine/ examples/engine/java/ examples/engine/java/src/ examples/engine/java/src/main/ examples/engine/java/src/main/java/ examples/engine/java/src/main/java/org/ examples/engine/...

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Tue Oct 14 14:35:39 2014
@@ -18,44 +18,9 @@
 #
 import heapq, os, Queue, re, socket, time, types
 from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout
-from proton import Message, ProtonException, Transport, TransportException, ConnectionException
+from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
 from select import select
 
-class EventDispatcher(object):
-
-    methods = {
-        Event.CONNECTION_INIT: "on_connection_init",
-        Event.CONNECTION_OPEN: "on_connection_local_open",
-        Event.CONNECTION_REMOTE_OPEN: "on_connection_open",
-        Event.CONNECTION_CLOSE: "on_connection_local_close",
-        Event.CONNECTION_REMOTE_CLOSE: "on_connection_close",
-        Event.CONNECTION_FINAL: "on_connection_final",
-
-        Event.SESSION_INIT: "on_session_init",
-        Event.SESSION_OPEN: "on_session_open",
-        Event.SESSION_REMOTE_OPEN: "on_session_open",
-        Event.SESSION_CLOSE: "on_session_local_close",
-        Event.SESSION_REMOTE_CLOSE: "on_session_close",
-        Event.SESSION_FINAL: "on_session_final",
-
-        Event.LINK_INIT: "on_link_init",
-        Event.LINK_OPEN: "on_link_local_open",
-        Event.LINK_REMOTE_OPEN: "on_link_open",
-        Event.LINK_CLOSE: "on_link_local_close",
-        Event.LINK_REMOTE_CLOSE: "on_link_close",
-        Event.LINK_FLOW: "on_link_flow",
-        Event.LINK_FINAL: "on_link_final",
-
-        Event.TRANSPORT: "on_transport",
-        Event.DELIVERY: "on_delivery"
-    }
-
-    def dispatch(self, event):
-        getattr(self, self.methods.get(event.type, "on_%s" % str(event.type)), self.unhandled)(event)
-
-    def unhandled(self, event):
-        pass
-
 class AmqpConnection(object):
 
     def __init__(self, conn, sock, events):
@@ -270,7 +235,7 @@ class Events(object):
 
     def dispatch(self, event):
         for d in self.dispatchers:
-            d.dispatch(event)
+            event.dispatch(d)
 
     @property
     def next_interval(self):
@@ -280,9 +245,14 @@ class Events(object):
     def empty(self):
         return self.collector.peek() == None
 
+class ExtendedEventType(object):
+    def __init__(self, name):
+        self.name = name
+        self.method = "on_%s" % name
+
 class ApplicationEvent(Event):
-    def __init__(self, type, connection=None, session=None, link=None, delivery=None, subject=None):
-        self.type = type
+    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+        self.type = ExtendedEventType(typename)
         self.subject = subject
         if delivery:
             self.context = delivery
@@ -302,7 +272,7 @@ class ApplicationEvent(Event):
 
     def __repr__(self):
         objects = [self.context, self.subject]
-        return "%s(%s)" % (self.type,
+        return "%s(%s)" % (self.type.name,
                            ", ".join([str(o) for o in objects if o is not None]))
 
 class ScheduledEvents(Events):
@@ -399,41 +369,41 @@ class SelectLoop(object):
             return False
 
 
-class Handshaker(EventDispatcher):
+class Handshaker(Handler):
 
-    def on_connection_open(self, event):
+    def on_connection_remote_open(self, event):
         conn = event.connection
         if conn.state & Endpoint.LOCAL_UNINIT:
             conn.open()
 
-    def on_session_open(self, event):
+    def on_session_remote_open(self, event):
         ssn = event.session
         if ssn.state & Endpoint.LOCAL_UNINIT:
             ssn.open()
 
-    def on_link_open(self, event):
+    def on_link_remote_open(self, event):
         link = event.link
         if link.state & Endpoint.LOCAL_UNINIT:
             link.source.copy(link.remote_source)
             link.target.copy(link.remote_target)
             link.open()
 
-    def on_connection_close(self, event):
+    def on_connection_remote_close(self, event):
         conn = event.connection
         if not (conn.state & Endpoint.LOCAL_CLOSED):
             conn.close()
 
-    def on_session_close(self, event):
+    def on_session_remote_close(self, event):
         ssn = event.session
         if not (ssn.state & Endpoint.LOCAL_CLOSED):
             ssn.close()
 
-    def on_link_close(self, event):
+    def on_link_remote_close(self, event):
         link = event.link
         if not (link.state & Endpoint.LOCAL_CLOSED):
             link.close()
 
-class FlowController(EventDispatcher):
+class FlowController(Handler):
 
     def __init__(self, window=1):
         self.window = window
@@ -446,7 +416,7 @@ class FlowController(EventDispatcher):
         if event.link.is_receiver:
             self.top_up(event.link)
 
-    def on_link_open(self, event):
+    def on_link_remote_open(self, event):
         if event.link.is_receiver:
             self.top_up(event.link)
 
@@ -458,7 +428,7 @@ class FlowController(EventDispatcher):
         if not event.delivery.released and event.delivery.link.is_receiver:
             self.top_up(event.delivery.link)
 
-class ScopedDispatcher(EventDispatcher):
+class ScopedHandler(Handler):
 
     scopes = {
         "pn_connection": ["connection"],
@@ -467,59 +437,17 @@ class ScopedDispatcher(EventDispatcher):
         "pn_delivery": ["delivery", "link", "session", "connection"]
     }
 
-    def dispatch(self, event):
+    def on_unhandled(self, event):
         if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
             return
-        method = self.methods.get(event.type, "on_%s" % str(event.type))
         objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])]
         targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
-        handlers = [getattr(t, method) for t in targets if hasattr(t, method)]
+        handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
         for h in handlers:
             h(event)
 
-class ErrorHandler(EventDispatcher):
-    def was_closed_by_peer(self, endpoint, parent=None):
-        if parent:
-            return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint)
-        else:
-            return endpoint.state & Endpoint.LOCAL_ACTIVE and endpoint.state & Endpoint.REMOTE_CLOSED
-
-    def treat_as_error(self, endpoint, parent=None):
-        return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent)
-
-    def print_error(self, endpoint, endpoint_type):
-        if endpoint.remote_condition:
-            print endpoint.remote_condition.description
-        elif self.was_closed_by_peer(endpoint):
-            print "%s closed by peer" % endpoint_type
-
-    def on_link_close(self, event):
-        if self.treat_as_error(event.link, event.connection):
-            self.on_link_error(event)
+class OutgoingMessageHandler(Handler):
 
-    def on_session_close(self, event):
-        if self.treat_as_error(event.session, event.connection):
-            self.on_session_error(event)
-
-    def on_connection_close(self, event):
-        if self.treat_as_error(event.connection):
-            self.on_connection_error(event)
-
-    def on_connection_error(self, event):
-        self.print_error(event.connection, "connection")
-        event.connection.close()
-
-    def on_session_error(self, event):
-        self.print_error(event.session, "session")
-        event.session.close()
-        event.connection.close()
-
-    def on_link_error(self, event):
-        self.print_error(event.link, "link")
-        event.link.close()
-        event.connection.close()
-
-class OutgoingMessageHandler(EventDispatcher):
     def on_link_flow(self, event):
         if event.link.is_sender and event.link.credit:
             self.on_credit(event)
@@ -561,7 +489,7 @@ class Reject(ProtonException):
   """
   pass
 
-class IncomingMessageHandler(EventDispatcher):
+class IncomingMessageHandler(Handler):
     def on_delivery(self, event):
         dlv = event.delivery
         if dlv.released or not dlv.link.is_receiver: return
@@ -599,9 +527,110 @@ class IncomingMessageHandler(EventDispat
     def on_settled(self, event): pass
     def auto_accept(self): return True
 
-class BaseHandler(ErrorHandler, IncomingMessageHandler, OutgoingMessageHandler):
+class ClientEndpointHandler(Handler):
+
+    def is_local_open(self, endpoint):
+        return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+    def is_remote_open(self, endpoint):
+        return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+    def is_remote_closed(self, endpoint):
+        return endpoint.state & Endpoint.REMOTE_CLOSED
+
+    def was_closed_by_peer(self, endpoint, parent=None):
+        if parent:
+            return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint)
+        else:
+            return self.is_local_open(endpoint) and self.is_remote_closed(endpoint)
+
+    def treat_as_error(self, endpoint, parent=None):
+        return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent)
+
+    def print_error(self, endpoint, endpoint_type):
+        if endpoint.remote_condition:
+            print endpoint.remote_condition.description
+        elif self.was_closed_by_peer(endpoint):
+            print "%s closed by peer" % endpoint_type
+
+    def on_link_remote_close(self, event):
+        if self.treat_as_error(event.link, event.connection):
+            self.on_link_error(event)
+        else:
+            self.on_link_closed(event)
+
+    def on_session_remote_close(self, event):
+        if self.treat_as_error(event.session, event.connection):
+            self.on_session_error(event)
+        else:
+            self.on_session_closed(event)
+
+    def on_connection_remote_close(self, event):
+        if self.treat_as_error(event.connection):
+            self.on_connection_error(event)
+        else:
+            self.on_connection_closed(event)
+
+    def on_connection_error(self, event):
+        self.print_error(event.connection, "connection")
+        event.connection.close()
+
+    def on_session_error(self, event):
+        self.print_error(event.session, "session")
+        event.session.close()
+        event.connection.close()
+
+    def on_link_error(self, event):
+        self.print_error(event.link, "link")
+        event.link.close()
+        event.connection.close()
+
+    def on_connection_local_open(self, event):
+        if self.is_remote_open(event.connection):
+            self.on_connection_opened(event)
+
+    def on_connection_remote_open(self, event):
+        if self.is_local_open(event.connection):
+            self.on_connection_opened(event)
+
+    def on_session_local_open(self, event):
+        if self.is_remote_open(event.session):
+            self.on_session_opened(event)
+
+    def on_session_remote_open(self, event):
+        if self.is_local_open(event.session):
+            self.on_session_opened(event)
+
+    def on_link_local_open(self, event):
+        if self.is_remote_open(event.link):
+            self.on_link_opened(event)
+
+    def on_link_remote_open(self, event):
+        if self.is_local_open(event.link):
+            self.on_link_opened(event)
+
+    def on_connection_opened(self, event):
+        pass
+
+    def on_session_opened(self, event):
+        pass
+
+    def on_link_opened(self, event):
+        pass
+
+    def on_connection_closed(self, event):
+        pass
+
+    def on_session_closed(self, event):
+        pass
+
+    def on_link_closed(self, event):
+        pass
+
+class ClientHandler(ClientEndpointHandler, IncomingMessageHandler, OutgoingMessageHandler):
+
     def __init__(self):
-        super(BaseHandler, self).__init__()
+        super(ClientHandler, self).__init__()
 
     def on_delivery(self, event):
         IncomingMessageHandler.on_delivery(self, event)
@@ -689,11 +718,11 @@ class MessagingContext(object):
         ssn.open()
         return ssn
 
-    def on_session_close(self, event):
+    def on_session_remote_close(self, event):
         if self.conn:
             self.conn.close()
 
-class Connector(EventDispatcher):
+class Connector(Handler):
     def attach_to(self, loop):
         self.loop = loop
 
@@ -706,7 +735,7 @@ class Connector(EventDispatcher):
         if hasattr(event.connection, "address"):
             self._connect(event.connection)
 
-    def on_connection_open(self, event):
+    def on_connection_remote_open(self, event):
         if hasattr(event.connection, "reconnect"):
             event.connection.reconnect.reset()
 
@@ -816,9 +845,10 @@ class Urls(object):
 class EventLoop(object):
     def __init__(self, *handlers):
         self.connector = Connector()
-        l = [ScopedDispatcher(), self.connector]
-        if handlers: l += handlers
-        else: l.append(FlowController(10))
+        if handlers:
+            l = handlers + (self.connector, ScopedHandler())
+        else:
+            l = [FlowController(10), self.connector, ScopedHandler()]
         self.events = ScheduledEvents(*l)
         self.loop = SelectLoop(self.events)
         self.connector.attach_to(self)
@@ -904,10 +934,10 @@ class BlockingReceiver(BlockingLink):
         super(BlockingReceiver, self).__init__(connection, receiver)
         if credit: receiver.flow(credit)
 
-class BlockingConnection(EventDispatcher):
+class BlockingConnection(Handler):
     def __init__(self, url, timeout=None):
         self.timeout = timeout
-        self.events = Events(ScopedDispatcher())
+        self.events = Events(ScopedHandler())
         self.loop = SelectLoop(self.events)
         self.context = MessagingContext(self.loop.events.connection(), handler=self)
         if isinstance(url, basestring):
@@ -951,11 +981,11 @@ class BlockingConnection(EventDispatcher
                     if msg: txt += ": " + msg
                     raise Timeout(txt)
 
-    def on_link_close(self, event):
+    def on_link_remote_close(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
             self.closed(event.link.remote_condition)
 
-    def on_connection_close(self, event):
+    def on_connection_remote_close(self, event):
         if event.connection.state & Endpoint.LOCAL_ACTIVE:
             self.closed(event.connection.remote_condition)
 

Modified: qpid/proton/branches/examples/tutorial/proton_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_tornado.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_tornado.py Tue Oct 14 14:35:39 2014
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-from proton_events import ApplicationEvent, Connector, EventLoop, Events, FlowController, MessagingContext, ScopedDispatcher, Url
+from proton_events import ApplicationEvent, EventLoop
 import tornado.ioloop
 
 class TornadoLoop(EventLoop):

Modified: qpid/proton/branches/examples/tutorial/recurring_timer.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer.py (original)
+++ qpid/proton/branches/examples/tutorial/recurring_timer.py Tue Oct 14 14:35:39 2014
@@ -19,9 +19,9 @@
 #
 
 import time
-from proton_events import EventLoop, EventDispatcher
+from proton_events import EventLoop, Handler
 
-class Recurring(EventDispatcher):
+class Recurring(Handler):
     def __init__(self, period):
         self.eventloop = EventLoop(self)
         self.period = period

Modified: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (original)
+++ qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py Tue Oct 14 14:35:39 2014
@@ -19,10 +19,10 @@
 #
 
 import time
-from proton_events import EventDispatcher
+from proton_events import Handler
 from proton_tornado import TornadoLoop
 
-class Recurring(EventDispatcher):
+class Recurring(Handler):
     def __init__(self, period):
         self.eventloop = TornadoLoop(self)
         self.period = period

Modified: qpid/proton/branches/examples/tutorial/server.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/server.py (original)
+++ qpid/proton/branches/examples/tutorial/server.py Tue Oct 14 14:35:39 2014
@@ -19,9 +19,9 @@
 #
 
 from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+from proton_events import EventLoop, ClientHandler
 
-class Server(IncomingMessageHandler):
+class Server(ClientHandler):
     def __init__(self, eventloop, host, address):
         self.eventloop = eventloop
         self.conn = eventloop.connect(host, handler=self)
@@ -38,14 +38,10 @@ class Server(IncomingMessageHandler):
             self.senders[event.message.reply_to] = sender
         sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
 
-    def on_connection_open(self, event):
+    def on_connection_opened(self, event):
         if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
             self.relay = self.conn.create_sender(None)
 
-    def on_connection_close(self, endpoint, error):
-        if error: print "Closed due to %s" % error
-        self.conn.close()
-
     def run(self):
         self.eventloop.run()
 

Modified: qpid/proton/branches/examples/tutorial/simple_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_recv.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_recv.py Tue Oct 14 14:35:39 2014
@@ -20,7 +20,7 @@
 
 import proton_events
 
-class Recv(proton_events.BaseHandler):
+class Recv(proton_events.ClientHandler):
     def on_message(self, event):
         print event.message.body
 

Modified: qpid/proton/branches/examples/tutorial/simple_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1631772&r1=1631771&r2=1631772&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/simple_send.py (original)
+++ qpid/proton/branches/examples/tutorial/simple_send.py Tue Oct 14 14:35:39 2014
@@ -21,7 +21,7 @@
 from proton import Message
 import proton_events
 
-class Send(proton_events.BaseHandler):
+class Send(proton_events.ClientHandler):
     def __init__(self, messages):
         self.sent = 0
         self.confirmed = 0



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org