You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/04 23:13:39 UTC

svn commit: r1442398 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/data/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/ lang/java/trevni/avro/src/test/java/org/apach...

Author: cutting
Date: Mon Feb  4 22:13:38 2013
New Revision: 1442398

URL: http://svn.apache.org/viewvc?rev=1442398&view=rev
Log:
AVRO-1228. Java: Fix Trevni to use default values for missing Avro fields.

Added:
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Feb  4 22:13:38 2013
@@ -40,6 +40,9 @@ Trunk (not yet released)
 
     AVRO-1227. Java: Large ByteBuffers can corrupt output. (cutting)
 
+    AVRO-1228. Java: Fix Trevni to use default values for missing Avro fields.
+    (cutting)
+
 Avro 1.7.3 (6 December 2012)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java Mon Feb  4 22:13:38 2013
@@ -17,11 +17,8 @@
  */
 package org.apache.avro.data;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -29,26 +26,15 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
-import org.codehaus.jackson.JsonNode;
 
 /** Abstract base class for RecordBuilder implementations.  Not thread-safe. */
 public abstract class RecordBuilderBase<T extends IndexedRecord> 
   implements RecordBuilder<T> {
-  private static final ConcurrentMap<String, ConcurrentMap<Integer, Object>> 
-    DEFAULT_VALUE_CACHE = 
-      new ConcurrentHashMap<String, ConcurrentMap<Integer, Object>>();
   private static final Field[] EMPTY_FIELDS = new Field[0];
   private final Schema schema;
   private final Field[] fields;
   private final boolean[] fieldSetFlags;
   private final GenericData data;
-  private BinaryEncoder encoder = null;
-  private BinaryDecoder decoder = null;
   
   protected final Schema schema() { return schema; }
   protected final Field[] fields() { return fields; }
@@ -146,48 +132,7 @@ public abstract class RecordBuilderBase<
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
   protected Object defaultValue(Field field) throws IOException {    
-    JsonNode defaultJsonValue = field.defaultValue();
-    if (defaultJsonValue == null) {
-      throw new AvroRuntimeException("Field " + field + " not set and has no default value");
-    }
-    if (defaultJsonValue.isNull()
-        && (field.schema().getType() == Type.NULL
-            || (field.schema().getType() == Type.UNION
-                && field.schema().getTypes().get(0).getType() == Type.NULL))) {
-      return null;
-    }
-    
-    // Get the default value
-    Object defaultValue = null;
-    
-    // First try to get the default value from cache:
-    ConcurrentMap<Integer, Object> defaultSchemaValues = 
-      DEFAULT_VALUE_CACHE.get(schema.getFullName());
-    if (defaultSchemaValues == null) {
-      DEFAULT_VALUE_CACHE.putIfAbsent(schema.getFullName(), 
-          new ConcurrentHashMap<Integer, Object>(fields.length));
-      defaultSchemaValues = DEFAULT_VALUE_CACHE.get(schema.getFullName());
-    }
-    defaultValue = defaultSchemaValues.get(field.pos());
-    
-    // If not cached, get the default Java value by encoding the default JSON
-    // value and then decoding it:
-    if (defaultValue == null) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      encoder = EncoderFactory.get().binaryEncoder(baos, encoder);
-      ResolvingGrammarGenerator.encode(
-          encoder, field.schema(), defaultJsonValue);
-      encoder.flush();
-      decoder = DecoderFactory.get().binaryDecoder(
-          baos.toByteArray(), decoder);
-      defaultValue = data.createDatumReader(
-          field.schema()).read(null, decoder);
-      defaultSchemaValues.putIfAbsent(field.pos(), defaultValue);
-    }
-    
-    // Make a deep copy of the default value so that subsequent mutations 
-    // will not affect the default value cache:
-    return data.deepCopy(field.schema(), defaultValue);
+    return data.deepCopy(field.schema(), data.getDefaultValue(field));
   }
 
   @Override

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Mon Feb  4 22:13:38 2013
@@ -18,10 +18,14 @@
 package org.apache.avro.generic;
 
 import java.nio.ByteBuffer;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.AbstractList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.WeakHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,9 +37,16 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.UnresolvedUnionException;
 import org.apache.avro.io.BinaryData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
 import org.apache.avro.util.Utf8;
 
