You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/05/26 19:38:46 UTC
svn commit: r778807 - in /hadoop/avro/trunk: ./ src/py/avro/ src/test/py/
Author: cutting
Date: Tue May 26 17:38:46 2009
New Revision: 778807
URL: http://svn.apache.org/viewvc?rev=778807&view=rev
Log:
AVRO-38. Add Python support for fixed-sized types. Contributed by sharad.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/py/avro/generic.py
hadoop/avro/trunk/src/py/avro/io.py
hadoop/avro/trunk/src/py/avro/protocol.py
hadoop/avro/trunk/src/py/avro/reflect.py
hadoop/avro/trunk/src/py/avro/schema.py
hadoop/avro/trunk/src/test/py/testio.py
hadoop/avro/trunk/src/test/py/testioreflect.py
hadoop/avro/trunk/src/test/py/testipc.py
hadoop/avro/trunk/src/test/py/testipcreflect.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue May 26 17:38:46 2009
@@ -23,6 +23,8 @@
AVRO-10. Add Java support for fixed-sized types. (cutting)
+ AVRO-38. Add Python support for fixed-sized types. (sharad)
+
IMPROVEMENTS
AVRO-11. Re-implement specific and reflect datum readers and
Modified: hadoop/avro/trunk/src/py/avro/generic.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/generic.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/generic.py (original)
+++ hadoop/avro/trunk/src/py/avro/generic.py Tue May 26 17:38:46 2009
@@ -79,6 +79,9 @@
io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
schema.ENUM : lambda schm, object:
schm.getenumsymbols().__contains__(object),
+ schema.FIXED : lambda schm, object:
+ (isinstance(object, str) and
+ len(object) == schm.getsize()),
schema.ARRAY : _validatearray,
schema.MAP : _validatemap,
schema.RECORD : _validaterecord,
@@ -106,6 +109,8 @@
schema.FLOAT : lambda schm, valuereader: valuereader.readfloat(),
schema.DOUBLE : lambda schm, valuereader: valuereader.readdouble(),
schema.BYTES : lambda schm, valuereader: valuereader.readbytes(),
+ schema.FIXED : lambda schm, valuereader:
+ (valuereader.read(schm.getsize())),
schema.ARRAY : self.readarray,
schema.MAP : self.readmap,
schema.RECORD : self.readrecord,
@@ -181,6 +186,8 @@
valuewriter.writedouble(datum),
schema.BYTES : lambda schm, datum, valuewriter:
valuewriter.writebytes(datum),
+ schema.FIXED : lambda schm, datum, valuewriter:
+ valuewriter.write(datum),
schema.ARRAY : self.writearray,
schema.MAP : self.writemap,
schema.RECORD : self.writerecord,
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue May 26 17:38:46 2009
@@ -100,12 +100,14 @@
return _STRUCT_DOUBLE.unpack(_STRUCT_LONG.pack(bits))[0]
def readbytes(self):
- len = self.readlong()
- return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
+ return self.read(self.readlong())
def readutf8(self):
return unicode(self.readbytes(), "utf-8")
+ def read(self, len):
+ return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
+
class ValueWriter(object):
"""Write leaf values."""
@@ -168,6 +170,9 @@
datum = datum.encode("utf-8")
self.writebytes(datum)
+ def write(self, datum):
+ self.__writer.write(datum)
+
#Data file constants.
_VERSION = 0
_MAGIC = "Obj"+chr(_VERSION)
Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Tue May 26 17:38:46 2009
@@ -97,8 +97,7 @@
count = 0
for type in self.__types.values():
typesCopy = self.__types
- if (isinstance(type, schema._RecordSchema) or
- isinstance(type, schema._EnumSchema)):
+ if isinstance(type, schema.NamedSchema):
typesCopy = self.__types.copy()
typesCopy.pop(type.getname(), None)
str.write(type.str(typesCopy)+"\n")
Modified: hadoop/avro/trunk/src/py/avro/reflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflect.py (original)
+++ hadoop/avro/trunk/src/py/avro/reflect.py Tue May 26 17:38:46 2009
@@ -71,6 +71,9 @@
io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
schema.ENUM : lambda schm, pkgname, object:
schm.getenumsymbols().__contains__(object),
+ schema.FIXED : lambda schm, pkgname, object:
+ (isinstance(object, str) and
+ len(object) == schm.getsize()),
schema.ARRAY : _validatearray,
schema.MAP : _validatemap,
schema.RECORD : _validaterecord,
Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Tue May 26 17:38:46 2009
@@ -21,6 +21,7 @@
An array of values, all of the same schema;
A map containing string/value pairs, each of a declared schema;
A union of other schemas;
+ A fixed-sized binary object;
A unicode string;
A sequence of bytes;
A 32-bit signed int;
@@ -33,7 +34,7 @@
import simplejson, odict
#The schema types
-STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL, ARRAY, MAP, UNION, RECORD, ENUM = range(13)
+STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL, ARRAY, MAP, UNION, FIXED, RECORD, ENUM = range(14)
class Schema(object):
"""Base class for all Schema classes."""
@@ -108,19 +109,45 @@
def str(self, names):
return "\"null\""
-class _RecordSchema(Schema):
- def __init__(self, fields, name=None, namespace=None, iserror=False):
- Schema.__init__(self, RECORD)
+class NamedSchema(Schema):
+ def __init__(self, type, name, space):
+ Schema.__init__(self, type)
self.__name = name
- self.__namespace = namespace
- self.__fields = fields
- self.__iserror = iserror
+ self.__space = space
def getname(self):
return self.__name
- def getnamespace(self):
- return self.__namespace
+ def getspace(self):
+ return self.__space
+
+ def equalnames(self, other):
+ if other is None:
+ return False
+ if (self.__name == other.__name and self.__space == other.__space):
+ return True
+ return False
+
+ def namestring(self):
+ str = cStringIO.StringIO()
+ str.write("\"name\": \""+self.__name+"\", ")
+ if self.__space is not None:
+ str.write("\"namespace\": \""+self.__space+"\", ")
+ return str.getvalue()
+
+ def __hash__(self, seen=None):
+ hash = self.gettype().__hash__()
+ hash += self.__name.__hash__()
+ if self.__space is not None:
+ hash += self.__space.__hash__()
+ return hash
+
+
+class _RecordSchema(NamedSchema):
+ def __init__(self, fields, name=None, space=None, iserror=False):
+ NamedSchema.__init__(self, RECORD, name, space)
+ self.__fields = fields
+ self.__iserror = iserror
def getfields(self):
return self.__fields
@@ -129,10 +156,10 @@
return self.__iserror
def str(self, names):
- if names.get(self.__name) is self:
- return "\""+self.__name+"\""
- elif self.__name is not None:
- names[self.__name] = self
+ if names.get(self.getname()) is self:
+ return "\""+self.getname()+"\""
+ elif self.getname() is not None:
+ names[self.getname()] = self
str = cStringIO.StringIO()
str.write("{\"type\": \"")
if self.iserror():
@@ -140,10 +167,7 @@
else:
str.write("record")
str.write("\", ")
- if self.__name is not None:
- str.write("\"name\": \""+self.__name+"\", ")
- #if self.__namespace is not None:
- #str.write("\"namespace\": \""+self.__namespace+"\", ")
+ str.write(self.namestring())
str.write("\"fields\": [")
count=0
for k,v in self.__fields:
@@ -161,7 +185,7 @@
def __eq__(self, other, seen={}):
if self is other or seen.get(id(self)) is other:
return True
- if isinstance(other, _RecordSchema):
+ if isinstance(other, _RecordSchema) and self.equalnames(other):
size = len(self.__fields)
if len(other.__fields) != size:
return False
@@ -177,7 +201,7 @@
if seen.__contains__(id(self)):
return 0
seen.add(id(self))
- hash = self.gettype().__hash__()
+ hash = NamedSchema.__hash__(self, seen)
for field, fieldschm in self.__fields:
hash = hash + fieldschm.__hash__(seen)
return hash
@@ -283,11 +307,9 @@
hash = hash + elem.__hash__(seen)
return hash
-class _EnumSchema(Schema):
+class _EnumSchema(NamedSchema):
def __init__(self, name, space, symbols):
- Schema.__init__(self, ENUM)
- self.__name = name
- self.__space = space
+ NamedSchema.__init__(self, ENUM, name, space)
self.__symbols = symbols
self.__ordinals = dict()
i = 0
@@ -295,12 +317,6 @@
self.__ordinals[symbol] = i
i+=1
- def getname(self):
- return self.__name
-
- def getnamespace(self):
- return self.__namespace
-
def getenumsymbols(self):
return self.__symbols
@@ -308,14 +324,13 @@
return self.__ordinals.get(symbol)
def str(self, names):
- if names.get(self.__name) is self:
- return "\""+self.__name+"\""
- elif self.__name is not None:
- names[self.__name] = self
+ if names.get(self.getname()) is self:
+ return "\""+self.getname()+"\""
+ elif self.getname() is not None:
+ names[self.getname()] = self
str = cStringIO.StringIO()
str.write("{\"type\": \"enum\", ")
- if self.__name is not None:
- str.write("\"name\": \""+self.__name+"\", ")
+ str.write(self.namestring())
str.write("\"symbols\": [")
count = 0
for symbol in self.__symbols:
@@ -329,7 +344,7 @@
def __eq__(self, other, seen={}):
if self is other or seen.get(id(self)) is other:
return True
- if isinstance(other, _EnumSchema):
+ if isinstance(other, _EnumSchema) and self.equalnames(other):
size = len(self.__symbols)
if len(other.__symbols) != size:
return False
@@ -345,11 +360,41 @@
if seen.__contains__(id(self)):
return 0
seen.add(id(self))
- hash = self.gettype().__hash__()
+ hash = NamedSchema.__hash__(self, seen)
for symbol in self.__symbols:
hash += symbol.__hash__()
return hash
+class _FixedSchema(NamedSchema):
+ def __init__(self, name, space, size):
+ NamedSchema.__init__(self, FIXED, name, space)
+ self.__size = size
+
+ def getsize(self):
+ return self.__size
+
+ def str(self, names):
+ if names.get(self.getname()) is self:
+ return "\""+self.getname()+"\""
+ elif self.getname() is not None:
+ names[self.getname()] = self
+ str = cStringIO.StringIO()
+ str.write("{\"type\": \"fixed\", ")
+ str.write(self.namestring())
+ str.write("\"size\": "+repr(self.__size)+"}")
+ return str.getvalue()
+
+ def __eq__(self, other, seen=None):
+ if self is other:
+ return True
+ if (isinstance(other, _FixedSchema) and self.equalnames(other)
+ and self.__size == other.__size):
+ return True
+ return False
+
+ def __hash__(self, seen=None):
+ return NamedSchema.__hash__(self, seen) + self.__size.__hash__()
+
_PRIMITIVES = {'string':_StringSchema(),
'bytes':_BytesSchema(),
'int':_IntSchema(),
@@ -392,38 +437,42 @@
type = obj.get("type")
if type is None:
raise SchemaParseException("No type: "+obj.__str__())
- if type == "record" or type == "error":
+ if (type == "record" or type == "error" or
+ type == "enum" or type == "fixed"):
name = obj.get("name")
- namespace = obj.get("namespace")
- fields = list()
- schema = _RecordSchema(fields, name, namespace, type == "error")
- if name is not None:
+ space = obj.get("namespace")
+ if name is None:
+ raise SchemaParseException("No name in schema: "+obj.__str__())
+ if type == "record" or type == "error":
+ fields = list()
+ schema = _RecordSchema(fields, name, space, type == "error")
names[name] = schema
- fieldsnode = obj.get("fields")
- if fieldsnode is None:
- raise SchemaParseException("Record has no fields: "+obj.__str__())
- for field in fieldsnode:
- fieldname = field.get("name")
- if fieldname is None:
- raise SchemaParseException("No field name: "+field.__str__())
- fieldtype = field.get("type")
- if fieldtype is None:
- raise SchemaParseException("No field type: "+field.__str__())
- fields.append((fieldname, _parse(fieldtype, names)))
- return schema
- elif type == "enum":
- name = obj.get("name")
- namespace = obj.get("namespace")
- symbolsnode = obj.get("symbols")
- if symbolsnode == None or not isinstance(symbolsnode, list):
- raise SchemaParseException("Enum has no symbols: "+obj.__str__())
- symbols = list()
- for symbol in symbolsnode:
- symbols.append(symbol)
- schema = _EnumSchema(name, namespace, symbols)
- if name is not None:
+ fieldsnode = obj.get("fields")
+ if fieldsnode is None:
+ raise SchemaParseException("Record has no fields: "+obj.__str__())
+ for field in fieldsnode:
+ fieldname = field.get("name")
+ if fieldname is None:
+ raise SchemaParseException("No field name: "+field.__str__())
+ fieldtype = field.get("type")
+ if fieldtype is None:
+ raise SchemaParseException("No field type: "+field.__str__())
+ fields.append((fieldname, _parse(fieldtype, names)))
+ return schema
+ elif type == "enum":
+ symbolsnode = obj.get("symbols")
+ if symbolsnode == None or not isinstance(symbolsnode, list):
+ raise SchemaParseException("Enum has no symbols: "+obj.__str__())
+ symbols = list()
+ for symbol in symbolsnode:
+ symbols.append(symbol)
+ schema = _EnumSchema(name, space, symbols)
names[name] = schema
- return schema
+ return schema
+ elif type == "fixed":
+ schema = _FixedSchema(name, space, obj.get("size"))
+ names[name] = schema
+ return schema
elif type == "array":
return _ArraySchema(_parse(obj.get("items"), names))
elif type == "map":
Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Tue May 26 17:38:46 2009
@@ -87,6 +87,11 @@
if len == 0:
return None
return symbols[random.randint(0,len)-1]
+ elif schm.gettype() == schema.FIXED:
+ string = cStringIO.StringIO()
+ for i in range(0, schm.getsize()):
+ string.write(struct.pack('c',random.sample('12345abcd', 1)[0]))
+ return string.getvalue()
class TestSchema(unittest.TestCase):
@@ -132,11 +137,13 @@
self.check("{\"type\":\"map\", \"values\": \"string\"}")
def testRecord(self):
- self.check("{\"type\":\"record\",\"fields\":[{\"name\":\"f\", \"type\":" +
+ self.check("{\"type\":\"record\", \"name\":\"Test\"," +
+ "\"fields\":[{\"name\":\"f\", \"type\":" +
"\"string\"}, {\"name\":\"fb\", \"type\":\"bytes\"}]}")
def testEnum(self):
- self.check("{\"type\": \"enum\", \"symbols\": [\"A\", \"B\"]}")
+ self.check("{\"type\": \"enum\", \"name\":\"Test\","+
+ "\"symbols\": [\"A\", \"B\"]}")
def testRecursive(self):
self.check("{\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
@@ -157,6 +164,9 @@
+"{\"name\":\"car\", \"type\":\"string\"},"
+"{\"name\":\"cdr\", \"type\":\"string\"}]}]")
+ def testFixed(self):
+ self.check("{\"type\": \"fixed\", \"name\":\"Test\", \"size\": 1}")
+
def check(self, string):
schm = schema.parse(string)
st = schema.stringval(schm)
Modified: hadoop/avro/trunk/src/test/py/testioreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testioreflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testioreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testioreflect.py Tue May 26 17:38:46 2009
@@ -49,12 +49,6 @@
class TestSchema(testio.TestSchema):
- # the schema MUST have name
- def testRecord(self):
- self.check(
- "{\"type\":\"record\",\"name\":\"TestRec\",\"fields\":[{\"name\":\"f\"," +
- "\"type\":\"string\"}, {\"name\":\"fb\", \"type\":\"bytes\"}]}")
-
def __init__(self, methodName):
testio.TestSchema.__init__(self, methodName, dyvalidator, ReflectDWriter,
ReflectDReader, DyRandomData, False)
Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Tue May 26 17:38:46 2009
@@ -14,7 +14,7 @@
#See the License for the specific language governing permissions and
#limitations under the License.
-import unittest, socket
+import unittest, socket, struct
import testio
import avro.ipc as ipc
import avro.generic as generic
@@ -77,6 +77,7 @@
record = dict()
record['name'] = unicode('foo')
record['kind'] = 'BAR'
+ record['hash'] = struct.pack('16s','0123456789012345')
params = dict()
params['record'] = record
echoed = self.requestor.call('echo', params)
Modified: hadoop/avro/trunk/src/test/py/testipcreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipcreflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipcreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testipcreflect.py Tue May 26 17:38:46 2009
@@ -14,7 +14,7 @@
#See the License for the specific language governing permissions and
#limitations under the License.
-import socket
+import socket, struct
import avro.schema as schema
import avro.reflect as reflect
import avro.ipc as ipc
@@ -59,6 +59,7 @@
record = TestRecord()
record.name = unicode('foo')
record.kind = 'BAR'
+ record.hash = struct.pack('16s','0123456789012345')
echoed = self.proxy.echo(record)
self.assertEquals(record.name, echoed.name)