You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/18 23:03:23 UTC

svn commit: r805575 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/io/ src/py/avro/ src/test/java/org/apache/avro/ src/test/py/

Author: cutting
Date: Tue Aug 18 21:03:22 2009
New Revision: 805575

URL: http://svn.apache.org/viewvc?rev=805575&view=rev
Log:
AVRO-61.  Add Python support for reading blocked data.  Contributed by Ravi Gummadi.

Added:
    hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
    hadoop/avro/trunk/src/py/avro/genericio.py
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/test/py/interoptests.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 18 21:03:22 2009
@@ -55,6 +55,9 @@
     AVRO-88. Fix Java's BlockingBinaryEncoder to correctly override
     writeEnum().  (Ravi Gummadi via cutting)
 
+    AVRO-61. Add Python support for reading blocked data.
+    (Ravi Gummadi via cutting)
+
 Avro 1.0.0 -- 9 July 2009
 
   INCOMPATIBLE CHANGES

Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Tue Aug 18 21:03:22 2009
@@ -306,12 +306,19 @@
 
   <target name="generate-test-data" depends="compile-test-java, init-py">
     <mkdir dir="${test.java.build.dir}/data-files"/>
+  	 <mkdir dir="${test.java.build.dir}/blocking-data-files"/>
     <java classname="org.apache.avro.RandomData"
       classpathref="test.java.classpath">
       <arg value="${basedir}/src/test/schemata/interop.avsc"/>
       <arg value="${test.java.build.dir}/data-files/test.java.avro"/>
       <arg value="${test.count}"/>
     </java>
