You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:21 UTC

[avro] 11/14: Fleshed out the remaining cases in GenericDatumReader2 (logical types are not yet handled).

This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 5e50f64995559c647fc98e3f87e1aebccec3f7c7
Author: rstata <rs...@yahoo.com>
AuthorDate: Tue Apr 30 16:34:39 2019 -0700

    Fleshed out the remaining cases in GenericDatumReader2 (logical types are not yet handled).
---
 .../java/org/apache/avro/generic/Advancer.java     | 107 ++++++++++++++-------
 .../apache/avro/generic/GenericDatumReader2.java   |  96 ++++++++++++------
 2 files changed, 137 insertions(+), 66 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
index d80e812..7c469cb 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java
@@ -57,9 +57,9 @@ import org.apache.avro.util.Utf8;
   * and return the actual value.)
   *
   * Traversing records, arrays, and maps is more involved.  In the
-  * case of an array or map, call {@link getContainerAdvancer} and
-  * proceed as described in the documentation for {@link
-  * Advancer.Container}.  For records, best to just look at the
+  * case of an array or map, call {@link #getArrayAdvancer} or {@link
+  * #getMapAdvancer}, respectively, and proceed as described in the
+  * documentation for {@link Advancer.Container} or {@link Advancer.Map}.  For records, best to just look at the
   * implementation of {@link GenericDatumReader2}.
   **/
 abstract class Advancer {
@@ -112,11 +112,18 @@ abstract class Advancer {
     return null;
   }
 
-  /** Access to advancer for array or map type. */
-  public Container getContainerAdvancer(Decoder in) throws IOException {
+  /** Access to advancer for array type. */
+  public Container getArrayAdvancer(Decoder in) throws IOException {
     exception();
     return null;
   }
+
+  /** Access to advancer for map type. */
+  public Map getMapAdvancer(Decoder in) throws IOException {
+    exception();
+    return null;
+  }
+
   /** Access to advancer for record type. */
   public Record getRecordAdvancer(Decoder in) throws IOException {
     exception();
@@ -168,8 +175,8 @@ abstract class Advancer {
     case CONTAINER:
         Advancer ea = Advancer.from(((Resolver.Container)a).elementAction);
         if (a.writer.getType() == Schema.Type.ARRAY)
-          return new ArrayContainer(a.writer, a.reader, ea);
-        else return new MapContainer(a.writer, a.reader, ea);
+          return new Container(a.writer, a.reader, ea);
+        else return new Map(a.writer, a.reader, ea);
 
     case RECORD:
       return Advancer.Record.from((Resolver.RecordAdjust)a);
@@ -188,7 +195,7 @@ abstract class Advancer {
                              ru.firstMatch, Advancer.from(ru.actualAction));
 
     case ERROR:
-      return new Error(w,r, a.toString());
+      return new Error(a.writer,a.reader, a.toString());
     case SKIP:
       throw new RuntimeException("Internal error.  Skip should've been consumed.");
     default:
@@ -224,42 +231,54 @@ abstract class Advancer {
     }
   }
 
-  /** Used for Array and Map.  The following fragment illustrates how
+  /** Used for Array.  The following fragment illustrates how
     * to use to read an array of int:
     *
     * <pre>
-    *   Advancer c = advancer.getContainerAdvancer(in);
-    *   Advancer.Container ec = c.getElementAdvancer(in);
+    *   Advancer.Container c = advancer.getArrayAdvancer(in);
     *   for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
     *     for (long j = 0; j < i; j++) {
-    *       int element = c.readInt(in);
+    *       int element = c.elementAdvancer.nextInt(in);
     *       // .. do something with this element
     *     }
     *   }
     * </pre>
     * See the implementation of {@link GenericDatumReader2} for more
     * illustrations. */
-  public abstract static class Container extends Advancer {
-    private final Advancer elementAdvancer;
-    public Container(Schema w, Schema r, Advancer ea) { super(wr); elementAdvancer = ea; }
-    public Container getContainerAdvancer(Decoder in) { return this; }
-    public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
-    public abstract long firstChunk(Decoder in) throws IOException;
-    public abstract long nextChunk(Decoder in) throws IOException;
-  }
+  public static class Container extends Advancer {
+    public final Advancer elementAdvancer;
+    public Container(Schema w, Schema r, Advancer ea)
+      { super(w,r); elementAdvancer = ea; }
 
-  private static class ArrayContainer extends Container {
-    public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
     public long firstChunk(Decoder in) throws IOException
       { return in.readArrayStart(); }
+
     public long nextChunk(Decoder in) throws IOException
       { return in.arrayNext(); }
   }
 
-  private static class MapContainer extends Container {
-    public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
+  /** Used for Map.  The following fragment illustrates how
+    * to use it to read a map whose values are ints:
+    *
+    * <pre>
+    *   Advancer.Map c = advancer.getMapAdvancer(in);
+    *   for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
+    *     for (long j = 0; j < i; j++) {
+    *       String key = c.keyAdvancer.nextString(in);
+    *       int element = c.elementAdvancer.nextInt(in);
+    *       // .. do something with this element
+    *     }
+    *   }
+    * </pre>
+    * See the implementation of {@link GenericDatumReader2} for more
+    * illustrations. */
+  public static class Map extends Container {
+    public final Advancer keyAdvancer = StringFast.instance;
+    public Map(Schema w, Schema r, Advancer ea) { super(w,r,ea); }
+
     public long firstChunk(Decoder in) throws IOException
       { return in.readMapStart(); }
+
     public long nextChunk(Decoder in) throws IOException
       { return in.mapNext(); }
   }
@@ -270,7 +289,8 @@ abstract class Advancer {
 
   private static class NullFast extends Advancer {
     public static final NullFast instance = new NullFast();
-    private NullFast() { Schema s = Schema.create(Schema.Type.NULL); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.NULL);
+    private NullFast() { super(s,s); }
     public Object nextNull(Decoder in) throws IOException {
       in.readNull(); 
       return null;
@@ -280,7 +300,8 @@ abstract class Advancer {
 
   private static class BooleanFast extends Advancer {
     public static final BooleanFast instance = new BooleanFast();
-    private BooleanFast() { Schema s = Schema.create(Schema.Type.BOOLEAN); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.BOOLEAN);
+    private BooleanFast() { super(s,s); }
     public boolean nextBoolean(Decoder in) throws IOException {
       return in.readBoolean(); 
     }
@@ -289,7 +310,8 @@ abstract class Advancer {
 
   private static class IntFast extends Advancer {
     public static final IntFast instance = new IntFast();
-    private IntFast() { Schema s = Schema.create(Schema.Type.INTEGER); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.INT);
+    private IntFast() { super(s,s); }
     public int nextInt(Decoder in) throws IOException {
       return in.readInt(); 
     }
@@ -298,7 +320,8 @@ abstract class Advancer {
 
   private static class LongFast extends Advancer {
     public static final LongFast instance = new LongFast();
-    private LongFast() { Schema s = Schema.create(Schema.Type.LONG); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.LONG);
+    private LongFast() { super(s,s); }
     public long nextLong(Decoder in) throws IOException {
       return in.readLong(); 
     }
@@ -307,7 +330,8 @@ abstract class Advancer {
 
   private static class FloatFast extends Advancer {
     public static final FloatFast instance = new FloatFast();
-    private FloatFast() { Schema s = Schema.create(Schema.Type.FLOAT); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.FLOAT);
+    private FloatFast() { super(s,s); }
     public float nextFloat(Decoder in) throws IOException {
       return in.readFloat(); 
     }
@@ -316,7 +340,8 @@ abstract class Advancer {
 
   private static class DoubleFast extends Advancer {
     public static final DoubleFast instance = new DoubleFast();
-    private DoubleFast() { Schema s = Schema.create(Schema.Type.DOUBLE); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.DOUBLE);
+    private DoubleFast() { super(s,s); }
     public double nextDouble(Decoder in) throws IOException {
       return in.readDouble(); 
     }
@@ -325,7 +350,8 @@ abstract class Advancer {
 
   private static class StringFast extends Advancer {
     public static final StringFast instance = new StringFast();
-    private StringFast() { Schema s = Schema.create(Schema.Type.STRING); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.STRING);
+    private StringFast() { super(s,s); }
     public String nextString(Decoder in) throws IOException { return in.readString(); }
     public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
       return in.readString(old);
@@ -335,7 +361,8 @@ abstract class Advancer {
 
   private static class BytesFast extends Advancer {
     public static final BytesFast instance = new BytesFast();
-    private BytesFast() { Schema s = Schema.create(Schema.Type.BYTES); super(s,s); }
+    private static final Schema s = Schema.create(Schema.Type.BYTES);
+    private BytesFast() { super(s,s); }
     public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
       return in.readBytes(old);
     }
@@ -477,7 +504,7 @@ abstract class Advancer {
     * consume the tag ourself and call the corresponding advancer. */
   private static class WriterUnion extends Advancer {
     private Advancer[] branches;
-    public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r) branches = b; }
+    public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r); branches = b; }
 
     private final Advancer b(Decoder in) throws IOException
       { return branches[in.readIndex()]; }
@@ -504,8 +531,11 @@ abstract class Advancer {
     public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
       { return b(in).getBranchAdvancer(in, branch); }
 
-    public Container getContainerAdvancer(Decoder in) throws IOException
-      { return b(in).getContainerAdvancer(in); }
+    public Container getArrayAdvancer(Decoder in) throws IOException
+      { return b(in).getArrayAdvancer(in); }
+
+    public Map getMapAdvancer(Decoder in) throws IOException
+      { return b(in).getMapAdvancer(in); }
 
     public Record getRecordAdvancer(Decoder in) throws IOException
       { return b(in).getRecordAdvancer(in); }
@@ -693,8 +723,11 @@ abstract class Advancer {
     public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
       { ignore(toSkip, in); return field.getBranchAdvancer(in, branch); }
 
-    public Container getContainerAdvancer(Decoder in) throws IOException
-      { ignore(toSkip, in); return field.getContainerAdvancer(in); }
+    public Container getArrayAdvancer(Decoder in) throws IOException
+      { ignore(toSkip, in); return field.getArrayAdvancer(in); }
+
+    public Map getMapAdvancer(Decoder in) throws IOException
+      { ignore(toSkip, in); return field.getMapAdvancer(in); }
 
     public Record getRecordAdvancer(Decoder in) throws IOException
       { ignore(toSkip, in); return field.getRecordAdvancer(in); }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
index 46506c2..cbfcff7 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java
@@ -19,47 +19,42 @@ package org.apache.avro.generic;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.avro.Resolver;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 
-public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader<D> {
-  private final Schema reader, writer;
+public class GenericDatumReader2<D> implements DatumReader<D> {
   private final Advancer.Record advancer;
   private final GenericData data;
 
-  private GenericDatumReader2(Schema writer, Schema reader, Advancer.Record a, GenericData d) {
-    this.writer = writer;
-    this.reader = reader;
+  private GenericDatumReader2(Advancer.Record a, GenericData d) {
     advancer = a;
     data = d;
   }
 
-  public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData data) {
+  /** TODO: document how <code>d</code> is used to create fixed, array,
+    * map, and record objects.
+    */
+  public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData d) {
     // TODO: add caching
-    Resolver.Action a = Resolver.resolve(writer, reader, data);
+    Resolver.Action a = Resolver.resolve(writer, reader, d);
     Advancer.Record r = (Advancer.Record)Advancer.from(a);
-    return new GenericDatumReader2(writer, reader, r, data);
+    return new GenericDatumReader2(r, d);
   }
 
   public D read(D reuse, Decoder in) throws IOException {
-    List<Schema.Field> wf = writer.getFields();
-    if (reuse == null) reuse = null; // FIXME
-    for (int i = 0; i < advancer.advancers.length; i++) {
-      int p = advancer.readerOrder[i].pos();
-      reuse.put(p, read(null, wf.get(i).schema(), advancer.advancers[i], in));
-    }
-    advancer.done(in);
-    return reuse;
+   return null;
   }
 
-  public Object read(Object reuse, Schema expected, Advancer a, Decoder in)
+  public Object read(Object reuse, Advancer a, Decoder in)
     throws IOException
   {
-    switch (expected.getType()) {
+    switch (a.reader.getType()) {
     case NULL: return a.nextNull(in);
     case BOOLEAN: return (Boolean) a.nextBoolean(in);
     case INT: return (Integer) a.nextInt(in);
@@ -68,22 +63,65 @@ public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader
     case DOUBLE: return (Double) a.nextDouble(in);
     case STRING: return (String) a.nextString(in);
     case BYTES: return a.nextBytes(in, (ByteBuffer)reuse);
-    case FIXED:
+    case FIXED: {
+      GenericFixed fixed = (GenericFixed) data.createFixed(reuse, a.reader);
+      a.nextFixed(in, fixed.bytes());
+      return fixed;
+    }
+
     case ARRAY: {
-      List result = null; // FIXME -- use GenericData methods here...
-      Advancer.Container c = advancer.getContainerAdvancer(in);
-      Advancer ec = c.getElementAdvancer(in);
-      Schema es = expected.getElementType();
-      for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) {
+      Advancer.Container c = advancer.getArrayAdvancer(in);
+      Advancer ec = c.elementAdvancer;
+      long i = c.firstChunk(in);
+      if (reuse instanceof GenericArray) {
+        ((GenericArray) reuse).reset();
+      } else if (reuse instanceof Collection) {
+        ((Collection) reuse).clear();
+      } else reuse = new GenericData.Array((int)i, a.reader);
+
+      Collection array = (Collection)reuse;
+      for( ; i != 0; i = c.nextChunk(in))
         for (long j = 0; j < i; j++) {
-          result.add(read(null, es, ec, in));
+          Object v = read(null, ec, in);
+          // TODO -- logical type conversion
+          array.add(v);
         }
-      }
+      if (array instanceof GenericArray<?>)
+        ((GenericArray<?>) array).prune();
     }
         
-    case MAP:
-    case RECORD:
+    case MAP: {
+      Advancer.Map c = advancer.getMapAdvancer(in);
+      Advancer kc = c.keyAdvancer;
+      Advancer ec = c.elementAdvancer;
+      long i = c.firstChunk(in);
+      if (reuse instanceof Map) {
+        ((Map) reuse).clear();
+      } else reuse = new HashMap<Object,Object>((int)i);
+      Map map = (Map)reuse;
+      for ( ; i != 0; i = c.nextChunk(in))
+        for (int j = 0; j < i; j++) {
+          Object key = kc.nextString(in);
+          Object val = read(null, ec, in);
+          map.put(key, val);
+        }
+      return map;
+    }
+
+    case RECORD: {
+      Advancer.Record ra = advancer.getRecordAdvancer(in);
+      Object r = data.newRecord(reuse, ra.reader);
+      for (int i = 0; i < ra.advancers.length; i++) {
+        int p = ra.readerOrder[i].pos();
+        ((IndexedRecord)reuse).put(p, read(null, ra.advancers[i], in));
+      }
+      ra.done(in);
+      return r;
+    }
+
     case UNION:
+      return read(reuse, advancer.getBranchAdvancer(in, advancer.nextIndex(in)), in);
+
     default:
       throw new IllegalArgumentException("Can't handle this yet.");
     }