You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/30 17:14:55 UTC

svn commit: r1628465 - in /qpid/proton/branches/examples/tutorial: client_http.py helloworld_direct_tornado.py helloworld_tornado.py proton_events.py proton_tornado.py recurring_timer_tornado.py server.py

Author: gsim
Date: Tue Sep 30 15:14:55 2014
New Revision: 1628465

URL: http://svn.apache.org/r1628465
Log:
Added some examples of using proton with tornados io loop

Added:
    qpid/proton/branches/examples/tutorial/client_http.py   (with props)
    qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py   (with props)
    qpid/proton/branches/examples/tutorial/helloworld_tornado.py   (with props)
    qpid/proton/branches/examples/tutorial/proton_tornado.py
    qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py   (with props)
Modified:
    qpid/proton/branches/examples/tutorial/proton_events.py
    qpid/proton/branches/examples/tutorial/server.py

Added: qpid/proton/branches/examples/tutorial/client_http.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/client_http.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/client_http.py (added)
+++ qpid/proton/branches/examples/tutorial/client_http.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import IncomingMessageHandler
+from proton_tornado import TornadoLoop
+from tornado.ioloop import IOLoop
+import tornado.web
+
+class ExampleHandler(tornado.web.RequestHandler, IncomingMessageHandler):
+    def initialize(self, loop):
+        self.loop = loop
+
+    def get(self):
+        self._write_open()
+        self._write_form()
+        self._write_close()
+
+    @tornado.web.asynchronous
+    def post(self):
+        self.conn = self.loop.connect("localhost:5672")
+        self.sender = self.conn.sender("examples")
+        self.conn.receiver(None, dynamic=True, handler=self)
+
+    def on_link_remote_open(self, event):
+        req = Message(reply_to=event.link.remote_source.address, body=self.get_body_argument("message"))
+        self.sender.send_msg(req)
+
+    def on_message(self, event):
+        self.set_header("Content-Type", "text/html")
+        self._write_open()
+        self._write_form()
+        self.write("Response: " + event.message.body)
+        self._write_close()
+        self.finish()
+        self.conn.close()
+
+    def _write_open(self):
+        self.write('<html><body>')
+
+    def _write_close(self):
+        self.write('</body></html>')
+
+    def _write_form(self):
+        self.write('<form action="/client" method="POST">'
+                   'Request: <input type="text" name="message">'
+                   '<input type="submit" value="Submit">'
+                   '</form>')
+
+
+loop = TornadoLoop()
+app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(loop=loop))])
+app.listen(8888)
+try:
+    loop.run()
+except KeyboardInterrupt:
+    loop.stop()

Propchange: qpid/proton/branches/examples/tutorial/client_http.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import FlowController, Handshaker, IncomingMessageHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorldReceiver(IncomingMessageHandler):
+    def on_message(self, event):
+        print event.message.body
+        event.connection.close()
+
+class HelloWorldSender(object):
+    def on_link_flow(self, event):
+        event.link.send_msg(Message(body=u"Hello World!"))
+        event.link.close()
+
+class HelloWorld(object):
+    def __init__(self, eventloop, url, address):
+        self.eventloop = eventloop
+        self.acceptor = eventloop.listen(url)
+        self.conn = eventloop.connect(url, handler=self)
+        self.address = address
+
+    def on_connection_remote_open(self, event):
+        self.conn.sender(self.address, handler=HelloWorldSender())
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+        self.eventloop.stop()
+
+    def closed(self, error=None):
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+        self.acceptor.close()
+
+    def run(self):
+        self.eventloop.run()
+
+eventloop = TornadoLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
+HelloWorld(eventloop, "localhost:8888", "examples").run()
+

