You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/26 23:15:03 UTC

svn commit: r989927 - in /avro/trunk: CHANGES.txt lang/py/src/avro/ipc.py lang/py/src/avro/txipc.py lang/py/test/txsample_http_client.py lang/py/test/txsample_http_server.py

Author: cutting
Date: Thu Aug 26 21:15:02 2010
New Revision: 989927

URL: http://svn.apache.org/viewvc?rev=989927&view=rev
Log:
AVRO-528. Python: Add support for Twisted.  Contributed by Esteve Fernandez.

Added:
    avro/trunk/lang/py/src/avro/txipc.py
    avro/trunk/lang/py/test/txsample_http_client.py
    avro/trunk/lang/py/test/txsample_http_server.py
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/py/src/avro/ipc.py

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=989927&r1=989926&r2=989927&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug 26 21:15:02 2010
@@ -60,6 +60,8 @@ Avro 1.4.0 (unreleased)
 
     AVRO-611. IDL: Add support for one-way messages. (cutting)
 
+    AVRO-528. Python: Add support for Twisted.  (Esteve Fernandez via cutting)
+
   IMPROVEMENTS
 
     AVRO-629. Prefer the JSON module of python's stdlib over simplejson.

Modified: avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ipc.py?rev=989927&r1=989926&r2=989927&view=diff
==============================================================================
--- avro/trunk/lang/py/src/avro/ipc.py (original)
+++ avro/trunk/lang/py/src/avro/ipc.py Thu Aug 26 21:15:02 2010
@@ -75,7 +75,7 @@ class ConnectionClosedException(schema.A
 # Base IPC Classes (Requestor/Responder)
 #
 
-class Requestor(object):
+class BaseRequestor(object):
   """Base class for the client side of a protocol interaction."""
   def __init__(self, local_protocol, transceiver):
     self._local_protocol = local_protocol
@@ -116,15 +116,7 @@ class Requestor(object):
 
     # send the handshake and call request; block until call response
     call_request = buffer_writer.getvalue()
-    call_response = self.transceiver.transceive(call_request)
-
-    # process the handshake and call response
-    buffer_decoder = io.BinaryDecoder(StringIO(call_response))
-    call_response_exists = self.read_handshake_response(buffer_decoder)
-    if call_response_exists:
-      return self.read_call_response(message_name, buffer_decoder)
-    else:
-      return self.request(message_name, request_datum)
+    return self.issue_request(call_request, message_name, request_datum)
 
   def write_handshake_request(self, encoder):
     local_hash = self.local_protocol.md5
@@ -232,6 +224,19 @@ class Requestor(object):
     datum_reader = io.DatumReader(writers_schema, readers_schema)
     return AvroRemoteException(datum_reader.read(decoder))
 
+class Requestor(BaseRequestor):
+  """Requestor that waits for the transceiver's response before returning."""
+
+  def issue_request(self, call_request, message_name, request_datum):
+    """Transceive call_request and decode the reply.
+
+    Returns the value of read_call_response() when the handshake response
+    indicates that a call response is present; otherwise the request is
+    issued again through request().
+    """
+    raw_response = self.transceiver.transceive(call_request)
+    decoder = io.BinaryDecoder(StringIO(raw_response))
+
+    # the handshake response has to be read before the call response
+    if not self.read_handshake_response(decoder):
+      return self.request(message_name, request_datum)
+    return self.read_call_response(message_name, decoder)
+
 class Responder(object):
   """Base class for the server side of a protocol interaction."""
   def __init__(self, local_protocol):

Added: avro/trunk/lang/py/src/avro/txipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/txipc.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/txipc.py (added)
+++ avro/trunk/lang/py/src/avro/txipc.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+try:
+  from cStringIO import StringIO
+except ImportError:
+  from StringIO import StringIO
+from avro import ipc
+from avro import io
+
+from zope.interface import implements
+
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+from twisted.internet.defer import maybeDeferred, Deferred
+from twisted.web.iweb import IBodyProducer
+from twisted.web import resource, server
+from twisted.internet.protocol import Protocol
+
+class TwistedRequestor(ipc.BaseRequestor):
+  """Requestor for use with Twisted: issue_request() hands back the Deferred
+     obtained from the transceiver instead of blocking on the response."""
+
+  def _process_handshake(self, call_response, message_name, request_datum):
+    """Callback for the raw response: read the handshake, then the call."""
+    decoder = io.BinaryDecoder(StringIO(call_response))
+    if not self.read_handshake_response(decoder):
+      # no call response accompanied the handshake: issue the request again
+      return self.request(message_name, request_datum)
+    return self.read_call_response(message_name, decoder)
+
+  def issue_request(self, call_request, message_name, request_datum):
+    """Send call_request; the returned Deferred yields the decoded response."""
+    response_deferred = self.transceiver.transceive(call_request)
+    response_deferred.addCallback(
+      self._process_handshake, message_name, request_datum)
+    return response_deferred
+
+class RequestStreamingProducer(object):
+  """A streaming producer for issuing requests with the Twisted.web Agent.
+
+  The message is written to the consumer as a framed message: a sequence of
+  buffers of at most ipc.BUFFER_SIZE bytes, each preceded by its length
+  packed with ipc.BIG_ENDIAN_INT_STRUCT, followed by a zero-length buffer.
+  `length` is the size of that framed form, not of the raw message.
+  """
+  implements(IBodyProducer)
+
+  paused = False
+  stopped = False
+  started = False
+
+  def __init__(self, message):
+    """Store message and precompute the length of its framed form."""
+    self._message = message
+    payload_length = len(message)
+    # Number of data buffers needed for the payload, rounded up; an empty
+    # message needs none.
+    buffer_count = (payload_length + ipc.BUFFER_SIZE - 1) // ipc.BUFFER_SIZE
+    # We need a buffer length header for every buffer and an additional
+    # zero-length buffer as the message terminator
+    self._length = payload_length + \
+      (buffer_count + 1) * ipc.BUFFER_HEADER_LENGTH
+    self._total_bytes_sent = 0
+    self._deferred = Deferred()
+
+  # read-only properties
+  message = property(lambda self: self._message)
+  length = property(lambda self: self._length)
+  consumer = property(lambda self: self._consumer)
+  deferred = property(lambda self: self._deferred)
+
+  def _get_total_bytes_sent(self):
+    return self._total_bytes_sent
+
+  def _set_total_bytes_sent(self, bytes_sent):
+    self._total_bytes_sent = bytes_sent
+
+  # Number of message bytes (headers excluded) written to the consumer so
+  # far; it doubles as the offset of the next buffer within the message.
+  total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent)
+
+  def startProducing(self, consumer):
+    """Begin writing the framed message to consumer.
+
+    Returns the Deferred that write() fires once the entire message has
+    been written, or None when producing had already been started.
+    """
+    if self.started:
+      return
+
+    self.started = True
+    self._consumer = consumer
+    self._produce()
+    # self.write will fire this deferred once it has written
+    # the entire message to the consumer
+    return self.deferred
+
+  def resumeProducing(self):
+    """Clear the paused flag and continue from where writing left off."""
+    self.paused = False
+    # Without a consumer there is nothing to write to yet.
+    if self.started:
+      self._produce()
+
+  def pauseProducing(self):
+    """Make the writing loop stop after the buffer currently being written."""
+    self.paused = True
+
+  def stopProducing(self):
+    """Mark the producer as stopped; no further buffers are written."""
+    self.stopped = True
+
+  def _produce(self):
+    """Write buffers until finished, paused or stopped."""
+    # write() sets `stopped` itself after the terminating buffer, so this
+    # loop also ends once the whole message has been written.
+    while not self.paused and not self.stopped:
+      self.write()
+
+  def write(self):
+    """Write the next buffer and, after the last one, end the message."""
+    message_length = len(self.message)
+    remaining = message_length - self.total_bytes_sent
+    if remaining > 0:
+      buffer_length = min(remaining, ipc.BUFFER_SIZE)
+      start = self.total_bytes_sent
+      self.write_buffer(self.message[start:start + buffer_length])
+      self.total_bytes_sent += buffer_length
+    # Make sure we wrote the entire message
+    if self.total_bytes_sent == message_length and not self.stopped:
+      self.stopProducing()
+      # A message is always terminated by a zero-length buffer.
+      self.write_buffer_length(0)
+      self.deferred.callback(None)
+
+  def write_buffer(self, chunk):
+    """Write chunk to the consumer, preceded by its length header."""
+    self.write_buffer_length(len(chunk))
+    self.consumer.write(chunk)
+
+  def write_buffer_length(self, n):
+    """Write n to the consumer, packed with ipc.BIG_ENDIAN_INT_STRUCT."""
+    self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n))
+
+class AvroProtocol(Protocol):
+  """Protocol that reassembles a framed message from the data it receives.
+
+  Every buffer is preceded by a header of ipc.BUFFER_HEADER_LENGTH bytes
+  holding its length; a zero-length buffer ends the message. The joined
+  buffers are passed to the `finished` Deferred given to the constructor.
+  """
+
+  # bytes received so far that have not yet been split into buffers
+  recvd = ''
+  # set to True once the zero-length terminating buffer has been seen
+  done = False
+
+  def __init__(self, finished):
+    """finished is fired with the message, or failed on an early close."""
+    self.finished = finished
+    # contents of every complete buffer received so far, in arrival order
+    self.message = []
+
+  def dataReceived(self, data):
+    """Append data to the pending bytes and extract every complete buffer."""
+    self.recvd = self.recvd + data
+    # a header must be fully available before its buffer can be examined
+    while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH:
+      buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack(
+        self.recvd[:ipc.BUFFER_HEADER_LENGTH])
+      if buffer_length == 0:
+        # zero-length buffer: the message is complete
+        # NOTE(review): the terminating header stays in self.recvd, so a
+        # later dataReceived() call would reach callback() a second time --
+        # confirm that no data can arrive after the terminator.
+        response = ''.join(self.message)
+        self.done = True
+        self.finished.callback(response)
+        break
+      if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH:
+        # the buffer has not fully arrived yet; wait for more data
+        break
+      buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH]
+      # drop the header and buffer just consumed from the pending bytes
+      self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:]
+      self.message.append(buffer)
+
+  def connectionLost(self, reason):
+    """Fail `finished` if the connection went away before the terminator."""
+    if not self.done:
+      self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes."))
+
+class TwistedHTTPTransceiver(object):
+  """This transceiver uses the Agent class present in Twisted.web >= 9.0
+     for issuing requests to the remote endpoint."""
+  def __init__(self, host, port, remote_name=None, reactor=None):
+    """Prepare an Agent for POSTing to http://<host>:<port>/.
+
+    remote_name is stored as given; when omitted a random UUID is used.
+    reactor defaults to the global Twisted reactor.
+    """
+    self.url = "http://%s:%d/" % (host, port)
+
+    if remote_name is None:
+      # There's no easy way to get this peer's remote address
+      # in Twisted so I use a random UUID to identify ourselves
+      import uuid
+      remote_name = uuid.uuid4()
+    # Set the attribute in both cases so a caller-supplied name is kept.
+    self.remote_name = remote_name
+
+    if reactor is None:
+      from twisted.internet import reactor
+    self.agent = Agent(reactor)
+
+  def read_framed_message(self, response):
+    """Return a Deferred that AvroProtocol fires with the response body."""
+    finished = Deferred()
+    response.deliverBody(AvroProtocol(finished))
+    return finished
+
+  def transceive(self, request):
+    """POST request as a framed message; return a Deferred for the reply."""
+    req_method = 'POST'
+    req_headers = {
+      'Content-Type': ['avro/binary'],
+      'Accept-Encoding': ['identity'],
+    }
+
+    body_producer = RequestStreamingProducer(request)
+    d = self.agent.request(
+      req_method,
+      self.url,
+      headers=Headers(req_headers),
+      bodyProducer=body_producer)
+    return d.addCallback(self.read_framed_message)
+
+class AvroResponderResource(resource.Resource):
+  """Twisted.web resource exposing an Avro responder over HTTP POST.
+     It can sit anywhere in a URL hierarchy, so one web server may serve
+     several Avro protocols as long as each lives in its own resource."""
+  isLeaf = True
+
+  def __init__(self, responder):
+    resource.Resource.__init__(self)
+    self.responder = responder
+
+  def cb_render_POST(self, resp_body, request):
+    """Write resp_body to request as a framed message and finish it."""
+    request.setResponseCode(200)
+    request.setHeader('Content-Type', 'avro/binary')
+    ipc.FramedWriter(request).write_framed_message(resp_body)
+    request.finish()
+
+  def render_POST(self, request):
+    """Pass the framed request body to the responder; reply when it is done."""
+    # Twisted.web has no support for streamed incoming bodies yet, so the
+    # complete payload is held in memory
+    payload = request.content
+    payload.seek(0, 0)
+    call_request = ipc.FramedReader(payload).read_framed_message()
+    pending = maybeDeferred(self.responder.respond, call_request)
+    pending.addCallback(self.cb_render_POST, request)
+    return server.NOT_DONE_YET