+import org.codehaus.jackson.JsonNode;
+
 /** Utilities for generic Java data. */
 public class GenericData {
 
@@ -802,6 +813,52 @@ public class GenericData {
     }
   }
 
+  private final Map<Field, Object> defaultValueCache
+    = Collections.synchronizedMap(new WeakHashMap<Field, Object>());
+
+  /**
+   * Gets the default value of the given field, if any.
+   * @param field the field whose default value should be retrieved.
+   * @return the default value associated with the given field, 
+   * or null if none is specified in the schema.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public Object getDefaultValue(Field field) {    
+    JsonNode json = field.defaultValue();
+    if (json == null)
+      throw new AvroRuntimeException("Field " + field
+                                     + " not set and has no default value");
+    if (json.isNull()
+        && (field.schema().getType() == Type.NULL
+            || (field.schema().getType() == Type.UNION
+                && field.schema().getTypes().get(0).getType() == Type.NULL))) {
+      return null;
+    }
+    
+    // Check the cache
+    Object defaultValue = defaultValueCache.get(field);
+    
+    // If not cached, get the default Java value by encoding the default JSON
+    // value and then decoding it:
+    if (defaultValue == null)
+      try {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
+        ResolvingGrammarGenerator.encode(encoder, field.schema(), json);
+        encoder.flush();
+        BinaryDecoder decoder =
+          DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
+        defaultValue =
+          createDatumReader(field.schema()).read(null, decoder);
+
+        defaultValueCache.put(field, defaultValue);
+      } catch (IOException e) {
+        throw new AvroRuntimeException(e);
+      }
+
+    return defaultValue;
+  }
+
   private static final Schema STRINGS = Schema.create(Type.STRING);
 
   /**

Modified: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java Mon Feb  4 22:13:38 2013
@@ -55,6 +55,9 @@ public class AvroColumnReader<D>
   private int[] arrayWidths;
   private int column;                          // current index in values
 
+  private Map<String,Map<String,Object>> defaults =
+    new HashMap<String,Map<String,Object>>();
+
   /** Parameters for reading an Avro column file. */
   public static class Params {
     Input input;
@@ -111,10 +114,59 @@ public class AvroColumnReader<D>
     int j = 0;
     for (ColumnMetaData c : readColumns) {
       Integer n = fileColumnNumbers.get(c.getName());
-      if (n == null)
-        throw new TrevniRuntimeException("No column named: "+c.getName());
-      values[j++] = reader.getValues(n);
+      if (n != null)
+        values[j++] = reader.getValues(n);
     }
+    findDefaults(readSchema, fileSchema);
+  }
+
+  // get defaults for fields in read that are not in write
+  private void findDefaults(Schema read, Schema write) {
+    switch (read.getType()) {
+    case NULL: case BOOLEAN:
+    case INT: case LONG:
+    case FLOAT: case DOUBLE: 
+    case BYTES: case STRING: 
+    case ENUM: case FIXED:
+      if (read.getType() != write.getType())
+        throw new TrevniRuntimeException("Type mismatch: "+read+" & "+write);
+      break;
+    case MAP: 
+      findDefaults(read.getValueType(), write.getValueType());
+      break;
+    case ARRAY: 
+      findDefaults(read.getElementType(), write.getElementType());
+      break;
+    case UNION:
+      for (Schema s : read.getTypes()) {
+        Integer index = write.getIndexNamed(s.getFullName());
+        if (index == null)
+          throw new TrevniRuntimeException("No matching branch: "+s);
+        findDefaults(s, write.getTypes().get(index));
+      }
+      break;
+    case RECORD: 
+      for (Field f : read.getFields()) {
+        Field g = write.getField(f.name());
+        if (g == null)
+          setDefault(read, f);
+        else
+          findDefaults(f.schema(), g.schema());
+      }
+      break;
+    default:
+      throw new TrevniRuntimeException("Unknown schema: "+read);
+    }
+  }
+
+  private void setDefault(Schema record, Field f) {
+    String recordName = record.getFullName();
+    Map<String,Object> recordDefaults = defaults.get(recordName);
+    if (recordDefaults == null) {
+      recordDefaults = new HashMap<String,Object>();
+      defaults.put(recordName, recordDefaults);
+    }
+    recordDefaults.put(f.name(), model.getDefaultValue(f));
   }
 
   @Override
