You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/01/12 01:50:29 UTC
svn commit: r898139 - in /hadoop/avro/trunk: CHANGES.txt src/py/avro/io.py
src/py/avro/ipc.py src/py/avro/protocol.py src/py/avro/schema.py
src/test/py/sample_ipc_server.py
Author: cutting
Date: Tue Jan 12 00:50:29 2010
New Revision: 898139
URL: http://svn.apache.org/viewvc?rev=898139&view=rev
Log:
AVRO-288. Implement schema resolution for Python parameters. Contributed by Jeff Hammerbacher.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/py/avro/io.py
hadoop/avro/trunk/src/py/avro/ipc.py
hadoop/avro/trunk/src/py/avro/protocol.py
hadoop/avro/trunk/src/py/avro/schema.py
hadoop/avro/trunk/src/test/py/sample_ipc_server.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Jan 12 00:50:29 2010
@@ -191,6 +191,9 @@
AVRO-298. Fix Java's DatumReader and DatumWriter APIs to better
use generics. (philz via cutting)
+ AVRO-288. Implement schema resolution for Python parameters.
+ (Jeff Hammerbacher via cutting)
+
OPTIMIZATIONS
AVRO-172. More efficient schema processing (massie)
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue Jan 12 00:50:29 2010
@@ -109,7 +109,7 @@
[validate(expected_schema.values, v) for v in datum.values()])
elif schema_type == 'union':
return True in [validate(s, datum) for s in expected_schema.schemas]
- elif schema_type == 'record':
+ elif schema_type in ['record', 'error', 'request']:
return (isinstance(datum, dict) and
False not in
[validate(f.type, datum.get(f.name)) for f in expected_schema.fields])
@@ -354,6 +354,12 @@
DatumReader.check_props(writers_schema, readers_schema,
['fullname'])):
return True
+ elif (w_type == r_type == 'error' and
+ DatumReader.check_props(writers_schema, readers_schema,
+ ['fullname'])):
+ return True
+ elif (w_type == r_type == 'request'):
+ return True
elif (w_type == r_type == 'fixed' and
DatumReader.check_props(writers_schema, readers_schema,
['fullname', 'size'])):
@@ -415,7 +421,7 @@
for s in readers_schema.schemas:
if DatumReader.match_schemas(writers_schema, s):
return self.read_data(writers_schema, s, decoder)
- fail_msg = 'Schemas do not match.'
+ fail_msg = 'Schemas do not match.'
raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
# function dispatch for reading data based on type of writer's schema
@@ -445,7 +451,7 @@
return self.read_map(writers_schema, readers_schema, decoder)
elif writers_schema.type == 'union':
return self.read_union(writers_schema, readers_schema, decoder)
- elif writers_schema.type == 'record':
+ elif writers_schema.type in ['record', 'error', 'request']:
return self.read_record(writers_schema, readers_schema, decoder)
else:
fail_msg = "Cannot read unknown schema type: %s" % writers_schema.type
@@ -478,7 +484,7 @@
return self.skip_map(writers_schema, decoder)
elif writers_schema.type == 'union':
return self.skip_union(writers_schema, decoder)
- elif writers_schema.type == 'record':
+ elif writers_schema.type in ['record', 'error', 'request']:
return self.skip_record(writers_schema, decoder)
else:
fail_msg = "Unknown schema type: %s" % schm.type
@@ -745,11 +751,11 @@
self.write_map(writers_schema, datum, encoder)
elif writers_schema.type == 'union':
self.write_union(writers_schema, datum, encoder)
- elif writers_schema.type == 'record':
+ elif writers_schema.type in ['record', 'error', 'request']:
self.write_record(writers_schema, datum, encoder)
else:
fail_msg = 'Unknown type: %s' % writers_schema.type
- raise io.AvroException(fail_msg)
+ raise schema.AvroException(fail_msg)
def write_fixed(self, writers_schema, datum, encoder):
"""
Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Tue Jan 12 00:50:29 2010
@@ -178,13 +178,9 @@
# message parameters
self.write_request(message.request, request_datum, encoder)
- def write_request(self, request_fields, request_datum, encoder):
- """
- Looks an awful lot like new_io.write_record, eh?
- """
- for field in request_fields:
- datum_writer = io.DatumWriter(field.type)
- datum_writer.write(request_datum.get(field.name), encoder)
+ def write_request(self, request_schema, request_datum, encoder):
+ datum_writer = io.DatumWriter(request_schema)
+ datum_writer.write(request_datum, encoder)
def read_handshake_response(self, decoder):
handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
@@ -275,7 +271,8 @@
a response or error. Compare to 'handle()' in Thrift.
"""
call_request = transport.read_framed_message()
- buffer_decoder = io.BinaryDecoder(cStringIO.StringIO(call_request))
+ buffer_reader = cStringIO.StringIO(call_request)
+ buffer_decoder = io.BinaryDecoder(buffer_reader)
buffer_writer = cStringIO.StringIO()
buffer_encoder = io.BinaryEncoder(buffer_writer)
error = None
@@ -287,7 +284,7 @@
# handshake failure
if remote_protocol is None:
return buffer_writer.getvalue()
-
+
# read request using remote protocol
request_metadata = META_READER.read(buffer_decoder)
remote_message_name = buffer_decoder.read_utf8()
@@ -302,9 +299,11 @@
if local_message is None:
fail_msg = 'Unknown local message: %s' % remote_message_name
raise schema.AvroException(fail_msg)
- writers_fields = remote_message.request
- # TODO(hammer) pass reader schema
- request = self.read_request(writers_fields, buffer_decoder)
+ writers_schema = remote_message.request
+ readers_schema = local_message.request
+ request = self.read_request(writers_schema, readers_schema,
+ buffer_decoder)
+
# perform server logic
try:
response = self.invoke(local_message, request)
@@ -368,15 +367,9 @@
"""
pass
- def read_request(self, writers_fields, decoder):
- """
- Need to handle schema resolution here. Half-assing it now.
- """
- request_data = []
- for field in writers_fields:
- datum_reader = io.DatumReader(field.type)
- request_data.append(datum_reader.read(decoder))
- return request_data
+ def read_request(self, writers_schema, readers_schema, decoder):
+ datum_reader = io.DatumReader(writers_schema, readers_schema)
+ return datum_reader.read(decoder)
def write_response(self, writers_schema, response_datum, encoder):
datum_writer = io.DatumWriter(writers_schema)
Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Tue Jan 12 00:50:29 2010
@@ -49,7 +49,7 @@
for type in types:
type_object = schema.make_avsc_object(type, type_names)
if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
- fail_msg = 'Type %s not an enum, record, or error.' % type
+ fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
raise ProtocolParseException(fail_msg)
type_objects.append(type_object)
return type_objects
@@ -63,7 +63,6 @@
elif not(hasattr(body, 'get') and callable(body.get)):
fail_msg = 'Message name "%s" has non-object body %s.' % (name, body)
raise ProtocolParseException(fail_msg)
-
request = body.get('request')
response = body.get('response')
errors = body.get('errors')
@@ -142,7 +141,7 @@
if not isinstance(request, list):
fail_msg = 'Request property not a list: %s' % request
raise ProtocolParseException(fail_msg)
- return schema.RecordSchema.make_field_objects(request, names)
+ return schema.RecordSchema(None, None, request, names, 'request')
def _parse_response(self, response, names):
if isinstance(response, basestring) and names.has_key(response):
@@ -184,7 +183,7 @@
# TODO(hammer): allow schemas and fields to be JSON Encoded!
def __str__(self):
to_dump = {}
- to_dump['request'] = [json.loads(str(r)) for r in self.request]
+ to_dump['request'] = json.loads(str(self.request))
if self.response_from_names:
to_dump['response'] = self.response.fullname
else:
Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Tue Jan 12 00:50:29 2010
@@ -64,6 +64,7 @@
'array',
'map',
'union',
+ 'request',
)
RESERVED_PROPS = (
@@ -496,7 +497,10 @@
raise SchemaParseException(fail_msg)
# Call parent ctor (adds own name to namespace, too)
- NamedSchema.__init__(self, schema_type, name, namespace, names)
+ if schema_type == 'request':
+ Schema.__init__(self, schema_type)
+ else:
+ NamedSchema.__init__(self, schema_type, name, namespace, names)
# Add class members
field_objects = RecordSchema.make_field_objects(fields, names)
@@ -514,11 +518,11 @@
def __str__(self):
to_dump = self.props.copy()
- new_fields = []
- for field in to_dump['fields']:
- new_fields.append(json.loads(str(field)))
- to_dump['fields'] = new_fields
- return json.dumps(to_dump)
+ to_dump['fields'] = [json.loads(str(f)) for f in self.fields]
+ if self.type == 'request':
+ return json.dumps(to_dump['fields'])
+ else:
+ return json.dumps(to_dump)
def __eq__(self, that):
to_cmp = json.loads(str(self))
Modified: hadoop/avro/trunk/src/test/py/sample_ipc_server.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/sample_ipc_server.py?rev=898139&r1=898138&r2=898139&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/sample_ipc_server.py (original)
+++ hadoop/avro/trunk/src/test/py/sample_ipc_server.py Tue Jan 12 00:50:29 2010
@@ -55,7 +55,7 @@
def invoke(self, message, request):
if message.name == 'send':
- request_content = request[0]
+ request_content = request['message']
response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
request_content
return response
@@ -70,4 +70,5 @@
if __name__ == '__main__':
mail_server = TCPServer(SERVER_ADDRESS, MailHandler)
+ mail_server.allow_reuse_address = True
mail_server.serve_forever()