You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@avro.apache.org by cu...@apache.org on 2009/05/26 19:38:46 UTC

svn commit: r778807 - in /hadoop/avro/trunk: ./ src/py/avro/ src/test/py/

Author: cutting
Date: Tue May 26 17:38:46 2009
New Revision: 778807

URL: http://svn.apache.org/viewvc?rev=778807&view=rev
Log:
AVRO-38.  Add Python support for fixed-sized types.  Contributed by sharad.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/py/avro/generic.py
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/py/avro/protocol.py
    hadoop/avro/trunk/src/py/avro/reflect.py
    hadoop/avro/trunk/src/py/avro/schema.py
    hadoop/avro/trunk/src/test/py/testio.py
    hadoop/avro/trunk/src/test/py/testioreflect.py
    hadoop/avro/trunk/src/test/py/testipc.py
    hadoop/avro/trunk/src/test/py/testipcreflect.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue May 26 17:38:46 2009
@@ -23,6 +23,8 @@
 
     AVRO-10.  Add Java support for fixed-sized types. (cutting)
 
+    AVRO-38.  Add Python support for fixed-sized types. (sharad)
+
   IMPROVEMENTS
 
     AVRO-11.  Re-implement specific and reflect datum readers and

Modified: hadoop/avro/trunk/src/py/avro/generic.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/generic.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/generic.py (original)
+++ hadoop/avro/trunk/src/py/avro/generic.py Tue May 26 17:38:46 2009
@@ -79,6 +79,9 @@
                             io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
      schema.ENUM : lambda schm, object:
                                 schm.getenumsymbols().__contains__(object),
+     schema.FIXED : lambda schm, object:
+                                (isinstance(object, str) and 
+                                 len(object) == schm.getsize()),
      schema.ARRAY : _validatearray,
      schema.MAP : _validatemap,
      schema.RECORD : _validaterecord,
@@ -106,6 +109,8 @@
      schema.FLOAT : lambda schm, valuereader: valuereader.readfloat(),
      schema.DOUBLE : lambda schm, valuereader: valuereader.readdouble(),
      schema.BYTES : lambda schm, valuereader: valuereader.readbytes(),
+     schema.FIXED : lambda schm, valuereader: 
+                            (valuereader.read(schm.getsize())),
      schema.ARRAY : self.readarray,
      schema.MAP : self.readmap,
      schema.RECORD : self.readrecord,
@@ -181,6 +186,8 @@
                   valuewriter.writedouble(datum),
      schema.BYTES : lambda schm, datum, valuewriter: 
                   valuewriter.writebytes(datum),
+     schema.FIXED : lambda schm, datum, valuewriter: 
+                  valuewriter.write(datum),
      schema.ARRAY : self.writearray,
      schema.MAP : self.writemap,
      schema.RECORD : self.writerecord,

Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue May 26 17:38:46 2009
@@ -100,12 +100,14 @@
     return _STRUCT_DOUBLE.unpack(_STRUCT_LONG.pack(bits))[0]
 
   def readbytes(self):
-    len = self.readlong()
-    return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
+    return self.read(self.readlong())
 
   def readutf8(self):
     return unicode(self.readbytes(), "utf-8")
 
+  def read(self, len):
+    return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
+
 class ValueWriter(object):
   """Write leaf values."""
 
@@ -168,6 +170,9 @@
     datum = datum.encode("utf-8")
     self.writebytes(datum)
 
+  def write(self, datum):
+    self.__writer.write(datum)
+
 #Data file constants.
 _VERSION = 0
 _MAGIC = "Obj"+chr(_VERSION)

Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Tue May 26 17:38:46 2009
@@ -97,8 +97,7 @@
     count = 0
     for type in self.__types.values():
       typesCopy = self.__types
-      if (isinstance(type, schema._RecordSchema) or 
-          isinstance(type, schema._EnumSchema)):
+      if isinstance(type, schema.NamedSchema):
         typesCopy = self.__types.copy()
         typesCopy.pop(type.getname(), None)
       str.write(type.str(typesCopy)+"\n")

Modified: hadoop/avro/trunk/src/py/avro/reflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflect.py (original)
+++ hadoop/avro/trunk/src/py/avro/reflect.py Tue May 26 17:38:46 2009
@@ -71,6 +71,9 @@
                             io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
      schema.ENUM : lambda schm, pkgname, object:
                                 schm.getenumsymbols().__contains__(object),
