You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:20 UTC
[avro] 10/14: Will be handy to have schemas available during decoding.
This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git
commit ab9601dbfeff198d41bc9fb5fe419d22ec8d14e7
Author: rstata <rs...@yahoo.com>
AuthorDate: Tue Apr 30 13:13:55 2019 -0700
Will be handy to have schemas available during decoding.
---
.../java/org/apache/avro/generic/Advancer.java | 111 +++++++++++++--------
1 file changed, 69 insertions(+), 42 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
index 8f2b010..d80e812 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
@@ -75,6 +75,9 @@ abstract class Advancer {
//// an integer is read with no promotion) overrides just
//// readInt.
+ public final Schema writer, reader;
+ protected Advancer(Schema w, Schema r) { this.writer = w; this.reader = r; }
+
public Object next(Decoder in) throws IOException { exception(); return null; }
public Object nextNull(Decoder in) throws IOException { exception(); return null; }
public boolean nextBoolean(Decoder in) throws IOException { exception(); return false; }
@@ -140,7 +143,7 @@ abstract class Advancer {
case DOUBLE: return DoubleFast.instance;
case STRING: return StringFast.instance;
case BYTES: return BytesFast.instance;
- case FIXED: return new FixedFast(a.writer.getFixedSize());
+ case FIXED: return new FixedFast(a.writer, a.reader);
default:
throw new IllegalArgumentException("Unexpected schema for DoNothing:" + a.reader);
}
@@ -159,13 +162,14 @@ abstract class Advancer {
}
case ENUM:
Resolver.EnumAdjust e = (Resolver.EnumAdjust)a;
- if (e.noAdjustmentsNeeded) return EnumFast.instance;
- else return new EnumWithAdjustments(e.adjustments);
+ if (e.noAdjustmentsNeeded) return new EnumFast(a.writer, a.reader);
+ else return new EnumWithAdjustments(a.writer, a.reader, e.adjustments);
case CONTAINER:
Advancer ea = Advancer.from(((Resolver.Container)a).elementAction);
- if (a.writer.getType() == Schema.Type.ARRAY) return new ArrayContainer(ea);
- else return new MapContainer(ea);
+ if (a.writer.getType() == Schema.Type.ARRAY)
+ return new ArrayContainer(a.writer, a.reader, ea);
+ else return new MapContainer(a.writer, a.reader, ea);
case RECORD:
return Advancer.Record.from((Resolver.RecordAdjust)a);
@@ -175,15 +179,16 @@ abstract class Advancer {
Advancer[] branches = new Advancer[wu.actions.length];
for (int i = 0; i < branches.length; i++)
branches[i] = Advancer.from(wu.actions[i]);
- if (wu.unionEquiv) return new EquivUnion(branches);
- return new WriterUnion(branches);
+ if (wu.unionEquiv) return new EquivUnion(a.writer, a.reader, branches);
+ return new WriterUnion(a.writer, a.reader, branches);
case READER_UNION:
Resolver.ReaderUnion ru = (Resolver.ReaderUnion)a;
- return new ReaderUnion(ru.firstMatch, Advancer.from(ru.actualAction));
+ return new ReaderUnion(a.writer, a.reader,
+ ru.firstMatch, Advancer.from(ru.actualAction));
case ERROR:
- throw new AvroTypeException(a.toString());
+ return new Error(w,r, a.toString());
case SKIP:
throw new RuntimeException("Internal error. Skip should've been consumed.");
default:
@@ -213,7 +218,7 @@ abstract class Advancer {
* data causes the error to manifest). */
private static class Error extends Advancer {
String msg;
- public Error(String msg) { this.msg = msg; }
+ public Error(Schema w, Schema r, String msg) { super(w,r); this.msg = msg; }
protected Exception exception() {
throw new AvroTypeException(msg);
}
@@ -236,7 +241,7 @@ abstract class Advancer {
* illustrations. */
public abstract static class Container extends Advancer {
private final Advancer elementAdvancer;
- public Container(Advancer elementAdvancer) { this.elementAdvancer = elementAdvancer; }
+ public Container(Schema w, Schema r, Advancer ea) { super(wr); elementAdvancer = ea; }
public Container getContainerAdvancer(Decoder in) { return this; }
public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
public abstract long firstChunk(Decoder in) throws IOException;
@@ -244,7 +249,7 @@ abstract class Advancer {
}
private static class ArrayContainer extends Container {
- public ArrayContainer(Advancer ea) { super(ea); }
+ public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
public long firstChunk(Decoder in) throws IOException
{ return in.readArrayStart(); }
public long nextChunk(Decoder in) throws IOException
@@ -252,7 +257,7 @@ abstract class Advancer {
}
private static class MapContainer extends Container {
- public MapContainer(Advancer ea) { super(ea); }
+ public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
public long firstChunk(Decoder in) throws IOException
{ return in.readMapStart(); }
public long nextChunk(Decoder in) throws IOException
@@ -265,7 +270,7 @@ abstract class Advancer {
private static class NullFast extends Advancer {
public static final NullFast instance = new NullFast();
- private NullFast() { }
+ private NullFast() { Schema s = Schema.create(Schema.Type.NULL); super(s,s); }
public Object nextNull(Decoder in) throws IOException {
in.readNull();
return null;
@@ -275,7 +280,7 @@ abstract class Advancer {
private static class BooleanFast extends Advancer {
public static final BooleanFast instance = new BooleanFast();
- private BooleanFast() { }
+ private BooleanFast() { Schema s = Schema.create(Schema.Type.BOOLEAN); super(s,s); }
public boolean nextBoolean(Decoder in) throws IOException {
return in.readBoolean();
}
@@ -284,7 +289,7 @@ abstract class Advancer {
private static class IntFast extends Advancer {
public static final IntFast instance = new IntFast();
- private IntFast() { }
+ private IntFast() { Schema s = Schema.create(Schema.Type.INTEGER); super(s,s); }
public int nextInt(Decoder in) throws IOException {
return in.readInt();
}
@@ -293,7 +298,7 @@ abstract class Advancer {
private static class LongFast extends Advancer {
public static final LongFast instance = new LongFast();
- private LongFast() { }
+ private LongFast() { Schema s = Schema.create(Schema.Type.LONG); super(s,s); }
public long nextLong(Decoder in) throws IOException {
return in.readLong();
}
@@ -302,7 +307,7 @@ abstract class Advancer {
private static class FloatFast extends Advancer {
public static final FloatFast instance = new FloatFast();
- private FloatFast() { }
+ private FloatFast() { Schema s = Schema.create(Schema.Type.FLOAT); super(s,s); }
public float nextFloat(Decoder in) throws IOException {
return in.readFloat();
}
@@ -311,7 +316,7 @@ abstract class Advancer {
private static class DoubleFast extends Advancer {
public static final DoubleFast instance = new DoubleFast();
- private DoubleFast() { }
+ private DoubleFast() { Schema s = Schema.create(Schema.Type.DOUBLE); super(s,s); }
public double nextDouble(Decoder in) throws IOException {
return in.readDouble();
}
@@ -320,7 +325,7 @@ abstract class Advancer {
private static class StringFast extends Advancer {
public static final StringFast instance = new StringFast();
- private StringFast() { }
+ private StringFast() { Schema s = Schema.create(Schema.Type.STRING); super(s,s); }
public String nextString(Decoder in) throws IOException { return in.readString(); }
public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
return in.readString(old);
@@ -330,7 +335,7 @@ abstract class Advancer {
private static class BytesFast extends Advancer {
public static final BytesFast instance = new BytesFast();
- private BytesFast() { }
+ private BytesFast() { Schema s = Schema.create(Schema.Type.BYTES); super(s,s); }
public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
return in.readBytes(old);
}
@@ -339,7 +344,7 @@ abstract class Advancer {
private static class FixedFast extends Advancer {
private final int len;
- private FixedFast(int len) { this.len = len; }
+ private FixedFast(Schema w, Schema r) { super(w,r); this.len = w.getFixedSize(); }
public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException {
in.readFixed(bytes, start, len);
return bytes;
@@ -352,8 +357,7 @@ abstract class Advancer {
}
private static class EnumFast extends Advancer {
- public static final EnumFast instance = new EnumFast();
- private EnumFast() { }
+ public EnumFast(Schema w, Schema r) { super(w,r); }
public int nextEnum(Decoder in) throws IOException { return in.readEnum(); }
public Object next(Decoder in) throws IOException { return nextEnum(in); }
}
@@ -363,7 +367,9 @@ abstract class Advancer {
private static class LongFromInt extends Advancer {
public static final LongFromInt instance = new LongFromInt();
- private LongFromInt() { }
+ private LongFromInt() {
+ super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.LONG));
+ }
public long nextLong(Decoder in) throws IOException {
return (long) in.readInt();
}
@@ -372,7 +378,9 @@ abstract class Advancer {
private static class FloatFromInt extends Advancer {
public static final FloatFromInt instance = new FloatFromInt();
- private FloatFromInt() { }
+ private FloatFromInt() {
+ super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.FLOAT));
+ }
public float nextFloat(Decoder in) throws IOException {
return (float) in.readInt();
}
@@ -381,7 +389,9 @@ abstract class Advancer {
private static class FloatFromLong extends Advancer {
public static final FloatFromLong instance = new FloatFromLong();
- private FloatFromLong() { }
+ private FloatFromLong() {
+ super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.FLOAT));
+ }
public float nextFloat(Decoder in) throws IOException {
return (long) in.readLong();
}
@@ -390,7 +400,9 @@ abstract class Advancer {
private static class DoubleFromInt extends Advancer {
public static final DoubleFromInt instance = new DoubleFromInt();
- private DoubleFromInt() { }
+ private DoubleFromInt() {
+ super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.DOUBLE));
+ }
public double nextDouble(Decoder in) throws IOException {
return (double) in.readInt();
}
@@ -399,7 +411,9 @@ abstract class Advancer {
private static class DoubleFromLong extends Advancer {
public static final DoubleFromLong instance = new DoubleFromLong();
- private DoubleFromLong() { }
+ private DoubleFromLong() {
+ super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.DOUBLE));
+ }
public double nextDouble(Decoder in) throws IOException {
return (double) in.readLong();
}
@@ -408,7 +422,9 @@ abstract class Advancer {
private static class DoubleFromFloat extends Advancer {
public static final DoubleFromFloat instance = new DoubleFromFloat();
- private DoubleFromFloat() { }
+ private DoubleFromFloat() {
+ super(Schema.create(Schema.Type.FLOAT), Schema.create(Schema.Type.DOUBLE));
+ }
public double nextDouble(Decoder in) throws IOException {
return (double) in.readFloat();
}
@@ -417,7 +433,9 @@ abstract class Advancer {
private static class BytesFromString extends Advancer {
public static final BytesFromString instance = new BytesFromString();
- private BytesFromString() { }
+ private BytesFromString() {
+ super(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.BYTES));
+ }
public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
Utf8 s = in.readString(null);
return ByteBuffer.wrap(s.getBytes(), 0, s.getByteLength());
@@ -427,7 +445,9 @@ abstract class Advancer {
private static class StringFromBytes extends Advancer {
public static final StringFromBytes instance = new StringFromBytes();
- private StringFromBytes() { }
+ private StringFromBytes() {
+ super(Schema.create(Schema.Type.BYTES), Schema.create(Schema.Type.STRING));
+ }
public String nextString(Decoder in) throws IOException {
return new String(in.readBytes(null).array(), StandardCharsets.UTF_8);
}
@@ -443,7 +463,8 @@ abstract class Advancer {
private static class EnumWithAdjustments extends Advancer {
private final int[] adjustments;
- public EnumWithAdjustments(int[] adjustments) {
+ public EnumWithAdjustments(Schema w, Schema r, int[] adjustments) {
+ super(w,r);
this.adjustments = adjustments;
}
public int nextEnum(Decoder in) throws IOException {
@@ -456,7 +477,7 @@ abstract class Advancer {
* consume the tag ourself and call the corresponding advancer. */
private static class WriterUnion extends Advancer {
private Advancer[] branches;
- public WriterUnion(Advancer[] branches) { this.branches = branches; }
+ public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r) branches = b; }
private final Advancer b(Decoder in) throws IOException
{ return branches[in.readIndex()]; }
@@ -494,7 +515,7 @@ abstract class Advancer {
* consume it as a regular union. */
private static class EquivUnion extends Advancer {
private final Advancer[] branches;
- public EquivUnion(Advancer[] branches) { this.branches = branches; }
+ public EquivUnion(Schema w, Schema r, Advancer[] b) {super(w,r); branches = b; }
public int nextIndex(Decoder in) throws IOException { return in.readIndex(); }
public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException {
@@ -505,8 +526,11 @@ abstract class Advancer {
private static class ReaderUnion extends Advancer {
private int branch;
private Advancer advancer;
- public ReaderUnion(int b, Advancer a) { branch = b; advancer = a; }
+ public ReaderUnion(Schema w, Schema r, int b, Advancer a)
+ { super(w,r); branch = b; advancer = a; }
+
public int nextIndex(Decoder in) { return branch; }
+
public Advancer getBranchAdvancer(Decoder in, int b) {
if (b != this.branch)
throw new IllegalArgumentException("Branch much be " + branch + ", got " + b);
@@ -544,9 +568,10 @@ abstract class Advancer {
public final Schema.Field[] readerOrder;
public final boolean inOrder;
- private Record(Advancer[] advancers, Schema[] finalSkips,
+ private Record(Schema w, Schema r, Advancer[] advancers, Schema[] finalSkips,
Schema.Field[] readerOrder, boolean inOrder)
{
+ super(w,r);
this.advancers = advancers;
this.finalSkips = finalSkips;
this.readerOrder = readerOrder;
@@ -580,7 +605,8 @@ abstract class Advancer {
Schema[] toSkip = collectSkips(ra.fieldActions, i);
i += toSkip.length;
Advancer fieldAdv = Advancer.from(ra.fieldActions[i++]);
- if (toSkip.length != 0) fieldAdv = new RecordField(toSkip, fieldAdv);
+ if (toSkip.length != 0)
+ fieldAdv = new RecordField(fieldAdv.writer, fieldAdv.reader, toSkip, fieldAdv);
fieldAdvs[nrf] = fieldAdv;
}
@@ -590,7 +616,7 @@ abstract class Advancer {
// Deal with defaults
for (int df = 0; rf < readOrder.length; rf++, df++, nrf++)
- fieldAdvs[nrf] = new Default(ra.defaults[df]);
+ fieldAdvs[nrf] = new Default(ra.readerOrder[df].schema(), ra.defaults[df]);
// If reader and writer orders agree, sort fieldAdvs by reader
// order (i.e., move defaults into the correct place), to allow
@@ -615,14 +641,15 @@ abstract class Advancer {
readOrder = newReadOrder;
}
- return new Record(fieldAdvs, finalSkips, readOrder, inOrder);
+ return new Record(ra.writer, ra.reader, fieldAdvs, finalSkips, readOrder, inOrder);
}
}
private static class RecordField extends Advancer {
private final Schema[] toSkip;
private final Advancer field;
- public RecordField(Schema[] toSkip, Advancer field) {
+ public RecordField(Schema w, Schema r, Schema[] toSkip, Advancer field) {
+ super(w,r);
this.toSkip = toSkip;
this.field = field;
}
@@ -675,7 +702,7 @@ abstract class Advancer {
private static class Default extends Advancer {
protected final Object val;
- private Default(Object val) { this.val = val; }
+ private Default(Schema s, Object v) { super(s,s); val = v; }
public Object next(Decoder in) { return val; }
public Object nextNull(Decoder in) { return val; }