You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/01/12 01:50:29 UTC

svn commit: r898139 - in /hadoop/avro/trunk: CHANGES.txt src/py/avro/io.py src/py/avro/ipc.py src/py/avro/protocol.py src/py/avro/schema.py src/test/py/sample_ipc_server.py

Author: cutting
Date: Tue Jan 12 00:50:29 2010
New Revision: 898139

URL: http://svn.apache.org/viewvc?rev=898139&view=rev
Log:
AVRO-288. Implement schema resolution for Python RPC request parameters.  Contributed by Jeff Hammerbacher.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/py/avro/ipc.py
    hadoop/avro/trunk/src/py/avro/protocol.py
    hadoop/avro/trunk/src/py/avro/schema.py
    hadoop/avro/trunk/src/test/py/sample_ipc_server.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Jan 12 00:50:29 2010
@@ -191,6 +191,9 @@
     AVRO-298. Fix Java's DatumReader and DatumWriter APIs to better
     use generics.  (philz via cutting)
 
+    AVRO-288. Implement schema resolution for Python parameters.
+    (Jeff Hammerbacher via cutting)
+
   OPTIMIZATIONS
 
     AVRO-172. More efficient schema processing (massie)

Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue Jan 12 00:50:29 2010
@@ -109,7 +109,7 @@
         [validate(expected_schema.values, v) for v in datum.values()])
   elif schema_type == 'union':
     return True in [validate(s, datum) for s in expected_schema.schemas]
-  elif schema_type == 'record':
+  elif schema_type in ['record', 'error', 'request']:
     return (isinstance(datum, dict) and
       False not in
         [validate(f.type, datum.get(f.name)) for f in expected_schema.fields])
@@ -354,6 +354,12 @@
           DatumReader.check_props(writers_schema, readers_schema, 
                                   ['fullname'])):
       return True
