You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/04/14 23:42:17 UTC
svn commit: r764968 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/
src/java/org/apache/avro/generic/ src/java/org/apache/avro/reflect/
src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/
Author: cutting
Date: Tue Apr 14 21:42:16 2009
New Revision: 764968
URL: http://svn.apache.org/viewvc?rev=764968&view=rev
Log:
AVRO-6. Provide easier implementation of alternate generic data representations. Contributed by Hong Tang.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Apr 14 21:42:16 2009
@@ -6,6 +6,10 @@
NEW FEATURES
+ AVRO-6. Permit easier implementation of alternate generic data
+ representations, especially records with integer-indexed fields.
+ (Hong Tang via cutting)
+
IMPROVEMENTS
OPTIMIZATIONS
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java Tue Apr 14 21:42:16 2009
@@ -17,12 +17,20 @@
*/
package org.apache.avro;
-import java.util.*;
-import java.io.*;
-
-import org.codehaus.jackson.map.*;
-import org.codehaus.jackson.map.node.*;
-import org.codehaus.jackson.*;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.JsonNode;
/** A set of messages forming an application protocol.
* <p> A protocol consists of:
@@ -72,7 +80,7 @@
StringBuilder buffer = new StringBuilder();
buffer.append("{\"request\": {");
int count = 0;
- for (Map.Entry<String,Schema> entry : request.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> entry : request.getFieldSchemas()) {
buffer.append("\"");
buffer.append(entry.getKey());
buffer.append("\": ");
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Schema.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Schema.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Schema.java Tue Apr 14 21:42:16 2009
@@ -17,11 +17,22 @@
*/
package org.apache.avro;
-import java.util.*;
-import java.io.*;
-
-import org.codehaus.jackson.map.*;
-import org.codehaus.jackson.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.JsonNode;
+import org.codehaus.jackson.map.JsonTypeMapper;
/** An abstract data type.
* <p>A schema may be one of:
@@ -74,13 +85,14 @@
/** Create an anonymous record schema. */
public static Schema create(Map<String,Schema> fields) {
- return create(null, null, fields, false);
+ Schema result = create(null, null, false);
+ result.setFields(fields);
+ return result;
}
/** Create a named record schema. */
- public static Schema create(String name, String namespace,
- Map<String,Schema> fields, boolean isError) {
- return new RecordSchema(name, namespace, fields, isError);
+ public static Schema create(String name, String namespace, boolean isError) {
+ return new RecordSchema(name, namespace, isError);
}
/** Create an array schema. */
@@ -102,7 +114,17 @@
public Type getType() { return type; }
/** If this is a record, returns its fields. */
- public Map<String,Schema> getFields() {
+ public Map<String, Field> getFields() {
+ throw new AvroRuntimeException("Not a record: "+this);
+ }
+
+ /** If this is a record, enumerate its field names and their schemas. */
+ public Iterable<Map.Entry<String,Schema>> getFieldSchemas() {
+ throw new AvroRuntimeException("Not a record: "+this);
+ }
+
+ /** If this is a record, set its fields. */
+ public void setFields(Map<String,Schema> fields) {
throw new AvroRuntimeException("Not a record: "+this);
}
@@ -153,23 +175,54 @@
}
public int hashCode() { return getType().hashCode(); }
+ /** A field within a record. */
+ public static class Field {
+ int position;
+ Schema schema;
+ Field(int pos, Schema schema) {
+ this.position = pos;
+ this.schema = schema;
+ }
+ /** The position of this field within the record. */
+ public int pos() { return position; }
+ /** This field's {@link Schema}. */
+ public Schema schema() { return schema; }
+ public boolean equals(Object other) {
+ if (!(other instanceof Field)) return false;
+ Field that = (Field) other;
+ return (position == that.position) && (schema.equals(that.schema));
+ }
+ }
+
static class RecordSchema extends Schema {
private final String name;
private final String namespace;
- private final Map<String, Schema> fields;
+ private Map<String,Field> fields;
+ private Iterable<Map.Entry<String,Schema>> fieldSchemas;
private final boolean isError;
- public RecordSchema(String name, String namespace,
- Map<String, Schema> fields, boolean isError) {
+ public RecordSchema(String name, String namespace, boolean isError) {
super(Type.RECORD);
this.name = name;
this.namespace = namespace;
- this.fields = fields;
this.isError = isError;
}
public String getName() { return name; }
public String getNamespace() { return namespace; }
public boolean isError() { return isError; }
- public Map<String, Schema> getFields() { return fields; }
+ public Map<String, Field> getFields() { return fields; }
+ public Iterable<Map.Entry<String, Schema>> getFieldSchemas() {
+ return fieldSchemas;
+ }
+ public void setFields(Map<String,Schema> fields) {
+ if (this.fields != null)
+ throw new AvroRuntimeException("Fields are already set");
+ this.fields = new LinkedHashMap<String, Field>();
+ int i = 0;
+ this.fieldSchemas = fields.entrySet();
+ for (Map.Entry<String, Schema> field : this.fieldSchemas) {
+ this.fields.put(field.getKey(), new Field(i++, field.getValue()));
+ }
+ }
public boolean equals(Object o) {
if (o == this) return true;
return o instanceof RecordSchema
@@ -184,7 +237,7 @@
+(name==null?"":"\"name\": \""+name+"\", ")
+"\"fields\": {");
int count = 0;
- for (Map.Entry<String,Schema> entry : fields.entrySet()) {
+ for (Map.Entry<String, Schema> entry : fieldSchemas) {
buffer.append("\"");
buffer.append(entry.getKey());
buffer.append("\": ");
@@ -413,13 +466,14 @@
JsonNode spaceNode = schema.getFieldValue("namespace");
String space = spaceNode != null ? spaceNode.getTextValue() : null;
RecordSchema result =
- new RecordSchema(name, space, fields, type.equals("error"));
+ new RecordSchema(name, space, type.equals("error"));
if (name != null) names.put(name, result);
JsonNode props = schema.getFieldValue("fields");
for (Iterator<String> i = props.getFieldNames(); i.hasNext();) {
String prop = i.next();
fields.put(prop, parse(props.getFieldValue(prop), names));
}
+ result.setFields(fields);
return result;
} else if (type.equals("array")) { // array
return new ArraySchema(parse(schema.getFieldValue("items"), names));
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Tue Apr 14 21:42:16 2009
@@ -17,11 +17,14 @@
*/
package org.apache.avro.generic;
-import java.io.*;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
-import org.apache.avro.*;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.util.Utf8;
@@ -99,7 +102,7 @@
if (!(datum instanceof GenericRecord)) return false;
@SuppressWarnings(value="unchecked")
GenericRecord fields = (GenericRecord)datum;
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
if (!validate(entry.getValue(), fields.get(entry.getKey())))
return false;
return true;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Tue Apr 14 21:42:16 2009
@@ -17,16 +17,21 @@
*/
package org.apache.avro.generic;
-import java.io.*;
-import java.util.*;
-
-import org.apache.avro.*;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
-import org.apache.avro.io.*;
-import org.apache.avro.util.Utf8;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.ValueReader;
/** {@link DatumReader} for generic Java objects. */
-public class GenericDatumReader implements DatumReader<Object> {
+public class GenericDatumReader<D> implements DatumReader<D> {
private Schema actual;
private Schema expected;
@@ -43,8 +48,9 @@
public void setSchema(Schema actual) { this.actual = actual; }
- public Object read(Object reuse, ValueReader in) throws IOException {
- return read(reuse, actual, expected != null ? expected : actual, in);
+ @SuppressWarnings("unchecked")
+ public D read(D reuse, ValueReader in) throws IOException {
+ return (D) read(reuse, actual, expected != null ? expected : actual, in);
}
/** Called to read data.*/
@@ -105,41 +111,69 @@
throw new AvroTypeException("Expected "+expected+", found "+actual);
}
- /** Called to read a record instance. May be overridden for alternate record
+ /** Called to read a record instance. May be overridden for alternate record
* representations.*/
protected Object readRecord(Object old, Schema actual, Schema expected,
ValueReader in) throws IOException {
+ /* TODO: We may want to compute the expected and actual mapping and cache
+ * the mapping (keyed by <actual, expected>). */
String recordName = expected.getName();
if (recordName != null && !recordName.equals(actual.getName()))
throw new AvroTypeException("Expected "+expected+", found "+actual);
- Map<String,Schema> expectedFields = expected.getFields();
- GenericRecord record = newRecord(old, expected);
+ Map<String, Field> expectedFields = expected.getFields();
+ // all fields not in expected should be removed by newRecord.
+ Object record = newRecord(old, expected);
int size = 0;
- for (Map.Entry<String,Schema> entry : actual.getFields().entrySet()) {
- String name = entry.getKey();
- Schema aField = entry.getValue();
- Schema eField = expected == actual ? aField : expectedFields.get(name);
- if (eField == null) {
- skip(aField, in);
+ for (Map.Entry<String, Field> entry : actual.getFields().entrySet()) {
+ String fieldName = entry.getKey();
+ Field actualField = entry.getValue();
+ Field expectedField =
+ expected == actual ? actualField : expectedFields.get(entry.getKey());
+ if (expectedField == null) {
+ skip(actualField.schema(), in);
continue;
}
- Object oldDatum = old != null ? record.get(name) : null;
- record.put(name, read(oldDatum, aField, eField, in));
+ int fieldPosition = expectedField.pos();
+ Object oldDatum =
+ (old != null) ? getField(record, fieldName, fieldPosition) : null;
+ addField(record, fieldName, fieldPosition,
+ read(oldDatum,actualField.schema(),expectedField.schema(), in));
size++;
}
- if (record.size() > size) { // clear old fields
- Iterator<String> i = record.keySet().iterator();
- while (i.hasNext()) {
- String f = i.next();
- if (!(actual.getFields().containsKey(f) &&
- expected.getFields().containsKey(f)))
- i.remove();
+ if (expectedFields.size() > size) {
+ // clear old fields (in expected, but not in actual)
+ Set<String> actualFields = actual.getFields().keySet();
+ for (Map.Entry<String, Field> entry : expectedFields.entrySet()) {
+ String f = entry.getKey();
+ if (!actualFields.contains(f))
+ removeField(record, f, entry.getValue().pos());
}
}
return record;
}
- /** Called to read a array instance. May be overridden for alternate array
+ /** Called by the default implementation of {@link #readRecord} to add a
+ * record field's value to a record instance. The default implementation is
+ * for {@link GenericRecord}.*/
+ protected void addField(Object record, String name, int position, Object o) {
+ ((GenericRecord) record).put(name, o);
+ }
+
+ /** Called by the default implementation of {@link #readRecord} to retrieve a
+ * record field value from a reused instance. The default implementation is
+ * for {@link GenericRecord}.*/
+ protected Object getField(Object record, String name, int position) {
+ return ((GenericRecord) record).get(name);
+ }
+
+ /** Called by the default implementation of {@link #readRecord} to remove a
+ * record field value from a reused instance. The default implementation is
+ * for {@link GenericRecord}.*/
+ protected void removeField(Object record, String field, int position) {
+ ((GenericRecord) record).remove(field);
+ }
+
+ /** Called to read an array instance. May be overridden for alternate array
* representations.*/
@SuppressWarnings(value="unchecked")
protected Object readArray(Object old, Schema actual, Schema expected,
@@ -147,18 +181,28 @@
Schema actualType = actual.getElementType();
Schema expectedType = expected.getElementType();
long firstBlockSize = in.readLong();
- GenericArray array;
- if (old instanceof GenericArray) {
- array = (GenericArray)old;
- array.clear();
- } else
- array = newArray((int)firstBlockSize);
+ Object array = newArray(old, (int) firstBlockSize);
for (long l = firstBlockSize; l > 0; l = in.readLong())
for (long i = 0; i < l; i++)
- array.add(read(array.peek(), actualType, expectedType, in));
+ addToArray(array, read(peekArray(array), actualType, expectedType, in));
return array;
}
+ /** Called by the default implementation of {@link #readArray} to retrieve a
+ * value from a reused instance. The default implementation is for {@link
+ * GenericArray}.*/
+ @SuppressWarnings("unchecked")
+ protected Object peekArray(Object array) {
+ return ((GenericArray) array).peek();
+ }
+
+ /** Called by the default implementation of {@link #readArray} to add a value.
+ * The default implementation is for {@link GenericArray}.*/
+ @SuppressWarnings("unchecked")
+ protected void addToArray(Object array, Object e) {
+ ((GenericArray) array).add(e);
+ }
+
/** Called to read a map instance. May be overridden for alternate map
* representations.*/
@SuppressWarnings(value="unchecked")
@@ -169,22 +213,30 @@
Schema eKey = expected.getKeyType();
Schema eValue = expected.getValueType();
int firstBlockSize = (int)in.readLong();
- Map map;
- if (old instanceof Map) {
- map = (Map)old;
- map.clear();
- } else
- map = newMap(firstBlockSize);
+ Object map = newMap(old, firstBlockSize);
for (long l = firstBlockSize; l > 0; l = in.readLong())
for (long i = 0; i < l; i++)
- map.put(read(null, aKey, eKey, in), read(null, aValue, eValue, in));
+ addToMap(map, read(null, aKey, eKey, in),
+ read(null, aValue, eValue, in));
return map;
}
- /** Called to create new record instances. Subclasses may override to use a
- * different record implementation. By default, this returns a {@link
- * GenericData.Record}.*/
- protected GenericRecord newRecord(Object old, Schema schema) {
+ /** Called by the default implementation of {@link #readMap} to add a
+ * key/value pair. The default implementation is for {@link Map}.*/
+ @SuppressWarnings("unchecked")
+ protected void addToMap(Object map, Object key, Object value) {
+ ((Map) map).put(key, value);
+ }
+
+ /**
+ * Called to create new record instances. Subclasses may override to use a
+ * different record implementation. The returned instance must conform to the
+ * schema provided. If the old object contains fields not present in the
+ * schema, they should either be removed from the old object, or it should
+ * create a new instance that conforms to the schema. By default, this returns
+ * a {@link GenericData.Record}.
+ */
+ protected Object newRecord(Object old, Schema schema) {
if (old instanceof GenericRecord) {
GenericRecord record = (GenericRecord)old;
if (record.getSchema() == schema)
@@ -196,15 +248,21 @@
/** Called to create new array instances. Subclasses may override to use a
* different array implementation. By default, this returns a {@link
* GenericData.Array}.*/
- protected GenericArray newArray(int size) {
- return new GenericData.Array(size);
+ protected Object newArray(Object old, int size) {
+ if (old instanceof GenericArray) {
+ ((GenericArray) old).clear();
+ return old;
+ } else return new GenericData.Array(size);
}
/** Called to create new map instances. Subclasses may override to use a
* different map implementation. By default, this returns a {@link
* HashMap}.*/
- protected Map<Object,Object> newMap(int size) {
- return new HashMap<Object,Object>(size);
+ protected Object newMap(Object old, int size) {
+ if (old instanceof Map) {
+ ((Map) old).clear();
+ return old;
+ } else return new HashMap<Object, Object>(size);
}
/** Called to read strings. Subclasses may override to use a different
@@ -225,7 +283,7 @@
public static void skip(Schema schema, ValueReader in) throws IOException {
switch (schema.getType()) {
case RECORD:
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
skip(entry.getValue(), in);
break;
case ARRAY:
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Tue Apr 14 21:42:16 2009
@@ -17,16 +17,23 @@
*/
package org.apache.avro.generic;
-import java.io.*;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.avro.*;
-import org.apache.avro.io.*;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.ValueWriter;
import org.apache.avro.util.Utf8;
/** {@link DatumWriter} for generic Java objects. */
-public class GenericDatumWriter implements DatumWriter<Object> {
+public class GenericDatumWriter<D> implements DatumWriter<D> {
private Schema root;
public GenericDatumWriter() {}
@@ -37,7 +44,7 @@
public void setSchema(Schema root) { this.root = root; }
- public void write(Object datum, ValueWriter out) throws IOException {
+ public void write(D datum, ValueWriter out) throws IOException {
write(root, datum, out);
}
@@ -69,40 +76,58 @@
* representations.*/
protected void writeRecord(Schema schema, Object datum, ValueWriter out)
throws IOException {
- if (!(datum instanceof GenericRecord)) error(schema,datum);
- GenericRecord record = (GenericRecord)datum;
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet())
- write(entry.getValue(), record.get(entry.getKey()), out);
-
+ for (Entry<String, Field> entry : schema.getFields().entrySet()) {
+ Field field = entry.getValue();
+ write(field.schema(), getField(datum, entry.getKey(), field.pos()), out);
+ }
}
-
+
+ /** Called by the default implementation of {@link #writeRecord} to retrieve
+ * a record field value. The default implementation is for {@link
+ * GenericRecord}.*/
+ protected Object getField(Object record, String field, int position) {
+ return ((GenericRecord) record).get(field);
+ }
+
/** Called to write an array. May be overridden for alternate array
* representations.*/
protected void writeArray(Schema schema, Object datum, ValueWriter out)
throws IOException {
- if (!(datum instanceof GenericArray)) error(schema,datum);
Schema element = schema.getElementType();
- GenericArray array = (GenericArray)datum;
- if (array.size() > 0) {
- out.writeLong(array.size());
- for (Object o : array)
- write(element, o, out);
+ long size = getArraySize(datum);
+ if (size > 0) {
+ out.writeLong(size);
+ for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();)
+ write(element, it.next(), out);
}
out.writeLong(0);
}
+ /** Called by the default implementation of {@link #writeArray} to get the
+ * size of an array. The default implementation is for {@link
+ * GenericArray}.*/
+ @SuppressWarnings("unchecked")
+ protected long getArraySize(Object array) {
+ return ((GenericArray) array).size();
+ }
+
+ /** Called by the default implementation of {@link #writeArray} to enumerate
+ * array elements. The default implementation is for {@link GenericArray}.*/
+ @SuppressWarnings("unchecked")
+ protected Iterator<? extends Object> getArrayElements(Object array) {
+ return ((GenericArray) array).iterator();
+ }
+
/** Called to write a map. May be overridden for alternate map
* representations.*/
protected void writeMap(Schema schema, Object datum, ValueWriter out)
throws IOException {
- if (!(datum instanceof Map)) error(schema,datum);
Schema key = schema.getKeyType();
Schema value = schema.getValueType();
- @SuppressWarnings(value="unchecked")
- Map<Object,Object> map = (Map<Object,Object>)datum;
- if (map.size() > 0) {
- out.writeLong(map.size()); // write a single block
- for (Map.Entry<Object,Object> entry : map.entrySet()) {
+ int size = getMapSize(datum);
+ if (size > 0) {
+ out.writeLong(size); // write a single block
+ for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
write(key, entry.getKey(), out);
write(value, entry.getValue(), out);
}
@@ -110,6 +135,20 @@
out.writeLong(0);
}
+ /** Called by the default implementation of {@link #writeMap} to get the size
+ * of a map. The default implementation is for {@link Map}.*/
+ @SuppressWarnings("unchecked")
+ protected int getMapSize(Object map) {
+ return ((Map) map).size();
+ }
+
+ /** Called by the default implementation of {@link #writeMap} to enumerate
+ * map elements. The default implementation is for {@link Map}.*/
+ @SuppressWarnings("unchecked")
+ protected Iterable<Map.Entry<Object,Object>> getMapEntries(Object map) {
+ return ((Map) map).entrySet();
+ }
+
/** Called to write a string. May be overridden for alternate string
* representations.*/
protected void writeString(Object datum, ValueWriter out) throws IOException {
@@ -137,15 +176,13 @@
protected boolean instanceOf(Schema schema, Object datum) {
switch (schema.getType()) {
case RECORD:
- if (!(datum instanceof GenericRecord)) return false;
+ if (!isRecord(datum)) return false;
return (schema.getName() == null) ||
schema.getName().equals(((GenericRecord)datum).getSchema().getName());
- case ARRAY:
- return datum instanceof GenericArray;
- case MAP:
- return datum instanceof Map && !(datum instanceof GenericRecord);
- case STRING: return datum instanceof Utf8;
- case BYTES: return datum instanceof ByteBuffer;
+ case ARRAY: return isArray(datum);
+ case MAP: return isMap(datum);
+ case STRING: return isString(datum);
+ case BYTES: return isBytes(datum);
case INT: return datum instanceof Integer;
case LONG: return datum instanceof Long;
case FLOAT: return datum instanceof Float;
@@ -155,7 +192,32 @@
default: throw new AvroRuntimeException("Unexpected type: " +schema);
}
}
+
+ /** Called by the default implementation of {@link #instanceOf}.*/
+ protected boolean isArray(Object datum) {
+ return datum instanceof GenericArray;
+ }
+
+ /** Called by the default implementation of {@link #instanceOf}.*/
+ protected boolean isRecord(Object datum) {
+ return datum instanceof GenericRecord;
+ }
+ /** Called by the default implementation of {@link #instanceOf}.*/
+ protected boolean isMap(Object datum) {
+ return (datum instanceof Map) && (!(datum instanceof GenericRecord));
+ }
+
+ /** Called by the default implementation of {@link #instanceOf}.*/
+ protected boolean isString(Object datum) {
+ return datum instanceof Utf8;
+ }
+
+ /** Called by the default implementation of {@link #instanceOf}.*/
+ protected boolean isBytes(Object datum) {
+ return datum instanceof ByteBuffer;
+ }
+
private void error(Schema schema, Object datum) {
throw new AvroTypeException("Not a "+schema+": "+datum);
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java Tue Apr 14 21:42:16 2009
@@ -42,16 +42,16 @@
public void writeRequest(Schema schema, Object request, ValueWriter out)
throws IOException {
- new GenericDatumWriter(schema).write(request, out);
+ new GenericDatumWriter<Object>(schema).write(request, out);
}
public Object readResponse(Schema schema, ValueReader in) throws IOException {
- return new GenericDatumReader(schema).read(null, in);
+ return new GenericDatumReader<Object>(schema).read(null, in);
}
public AvroRemoteException readError(Schema schema, ValueReader in)
throws IOException {
- return new AvroRemoteException(new GenericDatumReader(schema).read(null,in));
+ return new AvroRemoteException(new GenericDatumReader<Object>(schema).read(null,in));
}
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java Tue Apr 14 21:42:16 2009
@@ -18,13 +18,14 @@
package org.apache.avro.generic;
-import java.util.*;
-import java.nio.ByteBuffer;
-import java.io.*;
-
-import org.apache.avro.*;
-import org.apache.avro.io.*;
-import org.apache.avro.ipc.*;
+import java.io.IOException;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.io.ValueReader;
+import org.apache.avro.io.ValueWriter;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Responder;
/** {@link Responder} implementation for generic Java data. */
public abstract class GenericResponder extends Responder {
@@ -35,19 +36,19 @@
/** Reads a request message. */
public Object readRequest(Schema schema, ValueReader in) throws IOException {
- return new GenericDatumReader(schema).read(null, in);
+ return new GenericDatumReader<Object>(schema).read(null, in);
}
/** Writes a response message. */
public void writeResponse(Schema schema, Object response, ValueWriter out)
throws IOException {
- new GenericDatumWriter(schema).write(response, out);
+ new GenericDatumWriter<Object>(schema).write(response, out);
}
/** Writes an error message. */
public void writeError(Schema schema, AvroRemoteException error,
ValueWriter out) throws IOException {
- new GenericDatumWriter(schema).write(error.getValue(), out);
+ new GenericDatumWriter<Object>(schema).write(error.getValue(), out);
}
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java Tue Apr 14 21:42:16 2009
@@ -17,21 +17,30 @@
*/
package org.apache.avro.reflect;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.lang.reflect.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
-
-import com.thoughtworks.paranamer.Paranamer;
-import com.thoughtworks.paranamer.CachingParanamer;
-
-import org.apache.avro.*;
-import org.apache.avro.Schema.Type;
+import java.lang.reflect.ParameterizedType;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
-import org.apache.avro.util.Utf8;
-import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+import com.thoughtworks.paranamer.CachingParanamer;
+import com.thoughtworks.paranamer.Paranamer;
/** Utilities to use existing Java classes and interfaces via reflection. */
public class ReflectData {
@@ -43,7 +52,7 @@
case RECORD:
Class recordClass = datum.getClass();
if (!(datum instanceof Object)) return false;
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
try {
if (!validate(entry.getValue(),
recordClass.getField(entry.getKey()).get(datum)))
@@ -138,7 +147,7 @@
Schema schema = names.get(name);
if (schema == null) {
Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
- schema = Schema.create(name, c.getPackage().getName(), fields,
+ schema = Schema.create(name, c.getPackage().getName(),
Throwable.class.isAssignableFrom(c));
if (!names.containsKey(name))
names.put(name, schema);
@@ -146,6 +155,7 @@
if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0)
fields.put(field.getName(),
createSchema(field.getGenericType(), names));
+ schema.setFields(fields);
}
return schema;
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java Tue Apr 14 21:42:16 2009
@@ -17,18 +17,20 @@
*/
package org.apache.avro.reflect;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.lang.reflect.*;
-
-import org.apache.avro.*;
-import org.apache.avro.io.*;
-import org.apache.avro.util.Utf8;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.ValueReader;
/** {@link DatumReader} for existing classes via Java reflection. */
-public class ReflectDatumReader extends GenericDatumReader {
+public class ReflectDatumReader extends GenericDatumReader<Object> {
protected String packageName;
public ReflectDatumReader(String packageName) {
@@ -49,16 +51,16 @@
throw new AvroRuntimeException(e);
}
expected = ReflectData.getSchema(recordClass);
- Map<String,Schema> expectedFields = expected.getFields();
+ Map<String,Schema.Field> expectedFields = expected.getFields();
Object record = recordClass.isInstance(old) ? old : newInstance(recordClass);
- for (Map.Entry<String,Schema> entry : actual.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> entry : actual.getFieldSchemas()) {
try {
Field field = recordClass.getField(entry.getKey());
field.setAccessible(true);
String key = entry.getKey();
Schema aField = entry.getValue();
- Schema eField =
- field.getType() == Object.class ? aField : expectedFields.get(key);
+ Schema eField = field.getType() ==
+ Object.class ? aField : expectedFields.get(key).schema();
field.set(record, read(null, aField, eField, in));
} catch (NoSuchFieldException e) { // ignore unmatched field
} catch (IllegalAccessException e) {
@@ -72,6 +74,7 @@
private static final Map<Class,Constructor> CTOR_CACHE =
new ConcurrentHashMap<Class,Constructor>();
+ /** Create a new instance of the named class. */
@SuppressWarnings("unchecked")
protected static Object newInstance(Class c) {
Object result;
@@ -89,4 +92,23 @@
return result;
}
+ @Override
+ protected void addField(Object record, String name, int position, Object o) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected Object getField(Object record, String name, int position) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected void removeField(Object record, String field, int position) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected Object newRecord(Object old, Schema schema) {
+ throw new AvroRuntimeException("Not implemented");
+ }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Apr 14 21:42:16 2009
@@ -17,19 +17,19 @@
*/
package org.apache.avro.reflect;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.lang.reflect.*;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
-import org.apache.avro.*;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
-import org.apache.avro.io.*;
-import org.apache.avro.util.Utf8;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.ValueWriter;
/** {@link DatumWriter} for existing classes via Java reflection. */
-public class ReflectDatumWriter extends GenericDatumWriter {
+public class ReflectDatumWriter extends GenericDatumWriter<Object> {
public ReflectDatumWriter() {}
public ReflectDatumWriter(Schema root) {
@@ -39,7 +39,7 @@
protected void writeRecord(Schema schema, Object datum, ValueWriter out)
throws IOException {
Class recordClass = datum.getClass();
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
try {
Field field = recordClass.getField(entry.getKey());
write(entry.getValue(), field.get(datum), out);
@@ -50,6 +50,16 @@
}
}
}
+
+ @Override
+ protected boolean isRecord(Object datum) {
+ return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
+ }
+
+ @Override
+ protected Object getField(Object record, String field, int position) {
+ throw new AvroRuntimeException("Not implemented");
+ }
protected boolean instanceOf(Schema schema, Object datum) {
return (schema.getType() == Type.RECORD)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java Tue Apr 14 21:42:16 2009
@@ -18,13 +18,21 @@
package org.apache.avro.reflect;
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-
-import org.apache.avro.*;
-import org.apache.avro.io.*;
-import org.apache.avro.ipc.*;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.ValueReader;
+import org.apache.avro.io.ValueWriter;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Transceiver;
/** A {@link Requestor} for existing interfaces via Java reflection. */
public class ReflectRequestor extends Requestor implements InvocationHandler {
@@ -53,7 +61,7 @@
throws IOException {
Object[] args = (Object[])request;
int i = 0;
- for (Map.Entry<String,Schema> param : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> param : schema.getFieldSchemas())
getDatumWriter(param.getValue()).write(args[i++], out);
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java Tue Apr 14 21:42:16 2009
@@ -18,17 +18,24 @@
package org.apache.avro.reflect;
-import java.util.*;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import java.io.*;
-import java.lang.reflect.*;
+import java.util.Map;
+import java.util.Set;
-import org.apache.avro.*;
-import org.apache.avro.io.*;
-import org.apache.avro.util.*;
-import org.apache.avro.ipc.*;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.ValueReader;
+import org.apache.avro.io.ValueWriter;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.util.Utf8;
/** {@link Responder} for existing interfaces via Java reflection.*/
public class ReflectResponder extends Responder {
@@ -51,10 +58,9 @@
/** Reads a request message. */
public Object readRequest(Schema schema, ValueReader in) throws IOException {
- Map<String,Schema> params = schema.getFields();
- Object[] args = new Object[params.size()];
+ Object[] args = new Object[schema.getFields().size()];
int i = 0;
- for (Map.Entry<String,Schema> param : params.entrySet())
+ for (Map.Entry<String, Schema> param : schema.getFieldSchemas())
args[i++] = getDatumReader(param.getValue()).read(null, in);
return args;
}
@@ -73,11 +79,10 @@
public Object respond(Message message, Object request)
throws AvroRemoteException {
- Map<String,Schema> params = message.getRequest().getFields();
- Class[] paramTypes = new Class[params.size()];
+ Class[] paramTypes = new Class[message.getRequest().getFields().size()];
int i = 0;
try {
- for (Map.Entry<String,Schema> param : params.entrySet())
+ for (Map.Entry<String,Schema> param: message.getRequest().getFieldSchemas())
paramTypes[i++] = paramType(param.getValue());
Method method = impl.getClass().getMethod(message.getName(), paramTypes);
return method.invoke(impl, (Object[])request);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java Tue Apr 14 21:42:16 2009
@@ -17,14 +17,15 @@
*/
package org.apache.avro.specific;
-import java.io.*;
-import java.util.*;
-
-import org.codehaus.jackson.map.*;
-import org.codehaus.jackson.*;
-
-import org.apache.avro.*;
-import org.apache.avro.Protocol.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.Protocol.Message;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.JsonTypeMapper;
/** Generate specific Java interfaces and classes for protocols and schemas. */
public class SpecificCompiler {
@@ -90,7 +91,7 @@
private String params(Schema request) {
StringBuilder b = new StringBuilder();
int count = 0;
- for (Map.Entry<String,Schema> param : request.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> param : request.getFieldSchemas()) {
String paramName = param.getKey();
b.append(type(param.getValue(), paramName));
b.append(" ");
@@ -123,7 +124,7 @@
line(d+1, "private static final Schema _SCHEMA = Schema.parse(\""
+esc(schema)+"\");");
// field declations
- for (Map.Entry<String,Schema> field : schema.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> field : schema.getFieldSchemas()) {
String fieldName = field.getKey();
line(d+1, "public "+type(field.getValue(),fieldName)+" "+fieldName+";");
}
@@ -133,7 +134,7 @@
line(d+1, "public Object get(int _field) {");
line(d+2, "switch (_field) {");
int i = 0;
- for (Map.Entry<String,Schema> field : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> field : schema.getFieldSchemas())
line(d+2, "case "+(i++)+": return "+field.getKey()+";");
line(d+2, "default: throw new AvroRuntimeException(\"Bad index\");");
line(d+2, "}");
@@ -143,7 +144,7 @@
line(d+1, "public void set(int _field, Object _value) {");
line(d+2, "switch (_field) {");
i = 0;
- for (Map.Entry<String,Schema> field : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> field : schema.getFieldSchemas())
line(d+2, "case "+(i++)+": "+field.getKey()+" = ("+
type(field.getValue(),field.getKey())+")_value; break;");
line(d+2, "default: throw new AvroRuntimeException(\"Bad index\");");
@@ -153,7 +154,7 @@
// nested classes
if (d == 0)
- for (Map.Entry<String,Schema> field : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> field : schema.getFieldSchemas())
compile(field.getValue(), null, d+1);
break;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java Tue Apr 14 21:42:16 2009
@@ -17,13 +17,16 @@
*/
package org.apache.avro.specific;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.lang.reflect.*;
-
-import org.apache.avro.*;
-import org.apache.avro.io.*;
+import java.io.IOException;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.ValueReader;
import org.apache.avro.reflect.ReflectDatumReader;
/** {@link DatumReader} for generated Java classes. */
@@ -38,17 +41,18 @@
protected Object readRecord(Object old, Schema remote, Schema local,
ValueReader in) throws IOException {
+ /* TODO: Use schema's field numbers instead of creating our own map? */
Class c = getClass(remote.getName());
SpecificRecord record =
(SpecificRecord)(c.isInstance(old) ? old : newInstance(c));
local = record.schema();
- Map<String,Schema> localFields = local.getFields();
+ Map<String,Schema.Field> localFields = local.getFields();
int[] map = getMap(local, remote);
int i = 0, size = 0, j = 0;
- for (Map.Entry<String,Schema> entry : remote.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> entry : remote.getFieldSchemas()) {
String key = entry.getKey();
Schema rField = entry.getValue();
- Schema lField = local == remote ? rField : localFields.get(key);
+ Schema lField = local == remote ? rField : localFields.get(key).schema();
int fieldNum = map[i++];
if (fieldNum == -1) {
skip(rField, in);
@@ -59,9 +63,8 @@
size++;
}
if (local.getFields().size() > size) // clear unset fields
- for (Map.Entry<String,Schema> entry : local.getFields().entrySet()) {
- if (!(remote.getFields().containsKey(entry.getKey()) &&
- local.getFields().containsKey(entry.getKey())))
+ for (Map.Entry<String, Schema> entry : local.getFieldSchemas()) {
+ if (!(remote.getFields().containsKey(entry.getKey())))
record.set(j, null);
j++;
}
@@ -105,7 +108,7 @@
private static int[] createMap(Schema remote, Schema local) {
int[] map = new int[remote.getFields().size()];
int i = 0;
- for (Map.Entry<String,Schema> f : remote.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> f : remote.getFieldSchemas()) {
map[i++] = getLocalIndex(f.getKey(), f.getValue().getType(), local);
}
return map;
@@ -114,7 +117,7 @@
private static int getLocalIndex(String name, Schema.Type type,
Schema local) {
int i = 0;
- for (Map.Entry<String,Schema> f : local.getFields().entrySet()) {
+ for (Map.Entry<String, Schema> f : local.getFieldSchemas()) {
if (f.getKey().equals(name) && f.getValue().getType().equals(type))
return i;
i++;
@@ -122,4 +125,23 @@
return -1;
}
+ @Override
+ protected void addField(Object record, String name, int position, Object o) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected Object getField(Object record, String name, int position) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected void removeField(Object record, String field, int position) {
+ throw new AvroRuntimeException("Not implemented");
+ }
+
+ @Override
+ protected Object newRecord(Object old, Schema schema) {
+ throw new AvroRuntimeException("Not implemented");
+ }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java Tue Apr 14 21:42:16 2009
@@ -17,11 +17,12 @@
*/
package org.apache.avro.specific;
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
-import org.apache.avro.*;
-import org.apache.avro.io.*;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.ValueWriter;
import org.apache.avro.reflect.ReflectDatumWriter;
/** {@link DatumWriter} for generated Java classes. */
@@ -36,8 +37,7 @@
throws IOException {
SpecificRecord record = (SpecificRecord)datum;
int i = 0;
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
write(entry.getValue(), record.get(i++), out);
}
}
-
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java Tue Apr 14 21:42:16 2009
@@ -18,14 +18,17 @@
package org.apache.avro.specific;
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
import java.lang.reflect.Proxy;
-import org.apache.avro.*;
-import org.apache.avro.io.*;
-import org.apache.avro.ipc.*;
-import org.apache.avro.reflect.*;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectRequestor;
/** {@link Requestor} for generated interfaces. */
public class SpecificRequestor extends ReflectRequestor {
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java Tue Apr 14 21:42:16 2009
@@ -20,11 +20,18 @@
import java.io.File;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
-import org.apache.avro.util.Utf8;
-import org.apache.avro.generic.*;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
/** Generates schema data as Java objects with random values. */
public class RandomData implements Iterable<Object> {
@@ -60,7 +67,7 @@
switch (schema.getType()) {
case RECORD:
GenericRecord record = new GenericData.Record(schema);
- for (Map.Entry<String,Schema> entry : schema.getFields().entrySet())
+ for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
record.put(entry.getKey(), generate(entry.getValue(), random, d+1));
return record;
case ARRAY:
@@ -116,7 +123,7 @@
DataFileWriter<Object> writer =
new DataFileWriter<Object>(sch,
new FileOutputStream(new File(args[1]),false),
- new GenericDatumWriter());
+ new GenericDatumWriter<Object>());
try {
for (Object datum : new RandomData(sch, Integer.parseInt(args[2]))) {
writer.append(datum);
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Tue Apr 14 21:42:16 2009
@@ -17,17 +17,20 @@
*/
package org.apache.avro;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
import junit.framework.TestCase;
-import org.codehaus.jackson.map.JsonNode;
-import org.apache.avro.io.*;
-import org.apache.avro.file.*;
-import org.apache.avro.generic.*;
-import org.apache.avro.specific.*;
-import org.apache.avro.reflect.*;
-import org.apache.avro.util.*;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
public class TestDataFile extends TestCase {
private static final int COUNT =
@@ -51,7 +54,7 @@
DataFileWriter<Object> writer =
new DataFileWriter<Object>(SCHEMA,
new FileOutputStream(FILE),
- new GenericDatumWriter());
+ new GenericDatumWriter<Object>());
try {
for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
writer.append(datum);
@@ -64,7 +67,7 @@
public void testGenericRead() throws IOException {
DataFileReader<Object> reader =
new DataFileReader<Object>(new SeekableFileInput(FILE),
- new GenericDatumReader());
+ new GenericDatumReader<Object>());
try {
Object datum = null;
if (VALIDATE) {
@@ -83,7 +86,7 @@
}
public void testGeneratedGeneric() throws IOException {
- readFiles(new GenericDatumReader());
+ readFiles(new GenericDatumReader<Object>());
}
public void testGeneratedSpecific() throws IOException {
@@ -118,10 +121,10 @@
if (args.length > 1)
projection = Schema.parse(new File(args[1]));
TestDataFile tester = new TestDataFile();
- tester.readFile(input, new GenericDatumReader(null, projection), false);
+ tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
long start = System.currentTimeMillis();
for (int i = 0; i < 4; i++)
- tester.readFile(input, new GenericDatumReader(null, projection), false);
+ tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
System.out.println("Time: "+(System.currentTimeMillis()-start));
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java?rev=764968&r1=764967&r2=764968&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java Tue Apr 14 21:42:16 2009
@@ -111,7 +111,7 @@
GenericData.validate(schema, datum));
checkSerialization(schema, datum,
- new GenericDatumWriter(), new GenericDatumReader());
+ new GenericDatumWriter<Object>(), new GenericDatumReader<Object>());
}
}