Added: avro/trunk/lang/py/test/txsample_http_client.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_client.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/test/txsample_http_client.py (added)
+++ avro/trunk/lang/py/test/txsample_http_client.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+from twisted.internet import reactor, defer
+from twisted.python.util import println
+
+from avro import protocol
+from avro import txipc
+
+# JSON definition of the Mail protocol used by this sample: one record type
+# (Message) and two messages ("send" and "replay").
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+# parsed form of the protocol, handed to the requestors created below
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+# host and port of the server this sample client connects to
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
+class UsageError(Exception):
+  """Raised when the script is started with the wrong number of arguments."""
+
+  def __init__(self, value):
+    self.value = value
+
+  def __str__(self):
+    # the message is shown in its repr() form
+    return '%r' % (self.value,)
+
+def make_requestor(server_host, server_port, protocol):
+  """Return a TwistedRequestor for protocol talking to the given server."""
+  # use the arguments rather than the module-level defaults so callers can
+  # point the requestor at any host and port
+  client = txipc.TwistedHTTPTransceiver(server_host, server_port)
+  return txipc.TwistedRequestor(protocol, client)
+
+if __name__ == '__main__':
+  if len(sys.argv) not in [4, 5]:
+    raise UsageError("Usage: <to> <from> <body> [<count>]")
+
+  # client code - attach to the server and send a message
+  # fill in the Message record
+  message = dict()
+  message['to'] = sys.argv[1]
+  message['from'] = sys.argv[2]
+  message['body'] = sys.argv[3]
+
+  # the optional <count> argument says how many times to send the message;
+  # send it once when the argument is missing or is not an integer
+  try:
+    num_messages = int(sys.argv[4])
+  except (IndexError, ValueError):
+    num_messages = 1
+
+  # build the parameters for the request
+  params = {}
+  params['message'] = message
+
+  requests = []
+  # send the requests and print the result
+  for msg_count in range(num_messages):
+    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+    d = requestor.request('send', params)
+    d.addCallback(lambda result: println("Result: " + result))
+    requests.append(d)
+  # fires once every 'send' request has completed
+  results = defer.gatherResults(requests)
+
+  def replay_cb(result):
+    # last step of the sample: print the reply and shut the reactor down
+    print("Replay Result: " + result)
+    reactor.stop()
+
+  def replay(_):
+    # try out a replay message
+    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+    d = requestor.request('replay', dict())
+    d.addCallback(replay_cb)
+
+  results.addCallback(replay)
+  reactor.run()