+    elif (w_type == r_type == 'error' and
+          DatumReader.check_props(writers_schema, readers_schema, 
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'request'):
+      return True
     elif (w_type == r_type == 'fixed' and 
           DatumReader.check_props(writers_schema, readers_schema, 
                                   ['fullname', 'size'])):
@@ -415,7 +421,7 @@
       for s in readers_schema.schemas:
         if DatumReader.match_schemas(writers_schema, s):
           return self.read_data(writers_schema, s, decoder)
-      fail_msg = 'Schemas do not match.'      
+      fail_msg = 'Schemas do not match.'
       raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
 
     # function dispatch for reading data based on type of writer's schema
@@ -445,7 +451,7 @@
       return self.read_map(writers_schema, readers_schema, decoder)
     elif writers_schema.type == 'union':
       return self.read_union(writers_schema, readers_schema, decoder)
-    elif writers_schema.type == 'record':
+    elif writers_schema.type in ['record', 'error', 'request']:
       return self.read_record(writers_schema, readers_schema, decoder)
     else:
       fail_msg = "Cannot read unknown schema type: %s" % writers_schema.type
@@ -478,7 +484,7 @@
       return self.skip_map(writers_schema, decoder)
     elif writers_schema.type == 'union':
       return self.skip_union(writers_schema, decoder)
-    elif writers_schema.type == 'record':
+    elif writers_schema.type in ['record', 'error', 'request']:
       return self.skip_record(writers_schema, decoder)
     else:
       fail_msg = "Unknown schema type: %s" % schm.type
@@ -745,11 +751,11 @@
       self.write_map(writers_schema, datum, encoder)
     elif writers_schema.type == 'union':
       self.write_union(writers_schema, datum, encoder)
-    elif writers_schema.type == 'record':
+    elif writers_schema.type in ['record', 'error', 'request']:
       self.write_record(writers_schema, datum, encoder)
     else:
       fail_msg = 'Unknown type: %s' % writers_schema.type
-      raise io.AvroException(fail_msg)
+      raise schema.AvroException(fail_msg)
 
   def write_fixed(self, writers_schema, datum, encoder):
     """

Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Tue Jan 12 00:50:29 2010
@@ -178,13 +178,9 @@
     # message parameters
     self.write_request(message.request, request_datum, encoder)
 
-  def write_request(self, request_fields, request_datum, encoder):
-    """
-    Looks an awful lot like new_io.write_record, eh?
-    """
-    for field in request_fields:
-      datum_writer = io.DatumWriter(field.type)
-      datum_writer.write(request_datum.get(field.name), encoder)
+  def write_request(self, request_schema, request_datum, encoder):
+    datum_writer = io.DatumWriter(request_schema)
+    datum_writer.write(request_datum, encoder)
 
   def read_handshake_response(self, decoder):
     handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
@@ -275,7 +271,8 @@
     a response or error. Compare to 'handle()' in Thrift.
     """
     call_request = transport.read_framed_message()
-    buffer_decoder = io.BinaryDecoder(cStringIO.StringIO(call_request))
+    buffer_reader = cStringIO.StringIO(call_request)
+    buffer_decoder = io.BinaryDecoder(buffer_reader)
     buffer_writer = cStringIO.StringIO()
     buffer_encoder = io.BinaryEncoder(buffer_writer)
     error = None
@@ -287,7 +284,7 @@
       # handshake failure
       if remote_protocol is None:  
         return buffer_writer.getvalue()
-      
+
       # read request using remote protocol
       request_metadata = META_READER.read(buffer_decoder)
       remote_message_name = buffer_decoder.read_utf8()
@@ -302,9 +299,11 @@
       if local_message is None:
         fail_msg = 'Unknown local message: %s' % remote_message_name
         raise schema.AvroException(fail_msg)
-      writers_fields = remote_message.request
-      # TODO(hammer) pass reader schema
-      request = self.read_request(writers_fields, buffer_decoder)
+      writers_schema = remote_message.request
+      readers_schema = local_message.request
+      request = self.read_request(writers_schema, readers_schema,
+                                  buffer_decoder)
+
       # perform server logic
       try:
         response = self.invoke(local_message, request)
@@ -368,15 +367,9 @@
     """
     pass
 
-  def read_request(self, writers_fields, decoder):
-    """
-    Need to handle schema resolution here. Half-assing it now.
-    """
-    request_data = []
-    for field in writers_fields:
-      datum_reader = io.DatumReader(field.type)
-      request_data.append(datum_reader.read(decoder))
-    return request_data
+  def read_request(self, writers_schema, readers_schema, decoder):
+    datum_reader = io.DatumReader(writers_schema, readers_schema)
+    return datum_reader.read(decoder)
 
   def write_response(self, writers_schema, response_datum, encoder):
     datum_writer = io.DatumWriter(writers_schema)

Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Tue Jan 12 00:50:29 2010
@@ -49,7 +49,7 @@
     for type in types:
       type_object = schema.make_avsc_object(type, type_names)
       if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
-        fail_msg = 'Type %s not an enum, record, or error.' % type
+        fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
         raise ProtocolParseException(fail_msg)
       type_objects.append(type_object)
     return type_objects
@@ -63,7 +63,6 @@
       elif not(hasattr(body, 'get') and callable(body.get)):
         fail_msg = 'Message name "%s" has non-object body %s.' % (name, body)
         raise ProtocolParseException(fail_msg)
-
       request = body.get('request')
       response = body.get('response')
       errors = body.get('errors')
@@ -142,7 +141,7 @@
     if not isinstance(request, list):
       fail_msg = 'Request property not a list: %s' % request
       raise ProtocolParseException(fail_msg)
-    return schema.RecordSchema.make_field_objects(request, names)
+    return schema.RecordSchema(None, None, request, names, 'request')
   
   def _parse_response(self, response, names):
     if isinstance(response, basestring) and names.has_key(response):
@@ -184,7 +183,7 @@
   # TODO(hammer): allow schemas and fields to be JSON Encoded!
   def __str__(self):
     to_dump = {}
-    to_dump['request'] = [json.loads(str(r)) for r in self.request]
+    to_dump['request'] = json.loads(str(self.request))
     if self.response_from_names:
       to_dump['response'] = self.response.fullname
     else:

Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Tue Jan 12 00:50:29 2010
@@ -64,6 +64,7 @@
   'array',
   'map',
   'union',
+  'request',
 )
 
 RESERVED_PROPS = (
@@ -496,7 +497,10 @@
       raise SchemaParseException(fail_msg)
 
     # Call parent ctor (adds own name to namespace, too)
-    NamedSchema.__init__(self, schema_type, name, namespace, names)
+    if schema_type == 'request':
+      Schema.__init__(self, schema_type)
+    else:
+      NamedSchema.__init__(self, schema_type, name, namespace, names)
 
     # Add class members
     field_objects = RecordSchema.make_field_objects(fields, names)
@@ -514,11 +518,11 @@
 
   def __str__(self):
     to_dump = self.props.copy()
-    new_fields = []
-    for field in to_dump['fields']:
-      new_fields.append(json.loads(str(field)))
-    to_dump['fields'] = new_fields
-    return json.dumps(to_dump)
+    to_dump['fields'] = [json.loads(str(f)) for f in self.fields]
+    if self.type == 'request':
+      return json.dumps(to_dump['fields'])
+    else:
+      return json.dumps(to_dump)
 
   def __eq__(self, that):
     to_cmp = json.loads(str(self))

Modified: hadoop/avro/trunk/src/test/py/sample_ipc_server.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/sample_ipc_server.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/sample_ipc_server.py (original)
+++ hadoop/avro/trunk/src/test/py/sample_ipc_server.py Tue Jan 12 00:50:29 2010
@@ -55,7 +55,7 @@
 
   def invoke(self, message, request):
     if message.name == 'send':
-      request_content = request[0]
+      request_content = request['message']
       response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
                  request_content
       return response
@@ -70,4 +70,5 @@
 
 if __name__ == '__main__':
+  TCPServer.allow_reuse_address = True  # must be set before construction: TCPServer.__init__ binds via server_bind(), which reads this flag
   mail_server = TCPServer(SERVER_ADDRESS, MailHandler)
   mail_server.serve_forever()