+  	 <java classname="org.apache.avro.GenerateBlockingData"
+  	      classpathref="test.java.classpath">
+  	      <arg value="${basedir}/src/test/schemata/interop.avsc"/>
+  	      <arg value="${test.java.build.dir}/blocking-data-files/test.java.blocking.avro"/>
+  	      <arg value="${test.count}"/>
+  	 </java>
 
     <taskdef name="py-run" classname="org.pyant.tasks.PythonRunTask">
       <classpath refid="java.classpath" />

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java Tue Aug 18 21:03:22 2009
@@ -33,7 +33,7 @@
  *  the reading of leaf values (for example, {@link #readLong} and
  *  {@link #readString}).
  *
- *  The other type of methods support the writing of maps and arrays.
+ *  The other type of methods support the reading of maps and arrays.
  *  These methods are {@link #readArrayStart}, {@link #arrayNext},
  *  and similar methods for maps).  See {@link #readArrayStart} for
  *  details on these methods.)

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java Tue Aug 18 21:03:22 2009
@@ -30,7 +30,7 @@
  *  the reading of leaf values (for example, {@link #readLong} and
  *  {@link #readString}).
  *
- *  The other type of methods support the writing of maps and arrays.
+ *  The other type of methods support the reading of maps and arrays.
  *  These methods are {@link #readArrayStart}, {@link #arrayNext},
  *  and similar methods for maps).  See {@link #readArrayStart} for
  *  details on these methods.)

Modified: hadoop/avro/trunk/src/py/avro/genericio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericio.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericio.py (original)
+++ hadoop/avro/trunk/src/py/avro/genericio.py Tue Aug 18 21:03:22 2009
@@ -174,21 +174,28 @@
       self.__raisematchException(actual, expected)
     result = dict()
     size = decoder.readlong()
-    if size != 0:
+    while size != 0:
+      if size < 0:
+        size = -size
+        decoder.readlong() #ignore bytecount if this is a blocking map
       for i in range(0, size):
         key = decoder.readutf8()
         result[key] = self.readdata(actual.getvaluetype(), 
                                     expected.getvaluetype(), decoder)
-      decoder.readlong()
+      size = decoder.readlong()
     return result
 
   def skipmap(self, schm, decoder):
     size = decoder.readlong()
-    if size != 0:
-      for i in range(0, size):
-        decoder.skiputf8()
-        self.skipdata(schm.getvaluetype(), decoder)
-      decoder.skiplong()
+    while size != 0:
+      if size < 0:
+        bytecount = decoder.readlong() #bytecount of block if this is a blocking map
+        decoder.skip(bytecount)
+      else:
+        for i in range(0, size):
+          decoder.skiputf8()
+          self.skipdata(schm.getvaluetype(), decoder)
+      size = decoder.readlong()
 
   def readarray(self, actual, expected, decoder):
     if (actual.getelementtype().gettype() != 
@@ -196,19 +203,26 @@
       self.__raisematchException(actual, expected)
     result = list()
     size = decoder.readlong()
-    if size != 0:
+    while size != 0:
+      if size < 0:
+        size = -size
+        bytecount = decoder.readlong() #ignore bytecount if this is a blocking array
       for i in range(0, size):
         result.append(self.readdata(actual.getelementtype(), 
                                     expected.getelementtype(), decoder))
-      decoder.readlong()
+      size = decoder.readlong()
     return result
 
   def skiparray(self, schm, decoder):
     size = decoder.readlong()
-    if size != 0:
-      for i in range(0, size):
-        self.skipdata(schm.getelementtype(), decoder)
-      decoder.skiplong()
+    while size != 0:
+      if size < 0:
+        bytecount = decoder.readlong() #bytecount of block if this is a blocking array
+        decoder.skip(bytecount)
+      else:
+        for i in range(0, size):
+          self.skipdata(schm.getelementtype(), decoder)
+      size = decoder.readlong()
 
   def createrecord(self, schm):
     return dict()

Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue Aug 18 21:03:22 2009
@@ -304,6 +304,9 @@
             int(ord(self.__reader.read(1))))
     seekpos = self.__reader.seek(self.__length-footersize)
     metalength = self.__decoder.readlong()
+    if metalength < 0:
+      metalength = -metalength
+      self.__decoder.readlong() #ignore byteCount if the metadata is written as a blocked map
     self.__meta = dict()
     for i in range(0, metalength):
       key = self.__decoder.readutf8()

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java?rev=805575&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java Tue Aug 18 21:03:22 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BlockingBinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Generates a file with objects of a specific schema (one that doesn't contain
+ * nesting of arrays and maps) with random data. This is only for testing.
+ * Generated file contains the count of objects of the specified schema followed
+ * by objects serialized using BlockingBinaryEncoder. No other metadata is
+ * written to the file. See interoptests.py for more details (interoptests.py
+ * reads the file generated here and validates the contents).
+ */
+public class GenerateBlockingData {
+  private static final int SYNC_INTERVAL = 1000;
+  private static ByteArrayOutputStream buffer =
+                      new ByteArrayOutputStream(2*SYNC_INTERVAL);
+  
+  private static Encoder bufOut = new BlockingBinaryEncoder(buffer);
+  private static int blockCount;
+
+  private static void writeBlock(Encoder vout, FileOutputStream out)
+               throws IOException{
+    vout.writeLong(blockCount);
+    bufOut.flush();
+    buffer.writeTo(out);
+    buffer.reset();
+    blockCount = 0;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    if(args.length != 3) {
+      System.out.println(
+          "Usage: GenerateBlockingData <schemafile> <outputfile> <count>");
+      System.exit(-1);
+    }
+    
+    Schema sch = Schema.parse(new File(args[0]));
+    File FILE = new File(args[1]);
+    int numObjects = Integer.parseInt(args[2]);
+    
+    FileOutputStream out = new FileOutputStream(FILE, false);
+    DatumWriter<Object> dout = new GenericDatumWriter<Object>();
+    dout.setSchema(sch);
+    Encoder vout = new BinaryEncoder(out);
+    vout.writeLong(numObjects); // metadata:the count of objects in the file
+    
+    for (Object datum : new RandomData(sch, numObjects)) {
+      dout.write(datum, bufOut);
+      blockCount++;
+      if (buffer.size() >= SYNC_INTERVAL) {
+        writeBlock(vout, out);
+      }
+    }
+    if (blockCount > 0) {
+      writeBlock(vout, out);
+    }
+    out.flush();
+    out.close();
+  }
+}

Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Tue Aug 18 21:03:22 2009
@@ -23,6 +23,7 @@
 import avro.reflectipc as reflectipc
 import testio, testipc, testioreflect, testipcreflect
 
+_BLOCKINGFILE_DIR = "build/test/blocking-data-files/"
 _DATAFILE_DIR = "build/test/data-files/"
 _SERVER_PORTS_DIR = testio._DIR + "server-ports/"
 
@@ -46,6 +47,21 @@
       for i in range(0,count):
         datum = dr.next()
         self.assertTrue(self.__validator(origschm, datum))
+    # validate reading of blocking arrays, blocking maps
+    for file in os.listdir(_BLOCKINGFILE_DIR):
+      print "Validating:", file.__str__()
+      reader = open(_BLOCKINGFILE_DIR+file, "rb")
+      decoder = io.Decoder(reader)
+      dreader = self.__datumreader()
+      dreader.setschema(origschm)
+      count = int(decoder.readlong()) #metadata:the count of objects in the file
+      blockcount = decoder.readlong()
+      for i in range(0,count):
+        while blockcount == 0:
+          blockcount = decoder.readlong()
+        blockcount -= 1
+        datum = dreader.read(decoder)
+        self.assertTrue(self.__validator(origschm, datum))
 
 class TestReflectGeneratedFiles(TestGeneratedFiles):