Added: avro/trunk/lang/py/test/txsample_http_server.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_server.py?rev=989927&view=auto
==============================================================================
--- avro/trunk/lang/py/test/txsample_http_server.py (added)
+++ avro/trunk/lang/py/test/txsample_http_server.py Thu Aug 26 21:15:02 2010
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.web import server
+from twisted.internet import reactor
+
+from avro import ipc
+from avro import protocol
+from avro import txipc
+
+# JSON definition of the Mail protocol served by this sample: one record
+# type (Message) and two messages ("send" and "replay").
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+# parsed form of the protocol, handed to the responder below
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+# presumably the (host, port) this sample server is reached at -- verify
+SERVER_ADDRESS = ('localhost', 9090)
+
+class MailResponder(ipc.Responder):
+  """Responder answering the 'send' and 'replay' messages of MAIL_PROTOCOL."""
+
+  def __init__(self):
+    ipc.Responder.__init__(self, MAIL_PROTOCOL)
+
+  def invoke(self, message, request):
+    """Return the reply for message; None for any other message name."""
+    if message.name == 'send':
+      # the reply is built from the fields of the submitted Message record
+      return "Sent message to %(to)s from %(from)s with body %(body)s" % \
+             request['message']
+    if message.name == 'replay':
+      return 'replay'
+
+if __name__ == '__main__':
+  # serve the Mail responder as the only resource of the site
+  site = server.Site(txipc.AvroResponderResource(MailResponder()))
+  # take the port from SERVER_ADDRESS instead of repeating the literal; the
+  # interface is left at listenTCP's default, as before
+  reactor.listenTCP(SERVER_ADDRESS[1], site)
+  reactor.run()