You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/18 21:58:22 UTC

svn commit: r805556 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/file/ src/java/org/apache/avro/generic/ src/java/org/apache/avro/reflect/ src/test/java/org/apache/avro/

Author: cutting
Date: Tue Aug 18 19:58:22 2009
New Revision: 805556

URL: http://svn.apache.org/viewvc?rev=805556&view=rev
Log:
AVRO-95.  Fix writing of reflect-based unions.  Also extend DataFileWriter to permit adding branches to a union schema while writing.

Added:
    hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Aug 18 19:58:22 2009
@@ -48,6 +48,10 @@
 
     AVRO-90. Fix Java's JSON codec to correctly encode unions. (cutting)
 
+    AVRO-95. Fix writing of Java reflect-based unions.  Also extend
+    DataFileWriter to permit adding branches to a union schema while
+    writing.
+
 Avro 1.0.0 -- 9 July 2009
 
   INCOMPATIBLE CHANGES

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Tue Aug 18 19:58:22 2009
@@ -27,8 +27,10 @@
 import java.security.MessageDigest;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.BinaryEncoder;
@@ -110,6 +112,17 @@
       setMeta(key, Long.toString(value));
     }
 
+  /** If the schema for this file is a union, add a branch to it. */
+  public synchronized void addSchema(Schema branch) {
+    if (schema.getType() != Schema.Type.UNION)
+      throw new AvroRuntimeException("Not a union schema: "+schema);
+    List<Schema> types = schema.getTypes();
+    types.add(branch);
+    this.schema = Schema.createUnion(types);
+    this.dout.setSchema(schema);
+    setMeta("schema", schema.toString());
+  }
+
   /** Append a datum to the file. */
   public synchronized void append(D datum) throws IOException {
       dout.write(datum, bufOut);

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Tue Aug 18 19:58:22 2009
@@ -186,7 +186,7 @@
     case RECORD:
       if (!isRecord(datum)) return false;
       return (schema.getName() == null) ||
-        schema.getName().equals(((GenericRecord)datum).getSchema().getName());
+        schema.getName().equals(getRecordSchema(datum).getName());
     case ENUM:    return isEnum(datum);
     case ARRAY:   return isArray(datum);
     case MAP:     return isMap(datum);
@@ -203,6 +203,13 @@
     }
   }
 
+  /** Called to obtain the schema of a record.  By default calls
+   * {@link GenericRecord#getSchema()}.  May be overridden for alternate record
+   * representations. */
+  protected Schema getRecordSchema(Object record) {
+    return ((GenericRecord)record).getSchema();
+  }
+
   /** Called to write a fixed value.  May be overridden for alternate fixed
    * representations.*/
   protected void writeFixed(Schema schema, Object datum, Encoder out)

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=805556&r1=805555&r2=805556&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Aug 18 19:58:22 2009
@@ -58,10 +58,9 @@
     return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
   }
 
