You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/04 23:13:39 UTC
svn commit: r1442398 - in /avro/trunk: ./
lang/java/avro/src/main/java/org/apache/avro/data/
lang/java/avro/src/main/java/org/apache/avro/generic/
lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/
lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/
Author: cutting
Date: Mon Feb 4 22:13:38 2013
New Revision: 1442398
URL: http://svn.apache.org/viewvc?rev=1442398&view=rev
Log:
AVRO-1228. Java: Fix Trevni to use default values for missing Avro fields.
Added:
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Feb 4 22:13:38 2013
@@ -40,6 +40,9 @@ Trunk (not yet released)
AVRO-1227. Java: Large ByteBuffers can corrupt output. (cutting)
+ AVRO-1228. Java: Fix Trevni to use default values for missing Avro fields.
+ (cutting)
+
Avro 1.7.3 (6 December 2012)
NEW FEATURES
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/data/RecordBuilderBase.java Mon Feb 4 22:13:38 2013
@@ -17,11 +17,8 @@
*/
package org.apache.avro.data;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
@@ -29,26 +26,15 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
-import org.codehaus.jackson.JsonNode;
/** Abstract base class for RecordBuilder implementations. Not thread-safe. */
public abstract class RecordBuilderBase<T extends IndexedRecord>
implements RecordBuilder<T> {
- private static final ConcurrentMap<String, ConcurrentMap<Integer, Object>>
- DEFAULT_VALUE_CACHE =
- new ConcurrentHashMap<String, ConcurrentMap<Integer, Object>>();
private static final Field[] EMPTY_FIELDS = new Field[0];
private final Schema schema;
private final Field[] fields;
private final boolean[] fieldSetFlags;
private final GenericData data;
- private BinaryEncoder encoder = null;
- private BinaryDecoder decoder = null;
protected final Schema schema() { return schema; }
protected final Field[] fields() { return fields; }
@@ -146,48 +132,7 @@ public abstract class RecordBuilderBase<
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
protected Object defaultValue(Field field) throws IOException {
- JsonNode defaultJsonValue = field.defaultValue();
- if (defaultJsonValue == null) {
- throw new AvroRuntimeException("Field " + field + " not set and has no default value");
- }
- if (defaultJsonValue.isNull()
- && (field.schema().getType() == Type.NULL
- || (field.schema().getType() == Type.UNION
- && field.schema().getTypes().get(0).getType() == Type.NULL))) {
- return null;
- }
-
- // Get the default value
- Object defaultValue = null;
-
- // First try to get the default value from cache:
- ConcurrentMap<Integer, Object> defaultSchemaValues =
- DEFAULT_VALUE_CACHE.get(schema.getFullName());
- if (defaultSchemaValues == null) {
- DEFAULT_VALUE_CACHE.putIfAbsent(schema.getFullName(),
- new ConcurrentHashMap<Integer, Object>(fields.length));
- defaultSchemaValues = DEFAULT_VALUE_CACHE.get(schema.getFullName());
- }
- defaultValue = defaultSchemaValues.get(field.pos());
-
- // If not cached, get the default Java value by encoding the default JSON
- // value and then decoding it:
- if (defaultValue == null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- encoder = EncoderFactory.get().binaryEncoder(baos, encoder);
- ResolvingGrammarGenerator.encode(
- encoder, field.schema(), defaultJsonValue);
- encoder.flush();
- decoder = DecoderFactory.get().binaryDecoder(
- baos.toByteArray(), decoder);
- defaultValue = data.createDatumReader(
- field.schema()).read(null, decoder);
- defaultSchemaValues.putIfAbsent(field.pos(), defaultValue);
- }
-
- // Make a deep copy of the default value so that subsequent mutations
- // will not affect the default value cache:
- return data.deepCopy(field.schema(), defaultValue);
+ return data.deepCopy(field.schema(), data.getDefaultValue(field));
}
@Override
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Mon Feb 4 22:13:38 2013
@@ -18,10 +18,14 @@
package org.apache.avro.generic;
import java.nio.ByteBuffer;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.WeakHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,9 +37,16 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.io.BinaryData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
import org.apache.avro.util.Utf8;
+import org.codehaus.jackson.JsonNode;
+
/** Utilities for generic Java data. */
public class GenericData {
@@ -802,6 +813,52 @@ public class GenericData {
}
}
+ private final Map<Field, Object> defaultValueCache
+ = Collections.synchronizedMap(new WeakHashMap<Field, Object>());
+
+ /**
+ * Gets the default value of the given field, if any.
+ * @param field the field whose default value should be retrieved.
+ * @return the field's default value, or null when the default is JSON null and the
+ * schema is null or a null-first union; throws AvroRuntimeException if no default is set.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Object getDefaultValue(Field field) {
+ JsonNode json = field.defaultValue();
+ if (json == null)
+ throw new AvroRuntimeException("Field " + field
+ + " not set and has no default value");
+ if (json.isNull()
+ && (field.schema().getType() == Type.NULL
+ || (field.schema().getType() == Type.UNION
+ && field.schema().getTypes().get(0).getType() == Type.NULL))) {
+ return null;
+ }
+
+ // Check the cache
+ Object defaultValue = defaultValueCache.get(field);
+
+ // If not cached, get the default Java value by encoding the default JSON
+ // value and then decoding it:
+ if (defaultValue == null)
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
+ ResolvingGrammarGenerator.encode(encoder, field.schema(), json);
+ encoder.flush();
+ BinaryDecoder decoder =
+ DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
+ defaultValue =
+ createDatumReader(field.schema()).read(null, decoder);
+
+ defaultValueCache.put(field, defaultValue);
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
+
+ return defaultValue;
+ }
+
private static final Schema STRINGS = Schema.create(Type.STRING);
/**
Modified: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/AvroColumnReader.java Mon Feb 4 22:13:38 2013
@@ -55,6 +55,9 @@ public class AvroColumnReader<D>
private int[] arrayWidths;
private int column; // current index in values
+ private Map<String,Map<String,Object>> defaults =
+ new HashMap<String,Map<String,Object>>();
+
/** Parameters for reading an Avro column file. */
public static class Params {
Input input;
@@ -111,10 +114,59 @@ public class AvroColumnReader<D>
int j = 0;
for (ColumnMetaData c : readColumns) {
Integer n = fileColumnNumbers.get(c.getName());
- if (n == null)
- throw new TrevniRuntimeException("No column named: "+c.getName());
- values[j++] = reader.getValues(n);
+ if (n != null)
+ values[j++] = reader.getValues(n);
}
+ findDefaults(readSchema, fileSchema);
+ }
+
+ // get defaults for fields in read that are not in write
+ private void findDefaults(Schema read, Schema write) {
+ switch (read.getType()) {
+ case NULL: case BOOLEAN:
+ case INT: case LONG:
+ case FLOAT: case DOUBLE:
+ case BYTES: case STRING:
+ case ENUM: case FIXED:
+ if (read.getType() != write.getType())
+ throw new TrevniRuntimeException("Type mismatch: "+read+" & "+write);
+ break;
+ case MAP:
+ findDefaults(read.getValueType(), write.getValueType());
+ break;
+ case ARRAY:
+ findDefaults(read.getElementType(), write.getElementType());
+ break;
+ case UNION:
+ for (Schema s : read.getTypes()) {
+ Integer index = write.getIndexNamed(s.getFullName());
+ if (index == null)
+ throw new TrevniRuntimeException("No matching branch: "+s);
+ findDefaults(s, write.getTypes().get(index));
+ }
+ break;
+ case RECORD:
+ for (Field f : read.getFields()) {
+ Field g = write.getField(f.name());
+ if (g == null)
+ setDefault(read, f);
+ else
+ findDefaults(f.schema(), g.schema());
+ }
+ break;
+ default:
+ throw new TrevniRuntimeException("Unknown schema: "+read);
+ }
+ }
+
+ private void setDefault(Schema record, Field f) {
+ String recordName = record.getFullName();
+ Map<String,Object> recordDefaults = defaults.get(recordName);
+ if (recordDefaults == null) {
+ recordDefaults = new HashMap<String,Object>();
+ defaults.put(recordName, recordDefaults);
+ }
+ recordDefaults.put(f.name(), model.getDefaultValue(f));
}
@Override
@@ -132,7 +184,8 @@ public class AvroColumnReader<D>
public D next() {
try {
for (int i = 0; i < values.length; i++)
- values[i].startRow();
+ if (values[i] != null)
+ values[i].startRow();
this.column = 0;
return (D)read(readSchema);
} catch (IOException e) {
@@ -160,8 +213,13 @@ public class AvroColumnReader<D>
return map;
case RECORD:
Object record = model.newRecord(null, s);
- for (Field f : s.getFields())
- model.setField(record, f.name(), f.pos(), read(f.schema()));
+ Map<String,Object> rDefaults = defaults.get(s.getFullName());
+ for (Field f : s.getFields()) {
+ Object value = ((rDefaults != null) && rDefaults.containsKey(f.name()))
+ ? model.deepCopy(f.schema(), rDefaults.get(f.name()))
+ : read(f.schema());
+ model.setField(record, f.name(), f.pos(), value);
+ }
return record;
case ARRAY:
int length = values[column].nextLength();
Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/RandomData.java Mon Feb 4 22:13:38 2013
@@ -36,6 +36,8 @@ import org.apache.trevni.TestUtil;
/** Generates schema data as Java objects with random values. */
public class RandomData implements Iterable<Object> {
+ public static final String USE_DEFAULT = "use-default";
+
private final Schema root;
private final int count;
@@ -62,8 +64,12 @@ public class RandomData implements Itera
switch (schema.getType()) {
case RECORD:
GenericRecord record = new GenericData.Record(schema);
- for (Schema.Field field : schema.getFields())
- record.put(field.name(), generate(field.schema(), random, d+1));
+ for (Schema.Field field : schema.getFields()) {
+ Object value = (field.getJsonProp(USE_DEFAULT) == null)
+ ? generate(field.schema(), random, d+1)
+ : GenericData.get().getDefaultValue(field);
+ record.put(field.name(), value);
+ }
return record;
case ENUM:
List<String> symbols = schema.getEnumSymbols();
Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java?rev=1442398&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java Mon Feb 4 22:13:38 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.trevni.avro;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.avro.AvroColumnReader.Params;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEvolvedSchema extends TestCase {
+ private static String writerSchema = "{"
+ + " \"namespace\": \"org.apache.avro\","
+ + " \"name\": \"test_evolution\"," + " \"type\": \"record\","
+ + " \"fields\": ["
+ + " { \"name\": \"a\", \"type\":\"string\" },"
+ + " { \"name\": \"b\", \"type\":\"int\" }"
+ + " ]"
+ + "}";
+ private static String innerSchema = "{\"name\":\"c1\","
+ + " \"type\":\"record\","
+ + " \"fields\":[{\"name\":\"c11\", \"type\":\"int\", \"default\": 2},"
+ + " {\"name\":\"c12\", \"type\":\"string\", \"default\":\"goodbye\"}]}";
+ private static String evolvedSchema2 = "{"
+ + " \"namespace\": \"org.apache.avro\","
+ + " \"name\": \"test_evolution\"," + " \"type\": \"record\","
+ + " \"fields\": ["
+ + " { \"name\": \"a\", \"type\":\"string\" },"
+ + " { \"name\": \"b\", \"type\":\"int\" },"
+ + " { \"name\": \"c\", \"type\":" + innerSchema + ","
+ + " \"default\":{\"c11\": 1, \"c12\": \"hello\"}"
+ + " }"
+ + " ]"
+ + "}";
+
+ GenericData.Record writtenRecord;
+ GenericData.Record evolvedRecord;
+ GenericData.Record innerRecord;
+
+ private static final Schema writer = Schema.parse(writerSchema);
+ private static final Schema evolved = Schema.parse(evolvedSchema2);
+ private static final Schema inner = Schema.parse(innerSchema);
+
+ @Before
+ public void setUp() {
+ writtenRecord = new GenericData.Record(writer);
+ writtenRecord.put("a", "record");
+ writtenRecord.put("b", 21);
+
+ innerRecord = new GenericData.Record(inner);
+ innerRecord.put("c11", 1);
+ innerRecord.put("c12", "hello");
+
+ evolvedRecord = new GenericData.Record(evolved);
+ evolvedRecord.put("a", "record");
+ evolvedRecord.put("b", 21);
+ evolvedRecord.put("c", innerRecord);
+ }
+
+ @Test
+ public void testTrevniEvolvedRead() throws IOException {
+ AvroColumnWriter<GenericRecord> acw =
+ new AvroColumnWriter<GenericRecord>(writer, new ColumnFileMetaData());
+ acw.write(writtenRecord);
+ File serializedTrevni = File.createTempFile("trevni", null);
+ acw.writeTo(serializedTrevni);
+
+ AvroColumnReader.Params params = new Params(serializedTrevni);
+ params.setSchema(evolved);
+ AvroColumnReader<GenericRecord> acr =
+ new AvroColumnReader<GenericRecord>(params);
+ GenericRecord readRecord = acr.next();
+ assertEquals(evolvedRecord, readRecord);
+ assertFalse(acr.hasNext());
+ }
+
+ @Test
+ public void testAvroEvolvedRead() throws IOException {
+ File serializedAvro = File.createTempFile("avro", null);
+ DatumWriter<GenericRecord> dw =
+ new GenericDatumWriter<GenericRecord>(writer);
+ DataFileWriter<GenericRecord> dfw =
+ new DataFileWriter<GenericRecord>(dw);
+ dfw.create(writer, serializedAvro);
+ dfw.append(writtenRecord);
+ dfw.flush();
+ dfw.close();
+
+ GenericDatumReader<GenericRecord> reader =
+ new GenericDatumReader<GenericRecord>(writer);
+ reader.setExpected(evolved);
+ DataFileReader<GenericRecord> dfr =
+ new DataFileReader<GenericRecord>(serializedAvro, reader);
+ GenericRecord readRecord = dfr.next();
+ assertEquals(evolvedRecord, readRecord);
+ assertFalse(dfr.hasNext());
+ }
+
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestEvolvedSchema.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java?rev=1442398&r1=1442397&r2=1442398&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestShredder.java Mon Feb 4 22:13:38 2013
@@ -62,10 +62,13 @@ public class TestShredder {
new ColumnMetaData("F", ValueType.BYTES));
}
+ private static final String SIMPLE_FIELDS =
+ "{\"name\":\"x\",\"type\":\"int\"},"+
+ "{\"name\":\"y\",\"type\":\"string\"}";
+
private static final String SIMPLE_RECORD =
"{\"type\":\"record\",\"name\":\"R\",\"fields\":["
- +"{\"name\":\"x\",\"type\":\"int\"},"
- +"{\"name\":\"y\",\"type\":\"string\"}"
+ +SIMPLE_FIELDS
+"]}";
@Test public void testSimpleRecord() throws Exception {
@@ -74,6 +77,17 @@ public class TestShredder {
new ColumnMetaData("y", ValueType.STRING));
}
+ @Test public void testDefaultValue() throws Exception {
+ String s =
+ "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+ +SIMPLE_FIELDS+","
+ +"{\"name\":\"z\",\"type\":\"int\","
+ +"\"default\":1,\""+RandomData.USE_DEFAULT+"\":true}"
+ +"]}";
+ checkWrite(Schema.parse(SIMPLE_RECORD));
+ checkRead(Schema.parse(s));
+ }
+
@Test public void testNestedRecord() throws Exception {
String s =
"{\"type\":\"record\",\"name\":\"S\",\"fields\":["
@@ -252,7 +266,8 @@ public class TestShredder {
private void checkRead(Schema schema) throws IOException {
AvroColumnReader<Object> reader =
- new AvroColumnReader<Object>(new AvroColumnReader.Params(FILE));
+ new AvroColumnReader<Object>(new AvroColumnReader.Params(FILE)
+ .setSchema(schema));
for (Object expected : new RandomData(schema, COUNT))
assertEquals(expected, reader.next());
reader.close();