You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/18 23:03:23 UTC
svn commit: r805575 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/io/
src/py/avro/ src/test/java/org/apache/avro/ src/test/py/
Author: cutting
Date: Tue Aug 18 21:03:22 2009
New Revision: 805575
URL: http://svn.apache.org/viewvc?rev=805575&view=rev
Log:
AVRO-61. Add Python support for reading blocked data. Contributed by Ravi Gummadi.
Added:
hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/build.xml
hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
hadoop/avro/trunk/src/py/avro/genericio.py
hadoop/avro/trunk/src/py/avro/io.py
hadoop/avro/trunk/src/test/py/interoptests.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 18 21:03:22 2009
@@ -55,6 +55,9 @@
AVRO-88. Fix Java's BlockingBinaryEncoder to correctly override
writeEnum(). (Ravi Gummadi via cutting)
+ AVRO-61. Add Python support for reading blocked data.
+ (Ravi Gummadi via cutting)
+
Avro 1.0.0 -- 9 July 2009
INCOMPATIBLE CHANGES
Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Tue Aug 18 21:03:22 2009
@@ -306,12 +306,19 @@
<target name="generate-test-data" depends="compile-test-java, init-py">
<mkdir dir="${test.java.build.dir}/data-files"/>
+ <mkdir dir="${test.java.build.dir}/blocking-data-files"/>
<java classname="org.apache.avro.RandomData"
classpathref="test.java.classpath">
<arg value="${basedir}/src/test/schemata/interop.avsc"/>
<arg value="${test.java.build.dir}/data-files/test.java.avro"/>
<arg value="${test.count}"/>
</java>
+ <java classname="org.apache.avro.GenerateBlockingData"
+ classpathref="test.java.classpath">
+ <arg value="${basedir}/src/test/schemata/interop.avsc"/>
+ <arg value="${test.java.build.dir}/blocking-data-files/test.java.blocking.avro"/>
+ <arg value="${test.count}"/>
+ </java>
<taskdef name="py-run" classname="org.pyant.tasks.PythonRunTask">
<classpath refid="java.classpath" />
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java Tue Aug 18 21:03:22 2009
@@ -33,7 +33,7 @@
* the reading of leaf values (for example, {@link #readLong} and
* {@link #readString}).
*
- * The other type of methods support the writing of maps and arrays.
+ * The other type of methods support the reading of maps and arrays.
* These methods are {@link #readArrayStart}, {@link #arrayNext},
* and similar methods for maps). See {@link #readArrayStart} for
* details on these methods.)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java Tue Aug 18 21:03:22 2009
@@ -30,7 +30,7 @@
* the reading of leaf values (for example, {@link #readLong} and
* {@link #readString}).
*
- * The other type of methods support the writing of maps and arrays.
+ * The other type of methods support the reading of maps and arrays.
* These methods are {@link #readArrayStart}, {@link #arrayNext},
* and similar methods for maps). See {@link #readArrayStart} for
* details on these methods.)
Modified: hadoop/avro/trunk/src/py/avro/genericio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericio.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericio.py (original)
+++ hadoop/avro/trunk/src/py/avro/genericio.py Tue Aug 18 21:03:22 2009
@@ -174,21 +174,28 @@
self.__raisematchException(actual, expected)
result = dict()
size = decoder.readlong()
- if size != 0:
+ while size != 0:
+ if size < 0:
+ size = -size
+ decoder.readlong() #ignore bytecount if this is a blocking map
for i in range(0, size):
key = decoder.readutf8()
result[key] = self.readdata(actual.getvaluetype(),
expected.getvaluetype(), decoder)
- decoder.readlong()
+ size = decoder.readlong()
return result
def skipmap(self, schm, decoder):
size = decoder.readlong()
- if size != 0:
- for i in range(0, size):
- decoder.skiputf8()
- self.skipdata(schm.getvaluetype(), decoder)
- decoder.skiplong()
+ while size != 0:
+ if size < 0:
+ bytecount = decoder.readlong() #byte size of this block of a blocking map; lets the whole block be skipped
+ decoder.skip(bytecount)
+ else:
+ for i in range(0, size):
+ decoder.skiputf8()
+ self.skipdata(schm.getvaluetype(), decoder)
+ size = decoder.readlong()
def readarray(self, actual, expected, decoder):
if (actual.getelementtype().gettype() !=
@@ -196,19 +203,26 @@
self.__raisematchException(actual, expected)
result = list()
size = decoder.readlong()
- if size != 0:
+ while size != 0:
+ if size < 0:
+ size = -size
+ decoder.readlong() #ignore bytecount if this is a blocking array
for i in range(0, size):
result.append(self.readdata(actual.getelementtype(),
expected.getelementtype(), decoder))
- decoder.readlong()
+ size = decoder.readlong()
return result
def skiparray(self, schm, decoder):
size = decoder.readlong()
- if size != 0:
- for i in range(0, size):
- self.skipdata(schm.getelementtype(), decoder)
- decoder.skiplong()
+ while size != 0:
+ if size < 0:
+ bytecount = decoder.readlong() #byte size of this block of a blocking array; lets the whole block be skipped
+ decoder.skip(bytecount)
+ else:
+ for i in range(0, size):
+ self.skipdata(schm.getelementtype(), decoder)
+ size = decoder.readlong()
def createrecord(self, schm):
return dict()
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Tue Aug 18 21:03:22 2009
@@ -304,6 +304,9 @@
int(ord(self.__reader.read(1))))
seekpos = self.__reader.seek(self.__length-footersize)
metalength = self.__decoder.readlong()
+ if metalength < 0:
+ metalength = -metalength
+ self.__decoder.readlong() #ignore byteCount if this is a blocking map
self.__meta = dict()
for i in range(0, metalength):
key = self.__decoder.readutf8()
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java?rev=805575&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/GenerateBlockingData.java Tue Aug 18 21:03:22 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BlockingBinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Generates a file of random data for a given schema (one that does not
+ * nest arrays and maps). This is only for testing.
+ * <p>
+ * File layout: a long holding the total number of objects, followed by
+ * zero or more blocks. Each block is a long holding the number of objects
+ * in the block, followed by those objects serialized with
+ * {@link BlockingBinaryEncoder}. No other metadata is written.
+ * interoptests.py reads the file generated here and validates its contents.
+ */
+public class GenerateBlockingData {
+  /** Once the block buffer holds at least this many bytes, the block is written. */
+  private static final int SYNC_INTERVAL = 1000;
+  private static ByteArrayOutputStream buffer =
+    new ByteArrayOutputStream(2*SYNC_INTERVAL);
+
+  private static Encoder bufOut = new BlockingBinaryEncoder(buffer);
+  private static int blockCount;
+
+  /**
+   * Writes the pending block to {@code out}: the object count (through
+   * {@code vout}) followed by the buffered object bytes, then resets the
+   * buffer and count for the next block.
+   */
+  private static void writeBlock(Encoder vout, FileOutputStream out)
+    throws IOException {
+    vout.writeLong(blockCount);
+    // vout and buffer both end up in out: flush vout first so the count
+    // precedes the block bytes even if the encoder buffers internally.
+    vout.flush();
+    bufOut.flush();
+    buffer.writeTo(out);
+    buffer.reset();
+    blockCount = 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 3) {
+      System.out.println(
+        "Usage: GenerateBlockingData <schemafile> <outputfile> <count>");
+      System.exit(-1);
+    }
+
+    Schema sch = Schema.parse(new File(args[0]));
+    File outputFile = new File(args[1]);
+    int numObjects = Integer.parseInt(args[2]);
+
+    FileOutputStream out = new FileOutputStream(outputFile, false);
+    try {
+      DatumWriter<Object> dout = new GenericDatumWriter<Object>();
+      dout.setSchema(sch);
+      Encoder vout = new BinaryEncoder(out);
+      vout.writeLong(numObjects); // metadata: the count of objects in the file
+
+      for (Object datum : new RandomData(sch, numObjects)) {
+        dout.write(datum, bufOut);
+        blockCount++;
+        if (buffer.size() >= SYNC_INTERVAL) {
+          writeBlock(vout, out);
+        }
+      }
+      if (blockCount > 0) {
+        writeBlock(vout, out);
+      }
+      // Ensure the object count reaches the file even if no block was written.
+      vout.flush();
+      out.flush();
+    } finally {
+      // Close the stream even if generating or encoding a datum fails.
+      out.close();
+    }
+  }
+}
Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=805575&r1=805574&r2=805575&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Tue Aug 18 21:03:22 2009
@@ -23,6 +23,7 @@
import avro.reflectipc as reflectipc
import testio, testipc, testioreflect, testipcreflect
+_BLOCKINGFILE_DIR = "build/test/blocking-data-files/"
_DATAFILE_DIR = "build/test/data-files/"
_SERVER_PORTS_DIR = testio._DIR + "server-ports/"
@@ -46,6 +47,21 @@
for i in range(0,count):
datum = dr.next()
self.assertTrue(self.__validator(origschm, datum))
+ # validate reading of blocking arrays, blocking maps
+ for file in os.listdir(_BLOCKINGFILE_DIR):
+ print "Validating:", file.__str__()
+ reader = open(_BLOCKINGFILE_DIR+file, "rb")
+ decoder = io.Decoder(reader)
+ dreader = self.__datumreader()
+ dreader.setschema(origschm)
+ count = int(decoder.readlong()) #metadata:the count of objects in the file
+ blockcount = decoder.readlong()
+ for i in range(0,count):
+ while blockcount == 0:
+ blockcount = decoder.readlong()
+ blockcount -= 1
+ datum = dreader.read(decoder)
+ self.assertTrue(self.__validator(origschm, datum))
class TestReflectGeneratedFiles(TestGeneratedFiles):