You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/30 17:14:55 UTC
svn commit: r1628465 - in /qpid/proton/branches/examples/tutorial:
client_http.py helloworld_direct_tornado.py helloworld_tornado.py
proton_events.py proton_tornado.py recurring_timer_tornado.py server.py
Author: gsim
Date: Tue Sep 30 15:14:55 2014
New Revision: 1628465
URL: http://svn.apache.org/r1628465
Log:
Added some examples of using proton with tornados io loop
Added:
qpid/proton/branches/examples/tutorial/client_http.py (with props)
qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py (with props)
qpid/proton/branches/examples/tutorial/helloworld_tornado.py (with props)
qpid/proton/branches/examples/tutorial/proton_tornado.py
qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (with props)
Modified:
qpid/proton/branches/examples/tutorial/proton_events.py
qpid/proton/branches/examples/tutorial/server.py
Added: qpid/proton/branches/examples/tutorial/client_http.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/client_http.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/client_http.py (added)
+++ qpid/proton/branches/examples/tutorial/client_http.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import IncomingMessageHandler
+from proton_tornado import TornadoLoop
+from tornado.ioloop import IOLoop
+import tornado.web
+
+class ExampleHandler(tornado.web.RequestHandler, IncomingMessageHandler):
+ def initialize(self, loop):
+ self.loop = loop
+
+ def get(self):
+ self._write_open()
+ self._write_form()
+ self._write_close()
+
+ @tornado.web.asynchronous
+ def post(self):
+ self.conn = self.loop.connect("localhost:5672")
+ self.sender = self.conn.sender("examples")
+ self.conn.receiver(None, dynamic=True, handler=self)
+
+ def on_link_remote_open(self, event):
+ req = Message(reply_to=event.link.remote_source.address, body=self.get_body_argument("message"))
+ self.sender.send_msg(req)
+
+ def on_message(self, event):
+ self.set_header("Content-Type", "text/html")
+ self._write_open()
+ self._write_form()
+ self.write("Response: " + event.message.body)
+ self._write_close()
+ self.finish()
+ self.conn.close()
+
+ def _write_open(self):
+ self.write('<html><body>')
+
+ def _write_close(self):
+ self.write('</body></html>')
+
+ def _write_form(self):
+ self.write('<form action="/client" method="POST">'
+ 'Request: <input type="text" name="message">'
+ '<input type="submit" value="Submit">'
+ '</form>')
+
+
+loop = TornadoLoop()
+app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(loop=loop))])
+app.listen(8888)
+try:
+ loop.run()
+except KeyboardInterrupt:
+ loop.stop()
Propchange: qpid/proton/branches/examples/tutorial/client_http.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import FlowController, Handshaker, IncomingMessageHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorldReceiver(IncomingMessageHandler):
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
+
+class HelloWorldSender(object):
+ def on_link_flow(self, event):
+ event.link.send_msg(Message(body=u"Hello World!"))
+ event.link.close()
+
+class HelloWorld(object):
+ def __init__(self, eventloop, url, address):
+ self.eventloop = eventloop
+ self.acceptor = eventloop.listen(url)
+ self.conn = eventloop.connect(url, handler=self)
+ self.address = address
+
+ def on_connection_remote_open(self, event):
+ self.conn.sender(self.address, handler=HelloWorldSender())
+
+ def on_link_remote_close(self, event):
+ self.closed(event.link.remote_condition)
+
+ def on_connection_remote_close(self, event):
+ self.closed(event.connection.remote_condition)
+ self.eventloop.stop()
+
+ def closed(self, error=None):
+ if error:
+ print "Closed due to %s" % error
+ self.conn.close()
+ self.acceptor.close()
+
+ def run(self):
+ self.eventloop.run()
+
+eventloop = TornadoLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
+HelloWorld(eventloop, "localhost:8888", "examples").run()
+
Propchange: qpid/proton/branches/examples/tutorial/helloworld_direct_tornado.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/helloworld_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/helloworld_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/helloworld_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import IncomingMessageHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorldReceiver(IncomingMessageHandler):
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
+
+class HelloWorldSender(object):
+ def on_link_flow(self, event):
+ event.link.send_msg(Message(body=u"Hello World!"))
+ event.link.close()
+
+class HelloWorld(object):
+ def __init__(self, eventloop, url, address):
+ self.eventloop = eventloop
+ self.conn = eventloop.connect(url, handler=self)
+ self.address = address
+
+ def on_connection_remote_open(self, event):
+ self.conn.receiver(self.address, handler=HelloWorldReceiver())
+
+ def on_link_remote_open(self, event):
+ if event.link.is_receiver:
+ self.conn.sender(self.address, handler=HelloWorldSender())
+
+ def on_link_remote_close(self, event):
+ self.closed(event.link.remote_condition)
+
+ def on_connection_remote_close(self, event):
+ self.closed(event.connection.remote_condition)
+ self.eventloop.stop()
+
+ def closed(self, error=None):
+ if error:
+ print "Closed due to %s" % error
+ self.conn.close()
+
+ def run(self):
+ self.eventloop.run()
+
+HelloWorld(TornadoLoop(), "localhost:5672", "examples").run()
+
Propchange: qpid/proton/branches/examples/tutorial/helloworld_tornado.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1628465&r1=1628464&r2=1628465&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Tue Sep 30 15:14:55 2014
@@ -56,7 +56,7 @@ class EventDispatcher(object):
def unhandled(self, event):
pass
-class Selectable(object):
+class AmqpConnection(object):
def __init__(self, conn, sock, events):
self.events = events
@@ -68,6 +68,7 @@ class Selectable(object):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.write_done = False
self.read_done = False
+ self._closed = False
def accept(self):
#TODO: use SASL if requested by peer
@@ -91,12 +92,16 @@ class Selectable(object):
return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
def closed(self):
- if self.write_done and self.read_done:
- self.socket.close()
+ if not self._closed and self.write_done and self.read_done:
+ self.close()
return True
else:
return False
+ def close(self):
+ self.socket.close()
+ self._closed = True
+
def fileno(self):
return self.socket.fileno()
@@ -170,15 +175,17 @@ class Selectable(object):
class Acceptor:
- def __init__(self, events, selectables, host, port):
+ def __init__(self, events, loop, host, port):
self.events = events
- self.selectables = selectables
+ #self.selectables = selectables
+ self.loop = loop
self.socket = socket.socket()
self.socket.setblocking(0)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((host, port))
self.socket.listen(5)
- self.selectables.append(self)
+ #self.selectables.append(self)
+ self.loop.add(self)
self._closed = False
def closed(self):
@@ -203,7 +210,8 @@ class Acceptor:
def readable(self):
sock, addr = self.socket.accept()
if sock:
- self.selectables.append(Selectable(self.events.connection(), sock, self.events).accept())
+ #self.selectables.append(AmqpConnection(self.events.connection(), sock, self.events).accept())
+ self.loop.add(AmqpConnection(self.events.connection(), sock, self.events).accept())
def removed(self): pass
@@ -442,7 +450,7 @@ class Handshaker(EventDispatcher):
class FlowController(EventDispatcher):
- def __init__(self, window):
+ def __init__(self, window=1):
self.window = window
def top_up(self, link):
@@ -580,6 +588,7 @@ class MessagingContext(object):
self.conn = conn
if handler:
self.conn.context = handler
+ self.conn._mc = self
self.ssn = ssn
def sender(self, target, source=None, name=None, handler=None, tags=None):
@@ -642,7 +651,7 @@ class Connector(EventDispatcher):
def _connect(self, connection):
host, port = connection.address.next()
print "connecting to %s:%i" % (host, port)
- self.loop.add(Selectable(connection, socket.socket(), self.loop.events).connect(host, port))
+ self.loop.add(AmqpConnection(connection, socket.socket(), self.loop.events).connect(host, port))
def on_connection_open(self, event):
if hasattr(event.connection, "address"):
@@ -765,7 +774,7 @@ class EventLoop(object):
self.trigger = None
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
- context = MessagingContext(self.loop.events.connection(), handler=handler)
+ context = MessagingContext(self.events.connection(), handler=handler)
if url: context.conn.address = Url(url)
elif urls: context.conn.address = Urls(urls)
elif address: context.conn.address = address
@@ -777,7 +786,7 @@ class EventLoop(object):
def listen(self, url):
host, port = Url(url).next()
- return Acceptor(self.loop.events, self.loop.selectables, host, port)
+ return Acceptor(self.events, self, host, port)
def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
@@ -785,7 +794,7 @@ class EventLoop(object):
def get_event_trigger(self):
if not self.trigger or self.trigger.closed():
self.trigger = EventInjector(self.events)
- self.loop.selectables.append(self.trigger)
+ self.add(self.trigger)
return self.trigger
def add(self, selectable):
@@ -803,7 +812,6 @@ class EventLoop(object):
def do_work(self, timeout=None):
return self.loop.do_work(timeout)
-
class BlockingLink(object):
def __init__(self, connection, link):
self.connection = connection
@@ -837,9 +845,12 @@ class BlockingConnection(EventDispatcher
self.events = Events(ScopedDispatcher())
self.loop = SelectLoop(self.events)
self.context = MessagingContext(self.loop.events.connection(), handler=self)
- self.url = url
+ if isinstance(url, basestring):
+ self.url = Url(url)
+ else:
+ self.url = url
self.loop.add(
- Selectable(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
+ AmqpConnection(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
self.context.conn.open()
self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT),
msg="Opening connection")
@@ -856,6 +867,10 @@ class BlockingConnection(EventDispatcher
self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE),
msg="Closing connection")
+ def run(self):
+ """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+ self.loop.run()
+
def wait(self, condition, timeout=False, msg=None):
"""Call do_work until condition() is true"""
if timeout is False:
Added: qpid/proton/branches/examples/tutorial/proton_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/proton_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton_events import ApplicationEvent, Connector, EventLoop, Events, FlowController, MessagingContext, ScopedDispatcher, Url
+import tornado.ioloop
+
+class TornadoLoop(EventLoop):
+ def __init__(self, *handlers):
+ super(TornadoLoop, self).__init__(*handlers)
+ self.loop = tornado.ioloop.IOLoop.current()
+
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
+ conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
+ self.events.process()
+ return conn
+
+ def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+ def add(self, conn):
+ self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+
+ def remove(self, conn):
+ self.loop.remove_handler(conn)
+
+ def run(self):
+ self.loop.start()
+
+ def stop(self):
+ self.loop.stop()
+
+ def _get_event_flags(self, conn):
+ flags = 0
+ if conn.reading():
+ flags |= tornado.ioloop.IOLoop.READ
+ if conn.writing():
+ flags |= tornado.ioloop.IOLoop.WRITE
+ return flags
+
+ def _connection_ready(self, conn, events):
+ if events & tornado.ioloop.IOLoop.READ:
+ conn.readable()
+ if events & tornado.ioloop.IOLoop.WRITE:
+ conn.writable()
+ if events & tornado.ioloop.IOLoop.ERROR or conn.closed():
+ conn.close()
+ self.loop.remove_handler(conn)
+ conn.removed()
+ self.events.process()
+ self.loop.update_handler(conn, self._get_event_flags(conn))
Added: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py?rev=1628465&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (added)
+++ qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py Tue Sep 30 15:14:55 2014
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton_events import EventDispatcher
+from proton_tornado import TornadoLoop
+
+class Recurring(EventDispatcher):
+ def __init__(self, period):
+ self.eventloop = TornadoLoop(self)
+ self.period = period
+ self.eventloop.schedule(time.time() + self.period, subject=self)
+
+ def on_timer(self, event):
+ print "Tick..."
+ self.eventloop.schedule(time.time() + self.period, subject=self)
+
+ def run(self):
+ self.eventloop.run()
+
+ def stop(self):
+ self.eventloop.stop()
+
+try:
+ app = Recurring(1.0)
+ app.run()
+except KeyboardInterrupt:
+ app.stop()
+ print
+
+
Propchange: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/proton/branches/examples/tutorial/server.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server.py?rev=1628465&r1=1628464&r2=1628465&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/server.py (original)
+++ qpid/proton/branches/examples/tutorial/server.py Tue Sep 30 15:14:55 2014
@@ -39,7 +39,7 @@ class Server(IncomingMessageHandler):
sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
def on_connection_remote_open(self, event):
- if 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+ if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
self.relay = self.conn.sender(None)
def on_connection_remote_close(self, endpoint, error):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org