You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/26 23:15:03 UTC
svn commit: r989927 - in /avro/trunk: CHANGES.txt lang/py/src/avro/ipc.py
lang/py/src/avro/txipc.py lang/py/test/txsample_http_client.py
lang/py/test/txsample_http_server.py
Author: cutting
Date: Thu Aug 26 21:15:02 2010
New Revision: 989927
URL: http://svn.apache.org/viewvc?rev=989927&view=rev
Log:
AVRO-528. Python: Add support for Twisted. Contributed by Esteve Fernandez.
Added:
avro/trunk/lang/py/src/avro/txipc.py
avro/trunk/lang/py/test/txsample_http_client.py
avro/trunk/lang/py/test/txsample_http_server.py
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/py/src/avro/ipc.py
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=989927&r1=989926&r2=989927&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug 26 21:15:02 2010
@@ -60,6 +60,8 @@ Avro 1.4.0 (unreleased)
AVRO-611. IDL: Add support for one-way messages. (cutting)
+ AVRO-528. Python: Add support for Twisted. (Esteve Fernandez via cutting)
+
IMPROVEMENTS
AVRO-629. Prefer the JSON module of python's stdlib over simplejson.
Modified: avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ipc.py?rev=989927&r1=989926&r2=989927&view=diff
==============================================================================
--- avro/trunk/lang/py/src/avro/ipc.py (original)
+++ avro/trunk/lang/py/src/avro/ipc.py Thu Aug 26 21:15:02 2010
@@ -75,7 +75,7 @@ class ConnectionClosedException(schema.A
# Base IPC Classes (Requestor/Responder)
#
-class Requestor(object):
+class BaseRequestor(object):
"""Base class for the client side of a protocol interaction."""
def __init__(self, local_protocol, transceiver):
self._local_protocol = local_protocol
@@ -116,15 +116,7 @@ class Requestor(object):
# send the handshake and call request; block until call response
call_request = buffer_writer.getvalue()
- call_response = self.transceiver.transceive(call_request)
-
- # process the handshake and call response
- buffer_decoder = io.BinaryDecoder(StringIO(call_response))
- call_response_exists = self.read_handshake_response(buffer_decoder)
- if call_response_exists:
- return self.read_call_response(message_name, buffer_decoder)
- else:
- return self.request(message_name, request_datum)
+ return self.issue_request(call_request, message_name, request_datum)
def write_handshake_request(self, encoder):
local_hash = self.local_protocol.md5
@@ -232,6 +224,19 @@ class Requestor(object):
datum_reader = io.DatumReader(writers_schema, readers_schema)
return AvroRemoteException(datum_reader.read(decoder))
+class Requestor(BaseRequestor):
+
+ def issue_request(self, call_request, message_name, request_datum):
+ call_response = self.transceiver.transceive(call_request)
+
+ # process the handshake and call response
+ buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+ call_response_exists = self.read_handshake_response(buffer_decoder)
+ if call_response_exists:
+ return self.read_call_response(message_name, buffer_decoder)
+ else:
+ return self.request(message_name, request_datum)
+
class Responder(object):
"""Base class for the server side of a protocol interaction."""
def __init__(self, local_protocol):
Added: avro/trunk/lang/py/src/avro/txipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/txipc.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/txipc.py (added)
+++ avro/trunk/lang/py/src/avro/txipc.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+from avro import ipc
+from avro import io
+
+from zope.interface import implements
+
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+from twisted.internet.defer import maybeDeferred, Deferred
+from twisted.web.iweb import IBodyProducer
+from twisted.web import resource, server
+from twisted.internet.protocol import Protocol
+
+class TwistedRequestor(ipc.BaseRequestor):
+ """A Twisted-compatible requestor. Returns a Deferred that will fire with the
+ returning value, instead of blocking until the request completes."""
+ def _process_handshake(self, call_response, message_name, request_datum):
+ # process the handshake and call response
+ buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+ call_response_exists = self.read_handshake_response(buffer_decoder)
+ if call_response_exists:
+ return self.read_call_response(message_name, buffer_decoder)
+ else:
+ return self.request(message_name, request_datum)
+
+ def issue_request(self, call_request, message_name, request_datum):
+ d = self.transceiver.transceive(call_request)
+ d.addCallback(self._process_handshake, message_name, request_datum)
+ return d
+
+class RequestStreamingProducer(object):
+ """A streaming producer for issuing requests with the Twisted.web Agent."""
+ implements(IBodyProducer)
+
+ paused = False
+ stopped = False
+ started = False
+
+ def __init__(self, message):
+ self._message = message
+ self._length = len(message)
+ # We need a buffer length header for every buffer and an additional
+ # zero-length buffer as the message terminator
+ self._length += (self._length / ipc.BUFFER_SIZE + 2) \
+ * ipc.BUFFER_HEADER_LENGTH
+ self._total_bytes_sent = 0
+ self._deferred = Deferred()
+
+ # read-only properties
+ message = property(lambda self: self._message)
+ length = property(lambda self: self._length)
+ consumer = property(lambda self: self._consumer)
+ deferred = property(lambda self: self._deferred)
+
+ def _get_total_bytes_sent(self):
+ return self._total_bytes_sent
+
+ def _set_total_bytes_sent(self, bytes_sent):
+ self._total_bytes_sent = bytes_sent
+
+ total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent)
+
+ def startProducing(self, consumer):
+ if self.started:
+ return
+
+ self.started = True
+ self._consumer = consumer
+ # Keep writing data to the consumer until we're finished,
+ # paused (pauseProducing()) or stopped (stopProducing())
+ while self.length - self.total_bytes_sent > 0 and \
+ not self.paused and not self.stopped:
+ self.write()
+ # self.write will fire this deferred once it has written
+ # the entire message to the consumer
+ return self.deferred
+
+ def resumeProducing(self):
+ self.paused = False
+ self.write(self)
+
+ def pauseProducing(self):
+ self.paused = True
+
+ def stopProducing(self):
+ self.stopped = True
+
+ def write(self):
+ if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE:
+ buffer_length = ipc.BUFFER_SIZE
+ else:
+ buffer_length = self.length - self.total_bytes_sent
+ self.write_buffer(self.message[self.total_bytes_sent:
+ (self.total_bytes_sent + buffer_length)])
+ self.total_bytes_sent += buffer_length
+ # Make sure we wrote the entire message
+ if self.total_bytes_sent == self.length and not self.stopped:
+ self.stopProducing()
+ # A message is always terminated by a zero-length buffer.
+ self.write_buffer_length(0)
+ self.deferred.callback(None)
+
+ def write_buffer(self, chunk):
+ buffer_length = len(chunk)
+ self.write_buffer_length(buffer_length)
+ self.consumer.write(chunk)
+
+ def write_buffer_length(self, n):
+ self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n))
+
+class AvroProtocol(Protocol):
+
+ recvd = ''
+ done = False
+
+ def __init__(self, finished):
+ self.finished = finished
+ self.message = []
+
+ def dataReceived(self, data):
+ self.recvd = self.recvd + data
+ while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH:
+ buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack(
+ self.recvd[:ipc.BUFFER_HEADER_LENGTH])
+ if buffer_length == 0:
+ response = ''.join(self.message)
+ self.done = True
+ self.finished.callback(response)
+ break
+ if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH:
+ break
+ buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH]
+ self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:]
+ self.message.append(buffer)
+
+ def connectionLost(self, reason):
+ if not self.done:
+ self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes."))
+
+class TwistedHTTPTransceiver(object):
+ """This transceiver uses the Agent class present in Twisted.web >= 9.0
+ for issuing requests to the remote endpoint."""
+ def __init__(self, host, port, remote_name=None, reactor=None):
+ self.url = "http://%s:%d/" % (host, port)
+
+ if remote_name is None:
+ # There's no easy way to get this peer's remote address
+ # in Twisted so I use a random UUID to identify ourselves
+ import uuid
+ self.remote_name = uuid.uuid4()
+
+ if reactor is None:
+ from twisted.internet import reactor
+ self.agent = Agent(reactor)
+
+ def read_framed_message(self, response):
+ finished = Deferred()
+ response.deliverBody(AvroProtocol(finished))
+ return finished
+
+ def transceive(self, request):
+ req_method = 'POST'
+ req_headers = {
+ 'Content-Type': ['avro/binary'],
+ 'Accept-Encoding': ['identity'],
+ }
+
+ body_producer = RequestStreamingProducer(request)
+ d = self.agent.request(
+ req_method,
+ self.url,
+ headers=Headers(req_headers),
+ bodyProducer=body_producer)
+ return d.addCallback(self.read_framed_message)
+
+class AvroResponderResource(resource.Resource):
+ """This Twisted.web resource can be placed anywhere in a URL hierarchy
+ to provide an Avro endpoint. Different Avro protocols can be served
+ by the same web server as long as they are in different resources in
+ a URL hierarchy."""
+ isLeaf = True
+
+ def __init__(self, responder):
+ resource.Resource.__init__(self)
+ self.responder = responder
+
+ def cb_render_POST(self, resp_body, request):
+ request.setResponseCode(200)
+ request.setHeader('Content-Type', 'avro/binary')
+ resp_writer = ipc.FramedWriter(request)
+ resp_writer.write_framed_message(resp_body)
+ request.finish()
+
+ def render_POST(self, request):
+ # Unfortunately, Twisted.web doesn't support incoming
+ # streamed input yet, the whole payload must be kept in-memory
+ request.content.seek(0, 0)
+ call_request_reader = ipc.FramedReader(request.content)
+ call_request = call_request_reader.read_framed_message()
+ d = maybeDeferred(self.responder.respond, call_request)
+ d.addCallback(self.cb_render_POST, request)
+ return server.NOT_DONE_YET
Added: avro/trunk/lang/py/test/txsample_http_client.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_client.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/test/txsample_http_client.py (added)
+++ avro/trunk/lang/py/test/txsample_http_client.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+from twisted.internet import reactor, defer
+from twisted.python.util import println
+
+from avro import protocol
+from avro import txipc
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
+class UsageError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+def make_requestor(server_host, server_port, protocol):
+ client = txipc.TwistedHTTPTransceiver(SERVER_HOST, SERVER_PORT)
+ return txipc.TwistedRequestor(protocol, client)
+
+if __name__ == '__main__':
+ if len(sys.argv) not in [4, 5]:
+ raise UsageError("Usage: <to> <from> <body> [<count>]")
+
+ # client code - attach to the server and send a message
+ # fill in the Message record
+ message = dict()
+ message['to'] = sys.argv[1]
+ message['from'] = sys.argv[2]
+ message['body'] = sys.argv[3]
+
+ try:
+ num_messages = int(sys.argv[4])
+ except:
+ num_messages = 1
+
+ # build the parameters for the request
+ params = {}
+ params['message'] = message
+
+ requests = []
+ # send the requests and print the result
+ for msg_count in range(num_messages):
+ requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+ d = requestor.request('send', params)
+ d.addCallback(lambda result: println("Result: " + result))
+ requests.append(d)
+ results = defer.gatherResults(requests)
+
+ def replay_cb(result):
+ print("Replay Result: " + result)
+ reactor.stop()
+
+ def replay(_):
+ # try out a replay message
+ requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+ d = requestor.request('replay', dict())
+ d.addCallback(replay_cb)
+
+ results.addCallback(replay)
+ reactor.run()
Added: avro/trunk/lang/py/test/txsample_http_server.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_server.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/test/txsample_http_server.py (added)
+++ avro/trunk/lang/py/test/txsample_http_server.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.web import server
+from twisted.internet import reactor
+
+from avro import ipc
+from avro import protocol
+from avro import txipc
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_ADDRESS = ('localhost', 9090)
+
+class MailResponder(ipc.Responder):
+ def __init__(self):
+ ipc.Responder.__init__(self, MAIL_PROTOCOL)
+
+ def invoke(self, message, request):
+ if message.name == 'send':
+ request_content = request['message']
+ response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
+ request_content
+ return response
+ elif message.name == 'replay':
+ return 'replay'
+
+if __name__ == '__main__':
+ root = server.Site(txipc.AvroResponderResource(MailResponder()))
+ reactor.listenTCP(9090, root)
+ reactor.run()