+     schema.FIXED : lambda schm, pkgname, object:
+                                (isinstance(object, str) and 
+                                 len(object) == schm.getsize()),
      schema.ARRAY : _validatearray,
      schema.MAP : _validatemap,
      schema.RECORD : _validaterecord,

Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Tue May 26 17:38:46 2009
@@ -21,6 +21,7 @@
   An array of values, all of the same schema;
   A map containing string/value pairs, each of a declared schema;
   A union of other schemas;
+  A fixed sized binary object;
   A unicode string;
   A sequence of bytes;
   A 32-bit signed int;
@@ -33,7 +34,7 @@
 import simplejson, odict
 
 #The schema types
-STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL, ARRAY, MAP, UNION, RECORD, ENUM = range(13)
+STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL, ARRAY, MAP, UNION, FIXED, RECORD, ENUM = range(14)
 
 class Schema(object):
   """Base class for all Schema classes."""
@@ -108,19 +109,45 @@
   def str(self, names):
     return "\"null\""
 
-class _RecordSchema(Schema):
-  def __init__(self, fields, name=None, namespace=None, iserror=False):
-    Schema.__init__(self, RECORD)
+class NamedSchema(Schema):
+  def __init__(self, type, name, space):
+    Schema.__init__(self, type)
     self.__name = name
-    self.__namespace = namespace
-    self.__fields = fields
-    self.__iserror = iserror
+    self.__space = space
 
   def getname(self):
     return self.__name
 
-  def getnamespace(self):
-    return self.__namespace
+  def getspace(self):
+    return self.__space
+
+  def equalnames(self, other):
+    if other is None:
+      return False
+    if (self.__name == other.__name and self.__space == other.__space):
+      return True
+    return False
+
+  def namestring(self):
+    str = cStringIO.StringIO()
+    str.write("\"name\": \""+self.__name+"\", ")
+    if self.__space is not None:
+      str.write("\"namespace\": \""+self.__space+"\", ")
+    return str.getvalue()
+
+  def __hash__(self, seen=None):
+    hash = self.gettype().__hash__()
+    hash += self.__name.__hash__()
+    if self.__space is not None:
+      hash += self.__space.__hash__()
+    return hash
+
+
+class _RecordSchema(NamedSchema):
+  def __init__(self, fields, name=None, space=None, iserror=False):
+    NamedSchema.__init__(self, RECORD, name, space)
+    self.__fields = fields
+    self.__iserror = iserror
 
   def getfields(self):
     return self.__fields
@@ -129,10 +156,10 @@
     return self.__iserror
 
   def str(self, names):
-    if names.get(self.__name) is self:
-      return "\""+self.__name+"\""
-    elif self.__name is not None:
-      names[self.__name] = self
+    if names.get(self.getname()) is self:
+      return "\""+self.getname()+"\""
+    elif self.getname() is not None:
+      names[self.getname()] = self
     str = cStringIO.StringIO()
     str.write("{\"type\": \"")
     if self.iserror():
@@ -140,10 +167,7 @@
     else:
       str.write("record")
     str.write("\", ")
-    if self.__name is not None:
-      str.write("\"name\": \""+self.__name+"\", ")
-    #if self.__namespace is not None:
-      #str.write("\"namespace\": \""+self.__namespace+"\", ")
+    str.write(self.namestring())
     str.write("\"fields\": [")
     count=0
     for k,v in self.__fields:
@@ -161,7 +185,7 @@
   def __eq__(self, other, seen={}):
     if self is other or seen.get(id(self)) is other:
       return True
-    if isinstance(other, _RecordSchema):
+    if isinstance(other, _RecordSchema) and self.equalnames(other):
       size = len(self.__fields)
       if len(other.__fields) != size:
         return False
@@ -177,7 +201,7 @@
     if seen.__contains__(id(self)):
       return 0
     seen.add(id(self))
-    hash = self.gettype().__hash__() 
+    hash = NamedSchema.__hash__(self, seen)
     for field, fieldschm in self.__fields:
       hash = hash + fieldschm.__hash__(seen)
     return hash
@@ -283,11 +307,9 @@
       hash = hash + elem.__hash__(seen)
     return hash
 
-class _EnumSchema(Schema):
+class _EnumSchema(NamedSchema):
   def __init__(self, name, space, symbols):
-    Schema.__init__(self, ENUM)
-    self.__name = name
-    self.__space = space
+    NamedSchema.__init__(self, ENUM, name, space)
     self.__symbols = symbols
     self.__ordinals = dict()
     i = 0
@@ -295,12 +317,6 @@
       self.__ordinals[symbol] = i
       i+=1
 