@@ -132,7 +184,8 @@ public class AvroColumnReader<D>
   public D next() {
     try {
       for (int i = 0; i < values.length; i++)
-        values[i].startRow();
+        if (values[i] != null)
+          values[i].startRow();
       this.column = 0;
       return (D)read(readSchema);
     } catch (IOException e) {
@@ -160,8 +213,13 @@ public class AvroColumnReader<D>
       return map;
     case RECORD: 
       Object record = model.newRecord(null, s);
-      for (Field f : s.getFields())
-        model.setField(record, f.name(), f.pos(), read(f.schema()));
+      Map<String,Object> rDefaults = defaults.get(s.getFullName());
+      for (Field f : s.getFields()) {
+        Object value = ((rDefaults != null) && rDefaults.containsKey(f.name()))
+          ? model.deepCopy(f.schema(), rDefaults.get(f.name()))
+          : read(f.schema());
+        model.setField(record, f.name(), f.pos(), value);
+      }
       return record;
     case ARRAY: 
       int length = values[column].nextLength();

Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java Mon Feb  4 22:13:38 2013
@@ -36,6 +36,8 @@ import org.apache.trevni.TestUtil;
 
 /** Generates schema data as Java objects with random values. */
 public class RandomData implements Iterable<Object> {
+  public static final String USE_DEFAULT = "use-default";
+
   private final Schema root;
   private final int count;
 
@@ -62,8 +64,12 @@ public class RandomData implements Itera
     switch (schema.getType()) {
     case RECORD:
       GenericRecord record = new GenericData.Record(schema);
-      for (Schema.Field field : schema.getFields())
-        record.put(field.name(), generate(field.schema(), random, d+1));
+      for (Schema.Field field : schema.getFields()) {
+        Object value = (field.getJsonProp(USE_DEFAULT) == null) 
+          ? generate(field.schema(), random, d+1)
+          : GenericData.get().getDefaultValue(field);
+        record.put(field.name(), value);
+      }
       return record;
     case ENUM:
       List<String> symbols = schema.getEnumSymbols();

Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java?rev=1442398&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java Mon Feb  4 22:13:38 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.trevni.avro;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.avro.AvroColumnReader.Params;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEvolvedSchema extends TestCase {
+  private static String writerSchema = "{"
+    + "    \"namespace\": \"org.apache.avro\","
+    + "    \"name\": \"test_evolution\"," + "    \"type\": \"record\","
+    + "    \"fields\": ["
+    + "        { \"name\": \"a\", \"type\":\"string\" },"
+    + "        { \"name\": \"b\", \"type\":\"int\" }"
+    + "     ]"
+    + "}";
+  private static String innerSchema = "{\"name\":\"c1\","
+    + "          \"type\":\"record\","
+    + "          \"fields\":[{\"name\":\"c11\", \"type\":\"int\", \"default\": 2},"
+    + "                      {\"name\":\"c12\", \"type\":\"string\", \"default\":\"goodbye\"}]}";
+  private static String evolvedSchema2 = "{"
+    + "    \"namespace\": \"org.apache.avro\","
+    + "    \"name\": \"test_evolution\"," + "    \"type\": \"record\","
+    + "    \"fields\": ["
+    + "        { \"name\": \"a\", \"type\":\"string\" },"
+    + "        { \"name\": \"b\", \"type\":\"int\" },"
+    + "        { \"name\": \"c\", \"type\":" + innerSchema + ","
+    + "          \"default\":{\"c11\": 1, \"c12\": \"hello\"}"
+    + "        }"
+    + "     ]"
+    + "}";
+
+  GenericData.Record writtenRecord;
+  GenericData.Record evolvedRecord;
+  GenericData.Record innerRecord;
+
+  private static final Schema writer = Schema.parse(writerSchema);
+  private static final Schema evolved = Schema.parse(evolvedSchema2);
+  private static final Schema inner = Schema.parse(innerSchema);
+
+  @Before
+    public void setUp() {
+    writtenRecord = new GenericData.Record(writer);
+    writtenRecord.put("a", "record");
+    writtenRecord.put("b", 21);
+
+    innerRecord = new GenericData.Record(inner);
+    innerRecord.put("c11", 1);
+    innerRecord.put("c12", "hello");
+
+    evolvedRecord = new GenericData.Record(evolved);
+    evolvedRecord.put("a", "record");
+    evolvedRecord.put("b", 21);
+    evolvedRecord.put("c", innerRecord);
+  }
+
+  @Test
+    public void testTrevniEvolvedRead() throws IOException {
+    AvroColumnWriter<GenericRecord> acw =
+      new AvroColumnWriter<GenericRecord>(writer, new ColumnFileMetaData());
+    acw.write(writtenRecord);
+    File serializedTrevni = File.createTempFile("trevni", null);
+    acw.writeTo(serializedTrevni);
+
+    AvroColumnReader.Params params = new Params(serializedTrevni);
+    params.setSchema(evolved);
+    AvroColumnReader<GenericRecord> acr =
+      new AvroColumnReader<GenericRecord>(params);
+    GenericRecord readRecord = acr.next();
+    assertEquals(evolvedRecord, readRecord);
+    assertFalse(acr.hasNext());
+  }
+
+  @Test
+    public void testAvroEvolvedRead() throws IOException {
+    File serializedAvro = File.createTempFile("avro", null);
+    DatumWriter<GenericRecord> dw =
+      new GenericDatumWriter<GenericRecord>(writer);
+    DataFileWriter<GenericRecord> dfw =
+      new DataFileWriter<GenericRecord>(dw);
+    dfw.create(writer, serializedAvro);
+    dfw.append(writtenRecord);
+    dfw.flush();
+    dfw.close();
+
+    GenericDatumReader<GenericRecord> reader =
+      new GenericDatumReader<GenericRecord>(writer);
+    reader.setExpected(evolved);
+    DataFileReader<GenericRecord> dfr =
+      new DataFileReader<GenericRecord>(serializedAvro, reader);
+    GenericRecord readRecord = dfr.next();
+    assertEquals(evolvedRecord, readRecord);
+    assertFalse(dfr.hasNext());
+  }
+
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java Mon Feb  4 22:13:38 2013
@@ -62,10 +62,13 @@ public class TestShredder {
           new ColumnMetaData("F", ValueType.BYTES));
   }
 
+  private static final String SIMPLE_FIELDS =
+    "{\"name\":\"x\",\"type\":\"int\"},"+
+    "{\"name\":\"y\",\"type\":\"string\"}";
+
   private static final String SIMPLE_RECORD =
     "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
-    +"{\"name\":\"x\",\"type\":\"int\"},"
-    +"{\"name\":\"y\",\"type\":\"string\"}"
+    +SIMPLE_FIELDS
     +"]}";
 
   @Test public void testSimpleRecord() throws Exception {
@@ -74,6 +77,17 @@ public class TestShredder {
           new ColumnMetaData("y", ValueType.STRING));
   }
 
+  @Test public void testDefaultValue() throws Exception {
+    String s = 
+      "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+      +SIMPLE_FIELDS+","
+      +"{\"name\":\"z\",\"type\":\"int\","
+      +"\"default\":1,\""+RandomData.USE_DEFAULT+"\":true}"
+      +"]}";
+    checkWrite(Schema.parse(SIMPLE_RECORD));
+    checkRead(Schema.parse(s));
+  }
+
   @Test public void testNestedRecord() throws Exception {
     String s = 
       "{\"type\":\"record\",\"name\":\"S\",\"fields\":["
@@ -252,7 +266,8 @@ public class TestShredder {
 
   private void checkRead(Schema schema) throws IOException {
     AvroColumnReader<Object> reader =
-      new AvroColumnReader<Object>(new AvroColumnReader.Params(FILE));
+      new AvroColumnReader<Object>(new AvroColumnReader.Params(FILE)
+                                   .setSchema(schema));
     for (Object expected : new RandomData(schema, COUNT))
       assertEquals(expected, reader.next());
     reader.close();