Propchange: qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/branches/examples/tutorial/helloworld_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/helloworld_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import IncomingMessageHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorldReceiver(IncomingMessageHandler):
+    def on_message(self, event):
+        print event.message.body
+        event.connection.close()
+
+class HelloWorldSender(object):
+    def on_link_flow(self, event):
+        event.link.send_msg(Message(body=u"Hello World!"))
+        event.link.close()
+
+class HelloWorld(object):
+    def __init__(self, eventloop, url, address):
+        self.eventloop = eventloop
+        self.conn = eventloop.connect(url, handler=self)
+        self.address = address
+
+    def on_connection_remote_open(self, event):
+        self.conn.receiver(self.address, handler=HelloWorldReceiver())
+
+    def on_link_remote_open(self, event):
+        if event.link.is_receiver:
+            self.conn.sender(self.address, handler=HelloWorldSender())
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+        self.eventloop.stop()
+
+    def closed(self, error=None):
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+
+    def run(self):
+        self.eventloop.run()
+
+HelloWorld(TornadoLoop(), "localhost:5672", "examples").run()
+

Propchange: qpid/proton/branches/examples/tutorial/helloworld_tornado.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1628465&r1=1628464&r2=1628465&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Tue Sep 30 15:14:55 2014
@@ -56,7 +56,7 @@ class EventDispatcher(object):
     def unhandled(self, event):
         pass
 
-class Selectable(object):
+class AmqpConnection(object):
 
     def __init__(self, conn, sock, events):
         self.events = events
@@ -68,6 +68,7 @@ class Selectable(object):
         self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
         self.write_done = False
         self.read_done = False
+        self._closed = False
 
     def accept(self):
         #TODO: use SASL if requested by peer
@@ -91,12 +92,16 @@ class Selectable(object):
         return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
 
     def closed(self):
-        if self.write_done and self.read_done:
-            self.socket.close()
+        if not self._closed and self.write_done and self.read_done:
+            self.close()
             return True
         else:
             return False
 
+    def close(self):
+        self.socket.close()
+        self._closed = True
+
     def fileno(self):
         return self.socket.fileno()
 
@@ -170,15 +175,17 @@ class Selectable(object):
 
 class Acceptor:
 
-    def __init__(self, events, selectables, host, port):
+    def __init__(self, events, loop, host, port):
         self.events = events
-        self.selectables = selectables
+        #self.selectables = selectables
+        self.loop = loop
         self.socket = socket.socket()
         self.socket.setblocking(0)
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.socket.bind((host, port))
         self.socket.listen(5)
-        self.selectables.append(self)
+        #self.selectables.append(self)
+        self.loop.add(self)
         self._closed = False
 
     def closed(self):
@@ -203,7 +210,8 @@ class Acceptor:
     def readable(self):
         sock, addr = self.socket.accept()
         if sock:
-            self.selectables.append(Selectable(self.events.connection(), sock, self.events).accept())
+            #self.selectables.append(AmqpConnection(self.events.connection(), sock, self.events).accept())
+            self.loop.add(AmqpConnection(self.events.connection(), sock, self.events).accept())
 
     def removed(self): pass
 
@@ -442,7 +450,7 @@ class Handshaker(EventDispatcher):
 
 class FlowController(EventDispatcher):
 
-    def __init__(self, window):
+    def __init__(self, window=1):
         self.window = window
 
     def top_up(self, link):
@@ -580,6 +588,7 @@ class MessagingContext(object):
         self.conn = conn
         if handler:
             self.conn.context = handler
+        self.conn._mc = self
         self.ssn = ssn
 
     def sender(self, target, source=None, name=None, handler=None, tags=None):
@@ -642,7 +651,7 @@ class Connector(EventDispatcher):
     def _connect(self, connection):
         host, port = connection.address.next()
         print "connecting to %s:%i" % (host, port)
-        self.loop.add(Selectable(connection, socket.socket(), self.loop.events).connect(host, port))
+        self.loop.add(AmqpConnection(connection, socket.socket(), self.loop.events).connect(host, port))
 
     def on_connection_open(self, event):
         if hasattr(event.connection, "address"):
@@ -765,7 +774,7 @@ class EventLoop(object):
         self.trigger = None
 
     def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
-        context = MessagingContext(self.loop.events.connection(), handler=handler)
+        context = MessagingContext(self.events.connection(), handler=handler)
         if url: context.conn.address = Url(url)
         elif urls: context.conn.address = Urls(urls)
         elif address: context.conn.address = address
@@ -777,7 +786,7 @@ class EventLoop(object):
 
     def listen(self, url):
         host, port = Url(url).next()