-  def getname(self):
-    return self.__name
-
-  def getnamespace(self):
-    return self.__namespace
-
   def getenumsymbols(self):
     return self.__symbols
 
@@ -308,14 +324,13 @@
     return self.__ordinals.get(symbol)
 
   def str(self, names):
-    if names.get(self.__name) is self:
-      return "\""+self.__name+"\""
-    elif self.__name is not None:
-      names[self.__name] = self
+    if names.get(self.getname()) is self:
+      return "\""+self.getname()+"\""
+    elif self.getname() is not None:
+      names[self.getname()] = self
     str = cStringIO.StringIO()
     str.write("{\"type\": \"enum\", ")
-    if self.__name is not None:
-      str.write("\"name\": \""+self.__name+"\", ")
+    str.write(self.namestring())
     str.write("\"symbols\": [")
     count = 0
     for symbol in self.__symbols:
@@ -329,7 +344,7 @@
   def __eq__(self, other, seen={}):
     if self is other or seen.get(id(self)) is other:
       return True
-    if isinstance(other, _EnumSchema):
+    if isinstance(other, _EnumSchema) and self.equalnames(other):
       size = len(self.__symbols)
       if len(other.__symbols) != size:
         return False
@@ -345,11 +360,41 @@
     if seen.__contains__(id(self)):
       return 0
     seen.add(id(self))
-    hash = self.gettype().__hash__()
+    hash = NamedSchema.__hash__(self, seen)
     for symbol in self.__symbols:
       hash += symbol.__hash__()
     return hash
 
