You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:21 UTC
[avro] 11/14: Fleshed out rest of cases in GenericReader2 (but do
not handle logical types).
This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git
commit 5e50f64995559c647fc98e3f87e1aebccec3f7c7
Author: rstata <rs...@yahoo.com>
AuthorDate: Tue Apr 30 16:34:39 2019 -0700
Fleshed out rest of cases in GenericReader2 (but do not handle logical types).
---
.../java/org/apache/avro/generic/Advancer.java | 107 ++++++++++++++-------
.../apache/avro/generic/GenericDatumReader2.java | 96 ++++++++++++------
2 files changed, 137 insertions(+), 66 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
index d80e812..7c469cb 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
@@ -57,9 +57,9 @@ import org.apache.avro.util.Utf8;
* and return the actual value.)
*
* Traversing records, arrays, and maps is more involved. In the
- * case of an array or map, call {@link getContainerAdvancer} and
- * proceed as described in the documentation for {@link
- * Advancer.Container}. For records, best to just look at the
+ * case of an array or map, call {@link #getArrayAdvancer} or {@link
+ * #getMapAdvancer} and proceed as described in the documentation for
+ * {@link Advancer.Container}. For records, best to just look at the
* implementation of {@link GenericDatumReader2}.
**/
abstract class Advancer {
@@ -112,11 +112,18 @@ abstract class Advancer {
return null;
}
- /** Access to advancer for array or map type. */
- public Container getContainerAdvancer(Decoder in) throws IOException {
+ /** Access to advancer for array type. */
+ public Container getArrayAdvancer(Decoder in) throws IOException {
exception();
return null;
}
+
+ /** Access to advancer for map type. */
+ public Map getMapAdvancer(Decoder in) throws IOException {
+ exception();
+ return null;
+ }
+
/** Access to advancer for record type. */
public Record getRecordAdvancer(Decoder in) throws IOException {
exception();
@@ -168,8 +175,8 @@ abstract class Advancer {
case CONTAINER:
Advancer ea = Advancer.from(((Resolver.Container)a).elementAction);
if (a.writer.getType() == Schema.Type.ARRAY)
- return new ArrayContainer(a.writer, a.reader, ea);
- else return new MapContainer(a.writer, a.reader, ea);
+ return new Container(a.writer, a.reader, ea);
+ else return new Map(a.writer, a.reader, ea);
case RECORD:
return Advancer.Record.from((Resolver.RecordAdjust)a);
@@ -188,7 +195,7 @@ abstract class Advancer {
ru.firstMatch, Advancer.from(ru.actualAction));
case ERROR:
- return new Error(w,r, a.toString());
+ return new Error(a.writer,a.reader, a.toString());
case SKIP:
throw new RuntimeException("Internal error. Skip should've been consumed.");
default:
@@ -224,42 +231,54 @@ abstract class Advancer {
}
}
- /** Used for Array and Map. The following fragment illustrates how
+ /** Used for Array. The following fragment illustrates how
* to use to read an array of int:
*
* <pre>
- * Advancer c = advancer.getContainerAdvancer(in);
- * Advancer.Container ec = c.getElementAdvancer(in);
+ * Advancer.Container c = advancer.getArrayAdvancer(in);
* for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
* for (long j = 0; j < i; j++) {
- * int element = c.readInt(in);
+ * int element = c.elementAdvancer.readInt(in);
* // .. do something with this element
* }
* }
* </pre>
* See the implementation of {@link GenericDatumReader2} for more
* illustrations. */
- public abstract static class Container extends Advancer {
- private final Advancer elementAdvancer;
- public Container(Schema w, Schema r, Advancer ea) { super(wr); elementAdvancer = ea; }
- public Container getContainerAdvancer(Decoder in) { return this; }
- public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
- public abstract long firstChunk(Decoder in) throws IOException;
- public abstract long nextChunk(Decoder in) throws IOException;
- }
+ public static class Container extends Advancer {
+ public final Advancer elementAdvancer;
+ public Container(Schema w, Schema r, Advancer ea)
+ { super(w,r); elementAdvancer = ea; }
- private static class ArrayContainer extends Container {
- public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
public long firstChunk(Decoder in) throws IOException
{ return in.readArrayStart(); }
+
public long nextChunk(Decoder in) throws IOException
{ return in.arrayNext(); }
}
- private static class MapContainer extends Container {
- public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
+ /** Used for Map. The following fragment illustrates how
+ * to use it to read a map of int (with string keys):
+ *
+ * <pre>
+ * Advancer.Map c = advancer.getMapAdvancer(in);
+ * for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
+ * for (long j = 0; j < i; j++) {
+ * String key = c.keyAdvancer.readString(in);
+ * int element = c.elementAdvancer.readInt(in);
+ * // .. do something with this element
+ * }
+ * }
+ * </pre>
+ * See the implementation of {@link GenericDatumReader2} for more
+ * illustrations. */
+ public static class Map extends Container {
+ public final Advancer keyAdvancer = StringFast.instance;
+ public Map(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
+
public long firstChunk(Decoder in) throws IOException
{ return in.readMapStart(); }
+
public long nextChunk(Decoder in) throws IOException
{ return in.mapNext(); }
}
@@ -270,7 +289,8 @@ abstract class Advancer {
private static class NullFast extends Advancer {
public static final NullFast instance = new NullFast();
- private NullFast() { Schema s = Schema.create(Schema.Type.NULL); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.NULL);
+ private NullFast() { super(s,s); }
public Object nextNull(Decoder in) throws IOException {
in.readNull();
return null;
@@ -280,7 +300,8 @@ abstract class Advancer {
private static class BooleanFast extends Advancer {
public static final BooleanFast instance = new BooleanFast();
- private BooleanFast() { Schema s = Schema.create(Schema.Type.BOOLEAN); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.BOOLEAN);
+ private BooleanFast() { super(s,s); }
public boolean nextBoolean(Decoder in) throws IOException {
return in.readBoolean();
}
@@ -289,7 +310,8 @@ abstract class Advancer {
private static class IntFast extends Advancer {
public static final IntFast instance = new IntFast();
- private IntFast() { Schema s = Schema.create(Schema.Type.INTEGER); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.INT);
+ private IntFast() { super(s,s); }
public int nextInt(Decoder in) throws IOException {
return in.readInt();
}
@@ -298,7 +320,8 @@ abstract class Advancer {
private static class LongFast extends Advancer {
public static final LongFast instance = new LongFast();
- private LongFast() { Schema s = Schema.create(Schema.Type.LONG); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.LONG);
+ private LongFast() { super(s,s); }
public long nextLong(Decoder in) throws IOException {
return in.readLong();
}
@@ -307,7 +330,8 @@ abstract class Advancer {
private static class FloatFast extends Advancer {
public static final FloatFast instance = new FloatFast();
- private FloatFast() { Schema s = Schema.create(Schema.Type.FLOAT); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.FLOAT);
+ private FloatFast() { super(s,s); }
public float nextFloat(Decoder in) throws IOException {
return in.readFloat();
}
@@ -316,7 +340,8 @@ abstract class Advancer {
private static class DoubleFast extends Advancer {
public static final DoubleFast instance = new DoubleFast();
- private DoubleFast() { Schema s = Schema.create(Schema.Type.DOUBLE); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.DOUBLE);
+ private DoubleFast() { super(s,s); }
public double nextDouble(Decoder in) throws IOException {
return in.readDouble();
}
@@ -325,7 +350,8 @@ abstract class Advancer {
private static class StringFast extends Advancer {
public static final StringFast instance = new StringFast();
- private StringFast() { Schema s = Schema.create(Schema.Type.STRING); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.STRING);
+ private StringFast() { super(s,s); }
public String nextString(Decoder in) throws IOException { return in.readString(); }
public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
return in.readString(old);
@@ -335,7 +361,8 @@ abstract class Advancer {
private static class BytesFast extends Advancer {
public static final BytesFast instance = new BytesFast();
- private BytesFast() { Schema s = Schema.create(Schema.Type.BYTES); super(s,s); }
+ private static final Schema s = Schema.create(Schema.Type.BYTES);
+ private BytesFast() { super(s,s); }
public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
return in.readBytes(old);
}
@@ -477,7 +504,7 @@ abstract class Advancer {
* consume the tag ourself and call the corresponding advancer. */
private static class WriterUnion extends Advancer {
private Advancer[] branches;
- public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r) branches = b; }
+ public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r); branches = b; }
private final Advancer b(Decoder in) throws IOException
{ return branches[in.readIndex()]; }
@@ -504,8 +531,11 @@ abstract class Advancer {
public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
{ return b(in).getBranchAdvancer(in, branch); }
- public Container getContainerAdvancer(Decoder in) throws IOException
- { return b(in).getContainerAdvancer(in); }
+ public Container getArrayAdvancer(Decoder in) throws IOException
+ { return b(in).getArrayAdvancer(in); }
+
+ public Map getMapAdvancer(Decoder in) throws IOException
+ { return b(in).getMapAdvancer(in); }
public Record getRecordAdvancer(Decoder in) throws IOException
{ return b(in).getRecordAdvancer(in); }
@@ -693,8 +723,11 @@ abstract class Advancer {
public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
{ ignore(toSkip, in); return field.getBranchAdvancer(in, branch); }
- public Container getContainerAdvancer(Decoder in) throws IOException
- { ignore(toSkip, in); return field.getContainerAdvancer(in); }
+ public Container getArrayAdvancer(Decoder in) throws IOException
+ { ignore(toSkip, in); return field.getArrayAdvancer(in); }
+
+ public Map getMapAdvancer(Decoder in) throws IOException
+ { ignore(toSkip, in); return field.getMapAdvancer(in); }
public Record getRecordAdvancer(Decoder in) throws IOException
{ ignore(toSkip, in); return field.getRecordAdvancer(in); }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
index 46506c2..cbfcff7 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
@@ -19,47 +19,42 @@ package org.apache.avro.generic;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.avro.Resolver;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader<D> {
- private final Schema reader, writer;
+public class GenericDatumReader2<D> implements DatumReader<D> {
private final Advancer.Record advancer;
private final GenericData data;
- private GenericDatumReader2(Schema writer, Schema reader, Advancer.Record a, GenericData d) {
- this.writer = writer;
- this.reader = reader;
+ private GenericDatumReader2(Advancer.Record a, GenericData d) {
advancer = a;
data = d;
}
- public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData data) {
+ /** ... Document how we use <code>d:</code> to create fixed, array,
+ * map, and record objects.
+ */
+ public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData d) {
// TODO: add caching
- Resolver.Action a = Resolver.resolve(writer, reader, data);
+ Resolver.Action a = Resolver.resolve(writer, reader, d);
Advancer.Record r = (Advancer.Record)Advancer.from(a);
- return new GenericDatumReader2(writer, reader, r, data);
+ return new GenericDatumReader2(r, d);
}
public D read(D reuse, Decoder in) throws IOException {
- List<Schema.Field> wf = writer.getFields();
- if (reuse == null) reuse = null; // FIXME
- for (int i = 0; i < advancer.advancers.length; i++) {
- int p = advancer.readerOrder[i].pos();
- reuse.put(p, read(null, wf.get(i).schema(), advancer.advancers[i], in));
- }
- advancer.done(in);
- return reuse;
+ return null;
}
- public Object read(Object reuse, Schema expected, Advancer a, Decoder in)
+ public Object read(Object reuse, Advancer a, Decoder in)
throws IOException
{
- switch (expected.getType()) {
+ switch (a.reader.getType()) {
case NULL: return a.nextNull(in);
case BOOLEAN: return (Boolean) a.nextBoolean(in);
case INT: return (Integer) a.nextInt(in);
@@ -68,22 +63,65 @@ public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader
case DOUBLE: return (Double) a.nextDouble(in);
case STRING: return (String) a.nextString(in);
case BYTES: return a.nextBytes(in, (ByteBuffer)reuse);
- case FIXED:
+ case FIXED: {
+ GenericFixed fixed = (GenericFixed) data.createFixed(reuse, a.reader);
+ a.nextFixed(in, fixed.bytes());
+ return fixed;
+ }
+
case ARRAY: {
- List result = null; // FIXME -- use GenericData methods here...
- Advancer.Container c = advancer.getContainerAdvancer(in);
- Advancer ec = c.getElementAdvancer(in);
- Schema es = expected.getElementType();
- for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
+ Advancer.Container c = advancer.getArrayAdvancer(in);
+ Advancer ec = c.elementAdvancer;
+ long i = c.firstChunk(in);
+ if (reuse instanceof GenericArray) {
+ ((GenericArray) reuse).reset();
+ } else if (reuse instanceof Collection) {
+ ((Collection) reuse).clear();
+ } else reuse = new GenericData.Array((int)i, a.reader);
+
+ Collection array = (Collection)reuse;
+ for( ; i != 0; i = c.nextChunk(in))
for (long j = 0; j < i; j++) {
- result.add(read(null, es, ec, in));
+ Object v = read(null, ec, in);
+ // TODO -- logical type conversion
+ array.add(v);
}
- }
+ if (array instanceof GenericArray<?>)
+ ((GenericArray<?>) array).prune();
}
- case MAP:
- case RECORD:
+ case MAP: {
+ Advancer.Map c = advancer.getMapAdvancer(in);
+ Advancer kc = c.keyAdvancer;
+ Advancer ec = c.elementAdvancer;
+ long i = c.firstChunk(in);
+ if (reuse instanceof Map) {
+ ((Map) reuse).clear();
+ } else reuse = new HashMap<Object,Object>((int)i);
+ Map map = (Map)reuse;
+ for ( ; i != 0; i = c.nextChunk(in))
+ for (int j = 0; j < i; j++) {
+ Object key = kc.nextString(in);
+ Object val = read(null, ec, in);
+ map.put(key, val);
+ }
+ return map;
+ }
+
+ case RECORD: {
+ Advancer.Record ra = advancer.getRecordAdvancer(in);
+ Object r = data.newRecord(reuse, ra.reader);
+ for (int i = 0; i < ra.advancers.length; i++) {
+ int p = ra.readerOrder[i].pos();
+ ((IndexedRecord)reuse).put(p, read(null, ra.advancers[i], in));
+ }
+ ra.done(in);
+ return r;
+ }
+
case UNION:
+ return read(reuse, advancer.getBranchAdvancer(in, advancer.nextIndex(in)), in);
+
default:
throw new IllegalArgumentException("Can't handle this yet.");
}