You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/18 21:58:22 UTC
svn commit: r805556 - in /hadoop/avro/trunk: ./
src/java/org/apache/avro/file/ src/java/org/apache/avro/generic/
src/java/org/apache/avro/reflect/ src/test/java/org/apache/avro/
Author: cutting
Date: Tue Aug 18 19:58:22 2009
New Revision: 805556
URL: http://svn.apache.org/viewvc?rev=805556&view=rev
Log:
AVRO-95. Fix writing of reflect-based unions. Also extend DataFileWriter to permit adding branches to a union schema while writing.
Added:
hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 18 19:58:22 2009
@@ -48,6 +48,10 @@
AVRO-90. Fix Java's JSON codec to correctly encode unions. (cutting)
+ AVRO-95. Fix writing of Java reflect-based unions. Also extend
+ DataFileWriter to permit adding branches to a union schema while
+ writing.
+
Avro 1.0.0 -- 9 July 2009
INCOMPATIBLE CHANGES
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Tue Aug 18 19:58:22 2009
@@ -27,8 +27,10 @@
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.BinaryEncoder;
@@ -110,6 +112,17 @@
setMeta(key, Long.toString(value));
}
+  /**
+   * If the schema for this file is a union, add a branch to it.
+   *
+   * <p>The new branch is appended after the existing ones, so branches that
+   * are already in use keep their positions. The branch list is copied
+   * before it is extended: the list returned by {@link Schema#getTypes()}
+   * may be the very list the caller passed to createUnion (possibly a
+   * fixed-size one such as Arrays.asList), and it must not be mutated.
+   * Both the datum writer's schema and the file's "schema" metadata are
+   * updated to the new union.
+   *
+   * @param branch the schema to add as a new union branch
+   * @throws AvroRuntimeException if this file's schema is not a union
+   */
+  public synchronized void addSchema(Schema branch) {
+    if (schema.getType() != Schema.Type.UNION)
+      throw new AvroRuntimeException("Not a union schema: "+schema);
+    // Defensive copy: never add to a list we do not own.
+    List<Schema> types = new java.util.ArrayList<Schema>(schema.getTypes());
+    types.add(branch);
+    this.schema = Schema.createUnion(types);
+    this.dout.setSchema(schema);
+    setMeta("schema", schema.toString());
+  }
+
/** Append a datum to the file. */
public synchronized void append(D datum) throws IOException {
dout.write(datum, bufOut);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Tue Aug 18 19:58:22 2009
@@ -186,7 +186,7 @@
case RECORD:
if (!isRecord(datum)) return false;
return (schema.getName() == null) ||
- schema.getName().equals(((GenericRecord)datum).getSchema().getName());
+ schema.getName().equals(getRecordSchema(datum).getName());
case ENUM: return isEnum(datum);
case ARRAY: return isArray(datum);
case MAP: return isMap(datum);
@@ -203,6 +203,13 @@
}
}
+  /**
+   * Called to obtain the schema of a record. By default calls
+   * {@link GenericRecord#getSchema()}. May be overridden for alternate
+   * record representations.
+   *
+   * <p>instanceOf calls this only after isRecord(datum) has returned true,
+   * and matches the returned schema's name against a named record branch.
+   *
+   * @param record the record datum whose schema is wanted
+   * @return the schema describing {@code record}
+   * @throws ClassCastException if {@code record} is not a GenericRecord and
+   *         this method has not been overridden
+   */
+  protected Schema getRecordSchema(Object record) {
+    return ((GenericRecord)record).getSchema();
+  }
+
/** Called to write a fixed value. May be overridden for alternate fixed
* representations.*/
protected void writeFixed(Schema schema, Object datum, Encoder out)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Aug 18 19:58:22 2009
@@ -58,10 +58,9 @@
return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
}
- protected boolean instanceOf(Schema schema, Object datum) {
- return (schema.getType() == Type.RECORD)
- ? ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD
- : super.instanceOf(schema, datum);
+  /**
+   * Derives a record's schema from its Java class via ReflectData, so that
+   * plain Java objects (rather than GenericRecords) can be matched against
+   * the named record branches of a union.
+   *
+   * @param record the record datum whose schema is wanted
+   * @return the reflected schema of {@code record}'s runtime class
+   */
+  @Override
+  protected Schema getRecordSchema(Object record) {
+    Class<?> recordClass = record.getClass();
+    return ReflectData.getSchema(recordClass);
+  }
}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,35 @@
+/**
+ *
+ */
+package org.apache.avro;
+
+import org.apache.avro.util.Utf8;
+
+public class BarRecord {
+ private Utf8 beerMsg;
+
+ public BarRecord() {
+ }
+
+ public BarRecord(String beerMsg) {
+ this.beerMsg = new Utf8(beerMsg);
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof BarRecord) {
+ return this.beerMsg.equals(((BarRecord) that).beerMsg);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return beerMsg.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return BarRecord.class.getSimpleName() + "{msg=" + beerMsg + "}";
+ }
+}
\ No newline at end of file
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,33 @@
+/**
+ *
+ */
+package org.apache.avro;
+
+public class FooRecord {
+ private int fooCount;
+
+ public FooRecord() {
+ }
+
+ public FooRecord(int fooCount) {
+ this.fooCount = fooCount;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof FooRecord) {
+ return this.fooCount == ((FooRecord) that).fooCount;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return fooCount;
+ }
+
+ @Override
+ public String toString() {
+ return FooRecord.class.getSimpleName() + "{count=" + fooCount + "}";
+ }
+}
\ No newline at end of file
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDataFileReflect {
+
+  private static final File DIR = new File(System.getProperty("test.dir",
+      "/tmp"));
+  private static final File FILE = new File(DIR, "test.avro");
+
+  /*
+   * Test that a file can hold records of several reflected types when the
+   * union of their schemas is built before any record is written.
+   */
+  @Test
+  public void testMultiReflectWithUnionBeforeWriting() throws IOException {
+    FileOutputStream fos = new FileOutputStream(FILE);
+
+    List<Schema> schemas = Arrays.asList(new Schema[] {
+        ReflectData.getSchema(FooRecord.class),
+        ReflectData.getSchema(BarRecord.class) });
+    Schema union = Schema.createUnion(schemas);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
+        new ReflectDatumWriter(union));
+
+    CheckList check = new CheckList();
+    try {
+      write(writer, new BarRecord("One beer please"), check);
+      write(writer, new FooRecord(10), check);
+      write(writer, new BarRecord("Two beers please"), check);
+      write(writer, new FooRecord(20), check);
+    } finally {
+      writer.close();
+    }
+
+    readAndCheck(check);
+  }
+
+  /*
+   * Test that a file can hold records of several reflected types when new
+   * branches are added to the union (via DataFileWriter.addSchema) as new
+   * types show up during writing.
+   */
+  @Test
+  public void testMultiReflectWithUnionAfterWriting() throws IOException {
+    FileOutputStream fos = new FileOutputStream(FILE);
+
+    List<Schema> schemas = new ArrayList<Schema>();
+    schemas.add(ReflectData.getSchema(FooRecord.class));
+    Schema union = Schema.createUnion(schemas);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
+        new ReflectDatumWriter(union));
+
+    CheckList check = new CheckList();
+    try {
+      // write known type
+      write(writer, new FooRecord(10), check);
+      write(writer, new FooRecord(15), check);
+
+      // we have a new type, add it to the file
+      writer.addSchema(ReflectData.getSchema(BarRecord.class));
+
+      // test writing those new types to a file
+      write(writer, new BarRecord("One beer please"), check);
+      write(writer, new BarRecord("Two beers please"), check);
+
+      // does foo record still work?
+      write(writer, new FooRecord(20), check);
+
+      // get one more bar in, just for laughs
+      write(writer, new BarRecord("Many beers please"), check);
+    } finally {
+      writer.close();
+    }
+
+    readAndCheck(check);
+  }
+
+  /**
+   * Reads FILE back with a reflect-based reader. Asserts that the file's
+   * "count" metadata equals the number of records written, and that each
+   * datum read equals the record written at the same position.
+   */
+  private void readAndCheck(CheckList check) throws IOException {
+    ReflectDatumReader din = new ReflectDatumReader("org.apache.avro.");
+    SeekableFileInput sin = new SeekableFileInput(FILE);
+    DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
+    try {
+      long count = reader.getMetaLong("count");
+      // Without this, a file missing records would still pass the loop below.
+      Assert.assertEquals(check.size(), count);
+      Object datum = null;
+      for (int i = 0; i < count; i++) {
+        datum = reader.next(datum);
+        check.assertEquals(datum, i);
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /** Appends {@code o} to the file and records it as the next expected datum. */
+  private void write(DataFileWriter<Object> writer, Object o, CheckList l)
+      throws IOException {
+    writer.append(l.addAndReturn(o));
+  }
+
+  /** Ordered list of the records written, used to verify what is read back. */
+  @SuppressWarnings("serial")
+  private static class CheckList extends ArrayList<Object> {
+    /** Adds {@code check} to the expected list and returns it unchanged. */
+    Object addAndReturn(Object check) {
+      add(check);
+      return check;
+    }
+
+    /** Asserts that {@code toCheck}, read at position i, equals the i-th record written. */
+    void assertEquals(Object toCheck, int i) {
+      Assert.assertNotNull(toCheck);
+      Object expected = get(i);
+      Assert.assertNotNull(expected);
+      // JUnit takes (expected, actual); keep that order so failure
+      // messages report the values the right way round.
+      Assert.assertEquals(expected, toCheck);
+    }
+  }
+}