-  protected boolean instanceOf(Schema schema, Object datum) {
-    return (schema.getType() == Type.RECORD)
-      ? ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD
-      : super.instanceOf(schema, datum);
+  @Override
+  protected Schema getRecordSchema(Object record) {
+    return ReflectData.getSchema(record.getClass());
   }
 
 }

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/BarRecord.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,35 @@
+/**
+ * Test record with a single Utf8 message field, used by TestDataFileReflect.
+ */
+package org.apache.avro;
+
+import org.apache.avro.util.Utf8;
+
+public class BarRecord {
+  private Utf8 beerMsg;
+
+  public BarRecord() {
+  }
+
+  public BarRecord(String beerMsg) {
+    this.beerMsg = new Utf8(beerMsg);
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof BarRecord) {
+      return this.beerMsg.equals(((BarRecord) that).beerMsg);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return beerMsg.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return BarRecord.class.getSimpleName() + "{msg=" + beerMsg + "}";
+  }
+}
\ No newline at end of file

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/FooRecord.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,33 @@
+/**
+ * Test record with a single int count field, used by TestDataFileReflect.
+ */
+package org.apache.avro;
+
+public class FooRecord {
+  private int fooCount;
+
+  public FooRecord() {
+  }
+
+  public FooRecord(int fooCount) {
+    this.fooCount = fooCount;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof FooRecord) {
+      return this.fooCount == ((FooRecord) that).fooCount;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return fooCount;
+  }
+
+  @Override
+  public String toString() {
+    return FooRecord.class.getSimpleName() + "{count=" + fooCount + "}";
+  }
+}
\ No newline at end of file

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=805556&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Tue Aug 18 19:58:22 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDataFileReflect {
+
+  private static final File DIR = new File(System.getProperty("test.dir",
+      "/tmp"));
+  private static final File FILE = new File(DIR, "test.avro");
+
+  /*
+   * Test that a file can hold multiple record schemas when they are combined
+   * into a union before any records are written.
+   */
+  @Test
+  public void testMultiReflectWithUnionBeforeWriting() throws IOException {
+    FileOutputStream fos = new FileOutputStream(FILE);
+
+    List<Schema> schemas = Arrays.asList(new Schema[] {
+        ReflectData.getSchema(FooRecord.class),
+        ReflectData.getSchema(BarRecord.class) });
+    Schema union = Schema.createUnion(schemas);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
+        new ReflectDatumWriter(union));
+
+    // test writing to a file
+    CheckList check = new CheckList();
+    write(writer, new BarRecord("One beer please"), check);
+    write(writer, new FooRecord(10), check);
+    write(writer, new BarRecord("Two beers please"), check);
+    write(writer, new FooRecord(20), check);
+    writer.close();
+
+    ReflectDatumReader din = new ReflectDatumReader("org.apache.avro.");
+    SeekableFileInput sin = new SeekableFileInput(FILE);
+    DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
+    Object datum = null;
+    long count = reader.getMetaLong("count");
+    for (int i = 0; i < count; i++) {
+      datum = reader.next(datum);
+      check.assertEquals(datum, i);
+    }
+    reader.close();
+  }
+
+  /*
+   * Test that a file can hold multiple record schemas when new types are added
+   * to the union as they are encountered, after writing has already begun.
+   */
+  @Test
+  public void testMultiReflectWithUntionAfterWriting() throws IOException {
+    FileOutputStream fos = new FileOutputStream(FILE);
+
+    List<Schema> schemas = new ArrayList<Schema>();
+    schemas.add(ReflectData.getSchema(FooRecord.class));
+    Schema union = Schema.createUnion(schemas);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
+        new ReflectDatumWriter(union));
+
+    CheckList check = new CheckList();
+    // write known type
+    write(writer, new FooRecord(10), check);
+    write(writer, new FooRecord(15), check);
+
+    // we have a new type, add it to the file
+    writer.addSchema(ReflectData.getSchema(BarRecord.class));
+
+    // test writing those new types to a file
+    write(writer, new BarRecord("One beer please"), check);
+    write(writer, new BarRecord("Two beers please"), check);
+
+    // does foo record still work?
+    write(writer, new FooRecord(20), check);
+
+    // get one more bar in, just for laughs
+    write(writer, new BarRecord("Many beers please"), check);
+
+    writer.close();
+
+    ReflectDatumReader din = new ReflectDatumReader("org.apache.avro.");
+    SeekableFileInput sin = new SeekableFileInput(FILE);
+    DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
+    Object datum = null;
+    long count = reader.getMetaLong("count");
+    for (int i = 0; i < count; i++) {
+      datum = reader.next(datum);
+      check.assertEquals(datum, i);
+    }
+    reader.close();
+  }
+
+  private void write(DataFileWriter<Object> writer, Object o, CheckList l)
+      throws IOException {
+    writer.append(l.addAndReturn(o));
+  }
+
+  @SuppressWarnings("serial")
+  private static class CheckList extends ArrayList<Object> {
+    Object addAndReturn(Object check) {
+      add(check);
+      return check;
+    }
+
+    void assertEquals(Object toCheck, int i) {
+      Assert.assertNotNull(toCheck);
+      Object o = get(i);
+      Assert.assertNotNull(o);
+      Assert.assertEquals(toCheck, o);
+    }
+  }
+}