You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ha...@apache.org on 2010/02/15 23:13:09 UTC

svn commit: r910347 - in /hadoop/avro/trunk: CHANGES.txt lang/py/src/avro/ipc.py lang/py/src/avro/tool.py share/test/interop/bin/test_rpc_interop.sh

Author: hammer
Date: Mon Feb 15 22:13:08 2010
New Revision: 910347

URL: http://svn.apache.org/viewvc?rev=910347&view=rev
Log:
AVRO-287. Make RPC interop tests work with new Python implementation (hammer)


Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/py/src/avro/ipc.py
    hadoop/avro/trunk/lang/py/src/avro/tool.py
    hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon Feb 15 22:13:08 2010
@@ -90,6 +90,9 @@
     AVRO-322. Add a working client and server to Python implementation 
     using HTTP as a transport (hammer)
 
+    AVRO-287. Make RPC interop tests work with new Python implementation
+    (hammer)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/ipc.py?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/ipc.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/ipc.py Mon Feb 15 22:13:08 2010
@@ -16,6 +16,7 @@
 """
 Support for inter-process calls.
 """
+import httplib
 import cStringIO
 import struct
 from avro import io
@@ -112,10 +113,12 @@
     REMOTE_PROTOCOLS[self.transceiver.remote_name] = self.remote_protocol
   remote_protocol = property(lambda self: self._remote_protocol,
                              set_remote_protocol)
+
   def set_remote_hash(self, new_remote_hash):
     self._remote_hash = new_remote_hash
     REMOTE_HASHES[self.transceiver.remote_name] = self.remote_hash
   remote_hash = property(lambda self: self._remote_hash, set_remote_hash)
+
   def set_send_protocol(self, new_send_protocol):
     self._send_protocol = new_send_protocol
   send_protocol = property(lambda self: self._send_protocol, set_send_protocol)
@@ -140,7 +143,7 @@
     if call_response_exists:
       return self.read_call_response(message_name, buffer_decoder)
     else:
-      self.request(message_name, request_datum)
+      return self.request(message_name, request_datum)
 
   def write_handshake_request(self, encoder):
     local_hash = self.local_protocol.md5
@@ -190,14 +193,16 @@
     elif match == 'CLIENT':
       if self.send_protocol:
         raise schema.AvroException('Handshake failure.')
-      self.remote_protocol = handshake_response.get('serverProtocol')
+      self.remote_protocol = protocol.parse(
+                             handshake_response.get('serverProtocol'))
       self.remote_hash = handshake_response.get('serverHash')
       self.send_protocol = False
       return False
     elif match == 'NONE':
       if self.send_protocol:
         raise schema.AvroException('Handshake failure.')
-      self.remote_protocol = handshake_response.get('serverProtocol')
+      self.remote_protocol = protocol.parse(
+                             handshake_response.get('serverProtocol'))
       self.remote_hash = handshake_response.get('serverHash')
       self.send_protocol = True
       return False
@@ -239,7 +244,8 @@
 
   def read_response(self, writers_schema, readers_schema, decoder):
     datum_reader = io.DatumReader(writers_schema, readers_schema)
-    return datum_reader.read(decoder)
+    result = datum_reader.read(decoder)
+    return result
 
   def read_error(self, writers_schema, readers_schema, decoder):
     datum_reader = io.DatumReader(writers_schema, readers_schema)
