You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:20 UTC

[avro] 10/14: Will be handy to have schemas available during decoding.

This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git

commit ab9601dbfeff198d41bc9fb5fe419d22ec8d14e7
Author: rstata <rs...@yahoo.com>
AuthorDate: Tue Apr 30 13:13:55 2019 -0700

    Will be handy to have schemas available during decoding.
---
 .../java/org/apache/avro/generic/Advancer.java     | 111 +++++++++++++--------
 1 file changed, 69 insertions(+), 42 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
index 8f2b010..d80e812 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
@@ -75,6 +75,9 @@ abstract class Advancer {
   //// an integer is read with no promotion) overrides just
   //// readInt.
 
+  public final Schema writer, reader;
+  protected Advancer(Schema w, Schema r) { this.writer = w; this.reader = r; }
+
   public Object next(Decoder in) throws IOException { exception(); return null; }
   public Object nextNull(Decoder in) throws IOException { exception(); return null; }
   public boolean nextBoolean(Decoder in) throws IOException { exception(); return false; }
@@ -140,7 +143,7 @@ abstract class Advancer {
       case DOUBLE: return DoubleFast.instance;
       case STRING: return StringFast.instance;
       case BYTES: return BytesFast.instance;
-      case FIXED: return new FixedFast(a.writer.getFixedSize());
+      case FIXED: return new FixedFast(a.writer, a.reader);
       default:
         throw new IllegalArgumentException("Unexpected schema for DoNothing:" + a.reader);
       }
@@ -159,13 +162,14 @@ abstract class Advancer {
       }
     case ENUM:
       Resolver.EnumAdjust e = (Resolver.EnumAdjust)a;
-      if (e.noAdjustmentsNeeded) return EnumFast.instance;
-      else return new EnumWithAdjustments(e.adjustments);
+      if (e.noAdjustmentsNeeded) return new EnumFast(a.writer, a.reader);
+      else return new EnumWithAdjustments(a.writer, a.reader, e.adjustments);
 
     case CONTAINER:
         Advancer ea = Advancer.from(((Resolver.Container)a).elementAction);
-        if (a.writer.getType() == Schema.Type.ARRAY) return new ArrayContainer(ea);
-        else return new MapContainer(ea);
+        if (a.writer.getType() == Schema.Type.ARRAY)
+          return new ArrayContainer(a.writer, a.reader, ea);
+        else return new MapContainer(a.writer, a.reader, ea);
 
     case RECORD:
       return Advancer.Record.from((Resolver.RecordAdjust)a);
@@ -175,15 +179,16 @@ abstract class Advancer {
       Advancer[] branches = new Advancer[wu.actions.length];
       for (int i = 0; i < branches.length; i++)
         branches[i] = Advancer.from(wu.actions[i]);
-      if (wu.unionEquiv) return new EquivUnion(branches);
-      return new WriterUnion(branches);
+      if (wu.unionEquiv) return new EquivUnion(a.writer, a.reader, branches);
+      return new WriterUnion(a.writer, a.reader, branches);
 
     case READER_UNION:
       Resolver.ReaderUnion ru = (Resolver.ReaderUnion)a;
-      return new ReaderUnion(ru.firstMatch, Advancer.from(ru.actualAction));
+      return new ReaderUnion(a.writer, a.reader,
+                             ru.firstMatch, Advancer.from(ru.actualAction));
 
     case ERROR:
-      throw new AvroTypeException(a.toString());
+      return new Error(a.writer, a.reader, a.toString()); // fails lazily, when data is read
     case SKIP:
       throw new RuntimeException("Internal error.  Skip should've been consumed.");
     default:
@@ -213,7 +218,7 @@ abstract class Advancer {
    *  data causes the error to manifest). */
   private static class Error extends Advancer {
     String msg;
-    public Error(String msg) { this.msg = msg; }
+    public Error(Schema w, Schema r, String msg) { super(w,r); this.msg = msg; }
     protected Exception exception() {
       throw new AvroTypeException(msg);
     }
@@ -236,7 +241,7 @@ abstract class Advancer {
     * illustrations. */
   public abstract static class Container extends Advancer {
     private final Advancer elementAdvancer;
-    public Container(Advancer elementAdvancer) { this.elementAdvancer = elementAdvancer; }
+    public Container(Schema w, Schema r, Advancer ea) { super(w,r); elementAdvancer = ea; }
     public Container getContainerAdvancer(Decoder in) { return this; }
     public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
     public abstract long firstChunk(Decoder in) throws IOException;
@@ -244,7 +249,7 @@ abstract class Advancer {
   }
 
   private static class ArrayContainer extends Container {
-    public ArrayContainer(Advancer ea) { super(ea); }
+    public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
     public long firstChunk(Decoder in) throws IOException
       { return in.readArrayStart(); }
     public long nextChunk(Decoder in) throws IOException
@@ -252,7 +257,7 @@ abstract class Advancer {
   }
 
   private static class MapContainer extends Container {
-    public MapContainer(Advancer ea) { super(ea); }
+    public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
     public long firstChunk(Decoder in) throws IOException
       { return in.readMapStart(); }
     public long nextChunk(Decoder in) throws IOException
@@ -265,7 +270,7 @@ abstract class Advancer {
 
   private static class NullFast extends Advancer {
     public static final NullFast instance = new NullFast();
-    private NullFast() { }
+    private NullFast() { super(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.NULL)); }
     public Object nextNull(Decoder in) throws IOException {
       in.readNull(); 
       return null;
@@ -275,7 +280,7 @@ abstract class Advancer {
 
   private static class BooleanFast extends Advancer {
     public static final BooleanFast instance = new BooleanFast();
-    private BooleanFast() { }
+    private BooleanFast() { super(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.BOOLEAN)); }
     public boolean nextBoolean(Decoder in) throws IOException {
       return in.readBoolean(); 
     }
@@ -284,7 +289,7 @@ abstract class Advancer {
 
   private static class IntFast extends Advancer {
     public static final IntFast instance = new IntFast();
-    private IntFast() { }
+    private IntFast() { super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.INT)); }
     public int nextInt(Decoder in) throws IOException {
       return in.readInt(); 
     }
@@ -293,7 +298,7 @@ abstract class Advancer {
 
   private static class LongFast extends Advancer {
     public static final LongFast instance = new LongFast();
-    private LongFast() { }
+    private LongFast() { super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.LONG)); }
     public long nextLong(Decoder in) throws IOException {
       return in.readLong(); 
     }
@@ -302,7 +307,7 @@ abstract class Advancer {
 
   private static class FloatFast extends Advancer {
     public static final FloatFast instance = new FloatFast();
-    private FloatFast() { }
+    private FloatFast() { super(Schema.create(Schema.Type.FLOAT), Schema.create(Schema.Type.FLOAT)); }
     public float nextFloat(Decoder in) throws IOException {
       return in.readFloat(); 
     }
@@ -311,7 +316,7 @@ abstract class Advancer {
 
   private static class DoubleFast extends Advancer {
     public static final DoubleFast instance = new DoubleFast();
-    private DoubleFast() { }
+    private DoubleFast() { super(Schema.create(Schema.Type.DOUBLE), Schema.create(Schema.Type.DOUBLE)); }
     public double nextDouble(Decoder in) throws IOException {
       return in.readDouble(); 
     }
@@ -320,7 +325,7 @@ abstract class Advancer {
 
   private static class StringFast extends Advancer {
     public static final StringFast instance = new StringFast();
-    private StringFast() { }
+    private StringFast() { super(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.STRING)); }
     public String nextString(Decoder in) throws IOException { return in.readString(); }
     public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
       return in.readString(old);
@@ -330,7 +335,7 @@ abstract class Advancer {
 
   private static class BytesFast extends Advancer {
     public static final BytesFast instance = new BytesFast();
-    private BytesFast() { }
+    private BytesFast() { super(Schema.create(Schema.Type.BYTES), Schema.create(Schema.Type.BYTES)); }
     public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
       return in.readBytes(old);
     }
@@ -339,7 +344,7 @@ abstract class Advancer {
 
   private static class FixedFast extends Advancer {
     private final int len;
-    private FixedFast(int len) { this.len = len; }
+    private FixedFast(Schema w, Schema r) { super(w,r); this.len = w.getFixedSize(); }
     public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException {
       in.readFixed(bytes, start, len);
       return bytes;
@@ -352,8 +357,7 @@ abstract class Advancer {
   }
 
   private static class EnumFast extends Advancer {
-    public static final EnumFast instance = new EnumFast();
-    private EnumFast() { }
+    public EnumFast(Schema w, Schema r) { super(w,r); }
     public int nextEnum(Decoder in) throws IOException { return in.readEnum(); }
     public Object next(Decoder in) throws IOException { return nextEnum(in); }
   }
@@ -363,7 +367,9 @@ abstract class Advancer {
 
   private static class LongFromInt extends Advancer {
     public static final LongFromInt instance = new LongFromInt();
-    private LongFromInt() { }
+    private LongFromInt() {
+      super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.LONG));
+    }
     public long nextLong(Decoder in) throws IOException {
       return (long) in.readInt(); 
     }
@@ -372,7 +378,9 @@ abstract class Advancer {
 
   private static class FloatFromInt extends Advancer {
     public static final FloatFromInt instance = new FloatFromInt();
-    private FloatFromInt() { }
+    private FloatFromInt() {
+      super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.FLOAT));
+    }
     public float nextFloat(Decoder in) throws IOException {
       return (float) in.readInt(); 
     }
@@ -381,7 +389,9 @@ abstract class Advancer {
 
   private static class FloatFromLong extends Advancer {
     public static final FloatFromLong instance = new FloatFromLong();
-    private FloatFromLong() { }
+    private FloatFromLong() {
+      super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.FLOAT));
+    }
     public float nextFloat(Decoder in) throws IOException {
       return (long) in.readLong(); 
     }
@@ -390,7 +400,9 @@ abstract class Advancer {
 
   private static class DoubleFromInt extends Advancer {
     public static final DoubleFromInt instance = new DoubleFromInt();
-    private DoubleFromInt() { }
+    private DoubleFromInt() {
+      super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.DOUBLE));
+    }
     public double nextDouble(Decoder in) throws IOException {
       return (double) in.readInt(); 
     }
@@ -399,7 +411,9 @@ abstract class Advancer {
 
   private static class DoubleFromLong extends Advancer {
     public static final DoubleFromLong instance = new DoubleFromLong();
-    private DoubleFromLong() { }
+    private DoubleFromLong() {
+      super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.DOUBLE));
+    }
     public double nextDouble(Decoder in) throws IOException {
       return (double) in.readLong(); 
     }
@@ -408,7 +422,9 @@ abstract class Advancer {
 
   private static class DoubleFromFloat extends Advancer {
     public static final DoubleFromFloat instance = new DoubleFromFloat();
-    private DoubleFromFloat() { }
+    private DoubleFromFloat() {
+      super(Schema.create(Schema.Type.FLOAT), Schema.create(Schema.Type.DOUBLE));
+    }
     public double nextDouble(Decoder in) throws IOException {
       return (double) in.readFloat(); 
     }
@@ -417,7 +433,9 @@ abstract class Advancer {
 
   private static class BytesFromString extends Advancer {
     public static final BytesFromString instance = new BytesFromString();
-    private BytesFromString() { }
+    private BytesFromString() {
+      super(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.BYTES));
+    }
     public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
       Utf8 s = in.readString(null);
       return ByteBuffer.wrap(s.getBytes(), 0, s.getByteLength());
@@ -427,7 +445,9 @@ abstract class Advancer {
 
   private static class StringFromBytes extends Advancer {
     public static final StringFromBytes instance = new StringFromBytes();
-    private StringFromBytes() { }
+    private StringFromBytes() {
+      super(Schema.create(Schema.Type.BYTES), Schema.create(Schema.Type.STRING));
+    }
     public String nextString(Decoder in) throws IOException {
       return new String(in.readBytes(null).array(), StandardCharsets.UTF_8);
     }
@@ -443,7 +463,8 @@ abstract class Advancer {
 
   private static class EnumWithAdjustments extends Advancer {
     private final int[] adjustments;
-    public EnumWithAdjustments(int[] adjustments) {
+    public EnumWithAdjustments(Schema w, Schema r, int[] adjustments) {
+      super(w,r);
       this.adjustments = adjustments;
     }
     public int nextEnum(Decoder in) throws IOException {
@@ -456,7 +477,7 @@ abstract class Advancer {
     * consume the tag ourself and call the corresponding advancer. */
   private static class WriterUnion extends Advancer {
     private Advancer[] branches;
-    public WriterUnion(Advancer[] branches) { this.branches = branches; }
+    public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r); branches = b; }
 
     private final Advancer b(Decoder in) throws IOException
       { return branches[in.readIndex()]; }
@@ -494,7 +515,7 @@ abstract class Advancer {
     * consume it as a regular union. */
   private static class EquivUnion extends Advancer {
     private final Advancer[] branches;
-    public EquivUnion(Advancer[] branches) { this.branches = branches; }
+    public EquivUnion(Schema w, Schema r, Advancer[] b) {super(w,r); branches = b; }
 
     public int nextIndex(Decoder in) throws IOException { return in.readIndex(); }
     public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException {
@@ -505,8 +526,11 @@ abstract class Advancer {
   private static class ReaderUnion extends Advancer {
     private int branch;
     private Advancer advancer;
-    public ReaderUnion(int b, Advancer a) { branch = b; advancer = a; }
+    public ReaderUnion(Schema w, Schema r, int b, Advancer a)
+      { super(w,r); branch = b; advancer = a; }
+
     public int nextIndex(Decoder in) { return branch; }
+
     public Advancer getBranchAdvancer(Decoder in, int b) {
       if (b != this.branch)
           throw new IllegalArgumentException("Branch much be " + branch + ", got " + b);
@@ -544,9 +568,10 @@ abstract class Advancer {
     public final Schema.Field[] readerOrder;
     public final boolean inOrder;
 
-    private Record(Advancer[] advancers, Schema[] finalSkips,
+    private Record(Schema w, Schema r, Advancer[] advancers, Schema[] finalSkips,
                    Schema.Field[] readerOrder, boolean inOrder)
     {
+      super(w,r);
       this.advancers = advancers;
       this.finalSkips = finalSkips;
       this.readerOrder = readerOrder;
@@ -580,7 +605,8 @@ abstract class Advancer {
         Schema[] toSkip = collectSkips(ra.fieldActions, i);
         i += toSkip.length;
         Advancer fieldAdv = Advancer.from(ra.fieldActions[i++]);
-        if (toSkip.length != 0) fieldAdv = new RecordField(toSkip, fieldAdv);
+        if (toSkip.length != 0)
+          fieldAdv = new RecordField(fieldAdv.writer, fieldAdv.reader, toSkip, fieldAdv);
         fieldAdvs[nrf] = fieldAdv;
       }
 
@@ -590,7 +616,7 @@ abstract class Advancer {
 
       // Deal with defaults
       for (int df = 0; rf < readOrder.length; rf++, df++, nrf++)
-        fieldAdvs[nrf] = new Default(ra.defaults[df]);
+        fieldAdvs[nrf] = new Default(ra.readerOrder[df].schema(), ra.defaults[df]);
 
       // If reader and writer orders agree, sort fieldAdvs by reader
       // order (i.e., move defaults into the correct place), to allow
@@ -615,14 +641,15 @@ abstract class Advancer {
         readOrder = newReadOrder;
       }
 
-      return new Record(fieldAdvs, finalSkips, readOrder, inOrder);
+      return new Record(ra.writer, ra.reader, fieldAdvs, finalSkips, readOrder, inOrder);
     }
   }
 
   private static class RecordField extends Advancer {
     private final Schema[] toSkip;
     private final Advancer field;
-    public RecordField(Schema[] toSkip, Advancer field) {
+    public RecordField(Schema w, Schema r, Schema[] toSkip, Advancer field) {
+      super(w,r);
       this.toSkip = toSkip;
       this.field = field;
     }
@@ -675,7 +702,7 @@ abstract class Advancer {
 
   private static class Default extends Advancer {
     protected final Object val;
-    private Default(Object val) { this.val = val; }
+      private Default(Schema s, Object v) { super(s,s); val = v; }
 
     public Object next(Decoder in) { return val; }
     public Object nextNull(Decoder in) { return val; }