You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ha...@apache.org on 2010/02/15 23:13:09 UTC
svn commit: r910347 - in /hadoop/avro/trunk: CHANGES.txt
lang/py/src/avro/ipc.py lang/py/src/avro/tool.py
share/test/interop/bin/test_rpc_interop.sh
Author: hammer
Date: Mon Feb 15 22:13:08 2010
New Revision: 910347
URL: http://svn.apache.org/viewvc?rev=910347&view=rev
Log:
AVRO-287. Make RPC interop tests work with new Python implementation (hammer)
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/py/src/avro/ipc.py
hadoop/avro/trunk/lang/py/src/avro/tool.py
hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon Feb 15 22:13:08 2010
@@ -90,6 +90,9 @@
AVRO-322. Add a working client and server to Python implementation
using HTTP as a transport (hammer)
+ AVRO-287. Make RPC interop tests work with new Python implementation
+ (hammer)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/ipc.py?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/ipc.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/ipc.py Mon Feb 15 22:13:08 2010
@@ -16,6 +16,7 @@
"""
Support for inter-process calls.
"""
+import httplib
import cStringIO
import struct
from avro import io
@@ -112,10 +113,12 @@
REMOTE_PROTOCOLS[self.transceiver.remote_name] = self.remote_protocol
remote_protocol = property(lambda self: self._remote_protocol,
set_remote_protocol)
+
def set_remote_hash(self, new_remote_hash):
self._remote_hash = new_remote_hash
REMOTE_HASHES[self.transceiver.remote_name] = self.remote_hash
remote_hash = property(lambda self: self._remote_hash, set_remote_hash)
+
def set_send_protocol(self, new_send_protocol):
self._send_protocol = new_send_protocol
send_protocol = property(lambda self: self._send_protocol, set_send_protocol)
@@ -140,7 +143,7 @@
if call_response_exists:
return self.read_call_response(message_name, buffer_decoder)
else:
- self.request(message_name, request_datum)
+ return self.request(message_name, request_datum)
def write_handshake_request(self, encoder):
local_hash = self.local_protocol.md5
@@ -190,14 +193,16 @@
elif match == 'CLIENT':
if self.send_protocol:
raise schema.AvroException('Handshake failure.')
- self.remote_protocol = handshake_response.get('serverProtocol')
+ self.remote_protocol = protocol.parse(
+ handshake_response.get('serverProtocol'))
self.remote_hash = handshake_response.get('serverHash')
self.send_protocol = False
return False
elif match == 'NONE':
if self.send_protocol:
raise schema.AvroException('Handshake failure.')
- self.remote_protocol = handshake_response.get('serverProtocol')
+ self.remote_protocol = protocol.parse(
+ handshake_response.get('serverProtocol'))
self.remote_hash = handshake_response.get('serverHash')
self.send_protocol = True
return False
@@ -239,7 +244,8 @@
def read_response(self, writers_schema, readers_schema, decoder):
datum_reader = io.DatumReader(writers_schema, readers_schema)
- return datum_reader.read(decoder)
+ result = datum_reader.read(decoder)
+ return result
def read_error(self, writers_schema, readers_schema, decoder):
datum_reader = io.DatumReader(writers_schema, readers_schema)
@@ -448,16 +454,24 @@
Useful for clients but not for servers
"""
def __init__(self, conn):
- self._conn = conn
+ self.conn = conn
# read-only properties
- conn = property(lambda self: self._conn)
sock = property(lambda self: self.conn.sock)
remote_name = property(lambda self: self.sock.getsockname())
+ # read/write properties
+ def set_conn(self, new_conn):
+ self._conn = new_conn
+ conn = property(lambda self: self._conn, set_conn)
+
def transceive(self, request):
+ self.conn.close()
+ self.conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+ conn_success = self.conn.connect()
self.write_framed_message(request)
- return self.read_framed_message()
+ result = self.read_framed_message()
+ return result
def read_framed_message(self):
response_reader = FramedReader(self.conn.getresponse())
Modified: hadoop/avro/trunk/lang/py/src/avro/tool.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/tool.py?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/tool.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/tool.py Mon Feb 15 22:13:08 2010
@@ -1,3 +1,4 @@
+#! /usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,17 +15,98 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
-Command-line tool for manipulating Avro data files.
+Command-line tool
NOTE: The API for the command-line tool is experimental.
"""
-
import sys
-from avro import datafile, io
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+import httplib
+import urlparse
+from avro import io
+from avro import datafile
+from avro import protocol
+from avro import ipc
+
+class GenericResponder(ipc.Responder):
+ def __init__(self, proto, msg, datum):
+ proto_json = file(proto, 'r').read()
+ ipc.Responder.__init__(self, protocol.parse(proto_json))
+ self.msg = msg
+ self.datum = datum
+
+ def invoke(self, message, request):
+ if message.name == self.msg:
+ print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+ # server will shut down after processing a single Avro request
+ global server_should_shutdown
+ server_should_shutdown = True
+ return self.datum
+
+class GenericHandler(BaseHTTPRequestHandler):
+ def do_POST(self):
+ self.responder = responder
+ call_request_reader = ipc.FramedReader(self.rfile)
+ call_request = call_request_reader.read_framed_message()
+ resp_body = self.responder.respond(call_request)
+ self.send_response(200)
+ self.send_header('Content-Type', 'avro/binary')
+ self.end_headers()
+ resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer.write_framed_message(resp_body)
+ if server_should_shutdown:
+ print >> sys.stderr, "Shutting down server."
+ self.server.force_stop()
+
+class StoppableHTTPServer(HTTPServer):
+ """HTTPServer.shutdown added in Python 2.6. FML."""
+ stopped = False
+ allow_reuse_address = True
+ def __init__(self, *args, **kw):
+ HTTPServer.__init__(self, *args, **kw)
+ self.allow_reuse_address = True
+
+ def serve_forever(self):
+ while not self.stopped:
+ self.handle_request()
+
+ def force_stop(self):
+ self.server_close()
+ self.stopped = True
+ self.serve_forever()
+
+def run_server(uri, proto, msg, datum):
+ url_obj = urlparse.urlparse(uri)
+ server_addr = (url_obj.hostname, url_obj.port)
+ global responder
+ global server_should_shutdown
+ server_should_shutdown = False
+ responder = GenericResponder(proto, msg, datum)
+ server = StoppableHTTPServer(server_addr, GenericHandler)
+ print "Port: %s" % server.server_port
+ sys.stdout.flush()
+ server.allow_reuse_address = True
+ print >> sys.stderr, "Starting server."
+ server.serve_forever()
+
+def send_message(uri, proto, msg, datum):
+ url_obj = urlparse.urlparse(uri)
+ conn = httplib.HTTPConnection(url_obj.hostname, url_obj.port)
+ conn.connect()
+ client = ipc.HTTPTransceiver(conn)
+ proto_json = file(proto, 'r').read()
+ requestor = ipc.Requestor(protocol.parse(proto_json), client)
+ print requestor.request(msg, datum)
+
+def file_or_stdin(f):
+ if f == "-":
+ return sys.stdin
+ else:
+ return file(f)
def main(args=sys.argv):
if len(args) == 1:
- print "Usage: %s (dump)" % args[0]
+ print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0]
return 1
if args[1] == "dump":
@@ -33,13 +115,49 @@
return 1
for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
print repr(d)
+ elif args[1] == "rpcreceive":
+ usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
+ usage_str += "message_name (-data d | -file f)"
+ if len(args) not in [5, 7]:
+ print usage_str
+ return 1
+ uri, proto, msg = args[2:5]
+ datum = None
+ if len(args) > 5:
+ if args[5] == "-file":
+ reader = open(args[6], 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ datum = dfr.next()
+ elif args[5] == "-data":
+ print "JSON Decoder not yet implemented."
+ return 1
+ else:
+ print usage_str
+ return 1
+ run_server(uri, proto, msg, datum)
+ elif args[1] == "rpcsend":
+ usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
+ usage_str += "message_name (-data d | -file f)"
+ if len(args) not in [5, 7]:
+ print usage_str
+ return 1
+ uri, proto, msg = args[2:5]
+ datum = None
+ if len(args) > 5:
+ if args[5] == "-file":
+ reader = open(args[6], 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ datum = dfr.next()
+ elif args[5] == "-data":
+ print "JSON Decoder not yet implemented."
+ return 1
+ else:
+ print usage_str
+ return 1
+ send_message(uri, proto, msg, datum)
return 0
-def file_or_stdin(f):
- if f == "-":
- return sys.stdin
- else:
- return file(f)
-
if __name__ == "__main__":
sys.exit(main(sys.argv))
Modified: hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh (original)
+++ hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh Mon Feb 15 22:13:08 2010
@@ -26,8 +26,13 @@
java_client="java -jar lang/java/build/avro-tools-$VERSION.jar rpcsend"
java_server="java -jar lang/java/build/avro-tools-$VERSION.jar rpcreceive"
-clients=("$java_client")
-servers=("$java_server")
+py_client="python lang/py/src/avro/tool.py rpcsend"
+py_server="python lang/py/src/avro/tool.py rpcreceive"
+
+export PYTHONPATH=$PYTHONPATH:lang/py/src # path to avro Python module
+
+clients=("$java_client" "$py_client")
+servers=("$java_server" "$py_server")
proto=share/test/schemas/simple.avpr