@@ -448,16 +454,24 @@
   Useful for clients but not for servers
   """
   def __init__(self, conn):
-    self._conn = conn
+    self.conn = conn
 
   # read-only properties
-  conn = property(lambda self: self._conn)
   sock = property(lambda self: self.conn.sock)
   remote_name = property(lambda self: self.sock.getsockname())
 
+  # read/write properties
+  def set_conn(self, new_conn):
+    self._conn = new_conn
+  conn = property(lambda self: self._conn, set_conn)
+
   def transceive(self, request):
+    self.conn.close()
+    self.conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+    conn_success = self.conn.connect()
     self.write_framed_message(request)
-    return self.read_framed_message()
+    result = self.read_framed_message()
+    return result
 
   def read_framed_message(self):
     response_reader = FramedReader(self.conn.getresponse())

Modified: hadoop/avro/trunk/lang/py/src/avro/tool.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/tool.py?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/tool.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/tool.py Mon Feb 15 22:13:08 2010
@@ -1,3 +1,4 @@
+#! /usr/bin/env python
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,17 +15,98 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """
-Command-line tool for manipulating Avro data files.
+Command-line tool
 
 NOTE: The API for the command-line tool is experimental.
 """
-
 import sys
-from avro import datafile, io
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+import httplib
+import urlparse
+from avro import io
+from avro import datafile
+from avro import protocol
+from avro import ipc
+
+class GenericResponder(ipc.Responder):
+  def __init__(self, proto, msg, datum):
+    proto_json = file(proto, 'r').read()
+    ipc.Responder.__init__(self, protocol.parse(proto_json))
+    self.msg = msg
+    self.datum = datum
+
+  def invoke(self, message, request):
+    if message.name == self.msg:
+      print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+      # server will shut down after processing a single Avro request
+      global server_should_shutdown
+      server_should_shutdown = True
+      return self.datum
+
+class GenericHandler(BaseHTTPRequestHandler):
+  def do_POST(self):
+    self.responder = responder
+    call_request_reader = ipc.FramedReader(self.rfile)
+    call_request = call_request_reader.read_framed_message()
+    resp_body = self.responder.respond(call_request)
+    self.send_response(200)
+    self.send_header('Content-Type', 'avro/binary')
+    self.end_headers()
+    resp_writer = ipc.FramedWriter(self.wfile)
+    resp_writer.write_framed_message(resp_body)
+    if server_should_shutdown:
+      print >> sys.stderr, "Shutting down server."
+      self.server.force_stop()
+
+class StoppableHTTPServer(HTTPServer):
+  """HTTPServer.shutdown added in Python 2.6. FML."""
+  stopped = False
+  allow_reuse_address = True
+  def __init__(self, *args, **kw):
+    HTTPServer.__init__(self, *args, **kw)
+    self.allow_reuse_address = True
+
+  def serve_forever(self):
+    while not self.stopped:
+      self.handle_request()
+
+  def force_stop(self):
+    self.server_close()
+    self.stopped = True
+    self.serve_forever()
+
+def run_server(uri, proto, msg, datum):
+  url_obj = urlparse.urlparse(uri)
+  server_addr = (url_obj.hostname, url_obj.port)
+  global responder
+  global server_should_shutdown
+  server_should_shutdown = False
+  responder = GenericResponder(proto, msg, datum)
+  server = StoppableHTTPServer(server_addr, GenericHandler)
+  print "Port: %s" % server.server_port
+  sys.stdout.flush()
+  server.allow_reuse_address = True
+  print >> sys.stderr, "Starting server."
+  server.serve_forever()
+
+def send_message(uri, proto, msg, datum):
+  url_obj = urlparse.urlparse(uri)
+  conn = httplib.HTTPConnection(url_obj.hostname, url_obj.port)
+  conn.connect()
+  client = ipc.HTTPTransceiver(conn)
+  proto_json = file(proto, 'r').read()
+  requestor = ipc.Requestor(protocol.parse(proto_json), client)
+  print requestor.request(msg, datum)
+
+def file_or_stdin(f):
+  if f == "-":
+    return sys.stdin
+  else:
+    return file(f)
 
 def main(args=sys.argv):
   if len(args) == 1:
-    print "Usage: %s (dump)" % args[0]
+    print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0]
     return 1
 
   if args[1] == "dump":
@@ -33,13 +115,49 @@
       return 1
     for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
       print repr(d)
+  elif args[1] == "rpcreceive":
+    usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
+    usage_str += "message_name (-data d | -file f)"
+    if len(args) not in [5, 7]:
+      print usage_str
+      return 1
+    uri, proto, msg = args[2:5]
+    datum = None
+    if len(args) > 5:
+      if args[5] == "-file":
+        reader = open(args[6], 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        datum = dfr.next()
+      elif args[5] == "-data":
+        print "JSON Decoder not yet implemented."
+        return 1
+      else:
+        print usage_str
+        return 1
+    run_server(uri, proto, msg, datum)
+  elif args[1] == "rpcsend":
+    usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
+    usage_str += "message_name (-data d | -file f)"
+    if len(args) not in [5, 7]:
+      print usage_str
+      return 1
+    uri, proto, msg = args[2:5]
+    datum = None
+    if len(args) > 5:
+      if args[5] == "-file":
+        reader = open(args[6], 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        datum = dfr.next()
+      elif args[5] == "-data":
+        print "JSON Decoder not yet implemented."
+        return 1
+      else:
+        print usage_str
+        return 1
+    send_message(uri, proto, msg, datum)
   return 0
   
-def file_or_stdin(f):
-  if f == "-":
-    return sys.stdin
-  else:
-    return file(f)
-
 if __name__ == "__main__":
   sys.exit(main(sys.argv))

Modified: hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh?rev=910347&r1=910346&r2=910347&view=diff
==============================================================================
--- hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh (original)
+++ hadoop/avro/trunk/share/test/interop/bin/test_rpc_interop.sh Mon Feb 15 22:13:08 2010
@@ -26,8 +26,13 @@
 java_client="java -jar lang/java/build/avro-tools-$VERSION.jar rpcsend"
 java_server="java -jar lang/java/build/avro-tools-$VERSION.jar rpcreceive"
 
-clients=("$java_client")
-servers=("$java_server")
+py_client="python lang/py/src/avro/tool.py rpcsend"
+py_server="python lang/py/src/avro/tool.py rpcreceive"
+
+export PYTHONPATH=$PYTHONPATH:lang/py/src         # path to avro Python module
+
+clients=("$java_client" "$py_client")
+servers=("$java_server" "$py_server")
 
 proto=share/test/schemas/simple.avpr