+class _FixedSchema(NamedSchema):
+  def __init__(self, name, space, size):
+    NamedSchema.__init__(self, FIXED, name, space)
+    self.__size = size
+
+  def getsize(self):
+    return self.__size
+
+  def str(self, names):
+    if names.get(self.getname()) is self:
+      return "\""+self.getname()+"\""
+    elif self.getname() is not None:
+      names[self.getname()] = self
+    str = cStringIO.StringIO()
+    str.write("{\"type\": \"fixed\", ")
+    str.write(self.namestring())
+    str.write("\"size\": "+repr(self.__size)+"}")
+    return str.getvalue()
+
+  def __eq__(self, other, seen=None):
+    if self is other:
+      return True
+    if (isinstance(other, _FixedSchema) and self.equalnames(other) 
+        and self.__size == other.__size):
+      return True
+    return False
+
+  def __hash__(self, seen=None):
+    return NamedSchema.__hash__(self, seen) + self.__size.__hash__()
+
 _PRIMITIVES = {'string':_StringSchema(),
         'bytes':_BytesSchema(),
         'int':_IntSchema(),
@@ -392,38 +437,42 @@
     type = obj.get("type")
     if type is None:
       raise SchemaParseException("No type: "+obj.__str__())
-    if type == "record" or type == "error":
+    if (type == "record" or type == "error" or 
+        type == "enum" or type == "fixed"):
       name = obj.get("name")
-      namespace = obj.get("namespace")
-      fields = list()
-      schema = _RecordSchema(fields, name, namespace, type == "error")
-      if name is not None:
+      space = obj.get("namespace")
+      if name is None:
+        raise SchemaParseException("No name in schema: "+obj.__str__())
+      if type == "record" or type == "error":
+        fields = list()
+        schema = _RecordSchema(fields, name, space, type == "error")
         names[name] = schema
-      fieldsnode = obj.get("fields")
-      if fieldsnode is None:
-        raise SchemaParseException("Record has no fields: "+obj.__str__())
-      for field in fieldsnode:
-        fieldname = field.get("name")
-        if fieldname is None:
-          raise SchemaParseException("No field name: "+field.__str__())
-        fieldtype = field.get("type")
-        if fieldtype is None:
-          raise SchemaParseException("No field type: "+field.__str__())
-        fields.append((fieldname, _parse(fieldtype, names)))
-      return schema
-    elif type == "enum":
-      name = obj.get("name")
-      namespace = obj.get("namespace")
-      symbolsnode = obj.get("symbols")
-      if symbolsnode == None or not isinstance(symbolsnode, list):
-        raise SchemaParseException("Enum has no symbols: "+obj.__str__())
-      symbols = list()
-      for symbol in symbolsnode:
-        symbols.append(symbol)
-      schema = _EnumSchema(name, namespace, symbols)
-      if name is not None:
+        fieldsnode = obj.get("fields")
+        if fieldsnode is None:
+          raise SchemaParseException("Record has no fields: "+obj.__str__())
+        for field in fieldsnode:
+          fieldname = field.get("name")
+          if fieldname is None:
+            raise SchemaParseException("No field name: "+field.__str__())
+          fieldtype = field.get("type")
+          if fieldtype is None:
+            raise SchemaParseException("No field type: "+field.__str__())
+          fields.append((fieldname, _parse(fieldtype, names)))
+        return schema
+      elif type == "enum":
+        symbolsnode = obj.get("symbols")
+        if symbolsnode == None or not isinstance(symbolsnode, list):
+          raise SchemaParseException("Enum has no symbols: "+obj.__str__())
+        symbols = list()
+        for symbol in symbolsnode:
+          symbols.append(symbol)
+        schema = _EnumSchema(name, space, symbols)
         names[name] = schema
-      return schema
+        return schema
+      elif type == "fixed":
+        schema = _FixedSchema(name, space, obj.get("size"))
+        names[name] = schema
+        return schema
     elif type == "array":
       return _ArraySchema(_parse(obj.get("items"), names))
     elif type == "map":

Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Tue May 26 17:38:46 2009
@@ -87,6 +87,11 @@
       if len == 0:
         return None
       return symbols[random.randint(0,len)-1]
+    elif schm.gettype() == schema.FIXED:
+      string = cStringIO.StringIO()
+      for i in range(0, schm.getsize()):
+        string.write(struct.pack('c',random.sample('12345abcd', 1)[0]))
+      return string.getvalue()
 
 class TestSchema(unittest.TestCase):
 
@@ -132,11 +137,13 @@
     self.check("{\"type\":\"map\", \"values\": \"string\"}")
 
   def testRecord(self):
-    self.check("{\"type\":\"record\",\"fields\":[{\"name\":\"f\", \"type\":" +
+    self.check("{\"type\":\"record\", \"name\":\"Test\"," +
+               "\"fields\":[{\"name\":\"f\", \"type\":" +
                "\"string\"}, {\"name\":\"fb\", \"type\":\"bytes\"}]}")
 
   def testEnum(self):
-    self.check("{\"type\": \"enum\", \"symbols\": [\"A\", \"B\"]}")
+    self.check("{\"type\": \"enum\", \"name\":\"Test\","+
+               "\"symbols\": [\"A\", \"B\"]}")
 
   def testRecursive(self):
     self.check("{\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
@@ -157,6 +164,9 @@
       +"{\"name\":\"car\", \"type\":\"string\"}," 
       +"{\"name\":\"cdr\", \"type\":\"string\"}]}]")
 
+  def testFixed(self):
+    self.check("{\"type\": \"fixed\", \"name\":\"Test\", \"size\": 1}") 
+
   def check(self, string):
     schm = schema.parse(string)
     st = schema.stringval(schm)

Modified: hadoop/avro/trunk/src/test/py/testioreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testioreflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testioreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testioreflect.py Tue May 26 17:38:46 2009
@@ -49,12 +49,6 @@
 
 class TestSchema(testio.TestSchema):
 
-  # the schema MUST have name
-  def testRecord(self):
-    self.check(
-    "{\"type\":\"record\",\"name\":\"TestRec\",\"fields\":[{\"name\":\"f\"," +
-       "\"type\":\"string\"}, {\"name\":\"fb\", \"type\":\"bytes\"}]}")
-
   def __init__(self, methodName):
     testio.TestSchema.__init__(self, methodName, dyvalidator, ReflectDWriter,
                                ReflectDReader, DyRandomData, False)

Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Tue May 26 17:38:46 2009
@@ -14,7 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 
-import unittest, socket
+import unittest, socket, struct
 import testio
 import avro.ipc as ipc
 import avro.generic as generic
@@ -77,6 +77,7 @@
     record = dict()
     record['name'] = unicode('foo')
     record['kind'] = 'BAR'
+    record['hash'] = struct.pack('16s','0123456789012345')
     params = dict()
     params['record'] = record
     echoed = self.requestor.call('echo', params)

Modified: hadoop/avro/trunk/src/test/py/testipcreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipcreflect.py?rev=778807&r1=778806&r2=778807&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipcreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testipcreflect.py Tue May 26 17:38:46 2009
@@ -14,7 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 
-import socket
+import socket, struct
 import avro.schema as schema
 import avro.reflect as reflect
 import avro.ipc as ipc
@@ -59,6 +59,7 @@
     record = TestRecord()
     record.name = unicode('foo')
     record.kind = 'BAR'
+    record.hash = struct.pack('16s','0123456789012345')
     echoed = self.proxy.echo(record)
     self.assertEquals(record.name, echoed.name)