-        return Acceptor(self.loop.events, self.loop.selectables, host, port)
+        return Acceptor(self.events, self, host, port)
 
     def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
         self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
@@ -785,7 +794,7 @@ class EventLoop(object):
     def get_event_trigger(self):
         if not self.trigger or self.trigger.closed():
             self.trigger = EventInjector(self.events)
-            self.loop.selectables.append(self.trigger)
+            self.add(self.trigger)
         return self.trigger
 
     def add(self, selectable):
@@ -803,7 +812,6 @@ class EventLoop(object):
     def do_work(self, timeout=None):
         return self.loop.do_work(timeout)
 
-
 class BlockingLink(object):
     def __init__(self, connection, link):
         self.connection = connection
@@ -837,9 +845,12 @@ class BlockingConnection(EventDispatcher
         self.events = Events(ScopedDispatcher())
         self.loop = SelectLoop(self.events)
         self.context = MessagingContext(self.loop.events.connection(), handler=self)
-        self.url = url
+        if isinstance(url, basestring):
+            self.url = Url(url)
+        else:
+            self.url = url
         self.loop.add(
-            Selectable(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
+            AmqpConnection(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
         self.context.conn.open()
         self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT),
                   msg="Opening connection")
@@ -856,6 +867,10 @@ class BlockingConnection(EventDispatcher
         self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE),
                   msg="Closing connection")
 
+    def run(self):
+        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+        self.loop.run()
+
     def wait(self, condition, timeout=False, msg=None):
         """Call do_work until condition() is true"""
         if timeout is False:

Added: qpid/proton/branches/examples/tutorial/proton_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/proton_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton_events import ApplicationEvent, Connector, EventLoop, Events, FlowController, MessagingContext, ScopedDispatcher, Url
+import tornado.ioloop
+
+class TornadoLoop(EventLoop):
+    def __init__(self, *handlers):
+        super(TornadoLoop, self).__init__(*handlers)
+        self.loop = tornado.ioloop.IOLoop.current()
+
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
+        conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
+        self.events.process()
+        return conn
+
+    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+        self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+    def add(self, conn):
+        self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+
+    def remove(self, conn):
+        self.loop.remove_handler(conn)
+
+    def run(self):
+        self.loop.start()
+
+    def stop(self):
+        self.loop.stop()
+
+    def _get_event_flags(self, conn):
+        flags = 0
+        if conn.reading():
+            flags |= tornado.ioloop.IOLoop.READ
+        if conn.writing():
+            flags |= tornado.ioloop.IOLoop.WRITE
+        return flags
+
+    def _connection_ready(self, conn, events):
+        if events & tornado.ioloop.IOLoop.READ:
+            conn.readable()
+        if events & tornado.ioloop.IOLoop.WRITE:
+            conn.writable()
+        if events & tornado.ioloop.IOLoop.ERROR or conn.closed():
+            conn.close()
+            self.loop.remove_handler(conn)
+            conn.removed()
+        self.events.process()
+        self.loop.update_handler(conn, self._get_event_flags(conn))

Added: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton_events import EventDispatcher
+from proton_tornado import TornadoLoop
+
+class Recurring(EventDispatcher):
+    def __init__(self, period):
+        self.eventloop = TornadoLoop(self)
+        self.period = period
+        self.eventloop.schedule(time.time() + self.period, subject=self)
+
+    def on_timer(self, event):
+        print "Tick..."
+        self.eventloop.schedule(time.time() + self.period, subject=self)
+
+    def run(self):
+        self.eventloop.run()
+
+    def stop(self):
+        self.eventloop.stop()
+
+try:
+    app = Recurring(1.0)
+    app.run()
+except KeyboardInterrupt:
+    app.stop()
+    print
+
+

Propchange: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/proton/branches/examples/tutorial/server.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server.py?rev=1628465&r1=1628464&r2=1628465&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/server.py (original)
+++ qpid/proton/branches/examples/tutorial/server.py Tue Sep 30 15:14:55 2014
@@ -39,7 +39,7 @@ class Server(IncomingMessageHandler):
         sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
 
     def on_connection_remote_open(self, event):
-        if 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
             self.relay = self.conn.sender(None)
 
     def on_connection_remote_close(self, endpoint, error):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org