You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:13 UTC

[avro] 03/14: Dealt with record field-skipping problem. Also fixed a few errors missed earlier.

This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 345f6b32f9cc9de75cf492785a62e8a91c78085e
Author: rstata <rs...@yahoo.com>
AuthorDate: Tue Apr 30 00:20:14 2019 -0700

    Dealt with record field-skipping problem.  Also fixed a few errors missed earlier.
---
 .../java/org/apache/avro/specific/Advancer.java    | 96 +++++++++++++---------
 1 file changed, 56 insertions(+), 40 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java
index 75d2615..ee326e2 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java
@@ -76,20 +76,24 @@ abstract class Advancer {
   //// an integer is read with no promotion) overrides just
   //// readInt.
 
-  public Object next(Decoder in) throws IOException { exception(); }
-  public Object nextNull(Decoder in) throws IOException { exception(); }
-  public boolean nextBoolean(Decoder in) throws IOException { exception(); }
-  public int nextInt(Decoder in) throws IOException { exception(); }
-  public long nextLong(Decoder in) throws IOException { exception(); }
-  public float nextFloat(Decoder in) throws IOException { exception(); }
-  public double nextDouble(Decoder in) throws IOException { exception(); }
-  public int nextEnum(Decoder in) throws IOException { exception(); }
-  public Utf8 nextString(Decoder in, Utf8 old) throws IOException { exception(); }
-  public String nextString(Decoder in) throws IOException { exception(); }
-  public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { exception(); }
+  public Object next(Decoder in) throws IOException { exception(); return null; }
+  public Object nextNull(Decoder in) throws IOException { exception(); return null; }
+  public boolean nextBoolean(Decoder in) throws IOException { exception(); return false; }
+  public int nextInt(Decoder in) throws IOException { exception(); return 0; }
+  public long nextLong(Decoder in) throws IOException { exception(); return 0; }
+  public float nextFloat(Decoder in) throws IOException { exception(); return 0; }
+  public double nextDouble(Decoder in) throws IOException { exception(); return 0; }
+  public int nextEnum(Decoder in) throws IOException { exception(); return 0; }
+  public Utf8 nextString(Decoder in, Utf8 old) throws IOException { exception(); return null; }
+  public String nextString(Decoder in) throws IOException { exception(); return null; }
+  public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
+    exception();
+    return null;
+  }
 
   public byte[] nextFixed(Decoder in, byte[] bytes, int start, int length) throws IOException {
     exception();
+    return null;
   }
 
   public byte[] nextFixed(Decoder in, byte[] bytes) throws IOException {
@@ -99,20 +103,23 @@ abstract class Advancer {
   /** Access to contained advancer (for Array and Map types). */
   public Advancer getElementAdvancer(Decoder in) throws IOException {
     exception();
+    return null;
   }
 
   /** Get index for a union. */
-  public int nextIndex(Decoder in) throws IOException { exception(); }
+  public int nextIndex(Decoder in) throws IOException { exception(); return 0; }
 
   /** Access to contained advancer for unions.  You must call {@link
    *  nextIndex} before calling this method.  */
   public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException {
     exception();
+    return null;
   }
 
   /** Access to contained advancer (for Array, Map, and Union types). */
   public Record getRecordAdvancer(Decoder in) throws IOException {
     exception();
+    return null;
   }
 
 
@@ -216,6 +223,7 @@ abstract class Advancer {
     * of the union. */
   private static class Container extends Advancer {
     private final Advancer elementAdvancer;
+    public Container(Advancer elementAdvancer) { this.elementAdvancer = elementAdvancer; }
     public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
   }
 
@@ -486,32 +494,44 @@ abstract class Advancer {
   //// Records are particularly intricate because we may have to skip
   //// fields, read fields out of order, and use default values.
 
-  /** Advancer for records.  The {@link advancer} array contains an
+  /** Advancer for records.  The {@link advancers} array contains an
     * advancer for each field, ordered according writer (which
     * determines the order in which data must be read).  The {@link
     * readerOrder} array tells you how those advancers line up with the
     * reader's fields.  Thus, the following is how to read a record:
     * <pre>
-    *    for (int i = 0; i < a.advancers.length; i++)
+    *    for (int i = 0; i < a.advancers.length; i++) {
     *      dataum.set(a.readerOrder[i], a.advancers[i].next());
+    *    }
+    *    a.done();
     * </pre>
+    * Note that a decoder <em>must</em> call {@link done} after interpreting
+    * all the elemnts in {@link advancers}.
+    *
     * As a convenience, {@link inOrder} is set to true iff the reader
     * and writer order agrees (i.e., iff <code>readerOrder[i] ==
     * i</code> for all i).  Generated code can use this to optimize this
     * common case. */
   public static class Record extends Advancer {
     public final Advancer[] advancers;
+    private Schema[] finalSkips;
     public final int[] readerOrder;
     public final boolean inOrder;
 
-    private Record(Advancer[] advancers, int[] readerOrder, boolean inOrder) {
+    private Record(Advancer[] advancers, Schema[] finalSkips, int[] order, boolean inOrder) {
       this.advancers = advancers;
-      this.readerOrder = readerOrder;
+      this.finalSkips = finalSkips;
+      this.readerOrder = order;
       this.inOrder = inOrder;
     }
 
     public Record getRecordAdvancer(Decoder in) { return this; }
 
+    /** Must be called after consuming all elements of {@link advancers}. */
+    public void done(Decoder in) throws IOException {
+      ignore(finalSkips, in);
+    }
+
     protected static Advancer from(Resolver.RecordAdjust ra) {
       /** Two cases: reader + writer agree on order, vs disagree. */
       /** This is the complicated case, since skipping is involved. */
@@ -525,48 +545,44 @@ abstract class Advancer {
       Advancer[] fieldAdvs = new Advancer[readOrder.length];
 
       int i = 0; // Index into ra.fieldActions
-      int rf = 0; // Index into readOrder
-      int nrf = 0; // Index into fieldAdvs
-
-      // Deal with any leading fields to be skipped
-      Schema[] firstSkips = collectSkips(ra.fieldActions, i);
-      if (firstSkips.length != 0) i += firstSkips.length;
-      else firstSkips = null;
+      int rf = 0; // Index into ra.readerOrder
+      int nrf = 0; // (Insertion) index into fieldAdvs
 
       // Deal with fields to be read
-      for ( ; i < ra.fieldActions.length; nrf++, rf++) {
-        Advancer fieldAdv = Advancer.from(ra.fieldActions[i]);
-        i++;
+      for ( ; rf < ra.firstDefault; rf++, nrf++) {
         Schema[] toSkip = collectSkips(ra.fieldActions, i);
-        if (toSkip.length != 0) {
-          fieldAdv = new RecordField(fieldAdv, toSkip);
-          i += toSkip.length;
-        }
-        if (firstSkips != null) {
-          fieldAdv = new RecordFieldWithBefore(firstSkips, fieldAdv);
-          firstSkips = null;
-        }
+        i += toSkip.length;
+        Advancer fieldAdv = Advancer.from(ra.fieldActions[i++]);
+        if (toSkip.length != 0) fieldAdv = new RecordField(fieldAdv, toSkip);
         fieldAdvs[nrf] = fieldAdv;
       }
 
+      // Deal with any trailing fields to be skipped:
+      Schema[] finalSkips = collectSkips(ra.fieldActions, i);
+      // Assert i == ra.fieldActions.length
+
+      // Deal with defaults
+      for (int df = 0; rf < readOrder.length; rf++, df++, nrf++)
+        fieldAdvs[nrf] = new Default(ra.defaults[df]);
+
       // If reader and writer orders agree, sort fieldAdvs by reader
       // order (i.e., move defaults into the correct place), to allow
       // decoders to have an optimized path for the common case of a
       // record's field order not changing.
       boolean inOrder = true;
       for (int k = 0; k < ra.firstDefault-1; k++)
-        if (readOrder[k] > readOrder[k+1]) inOrder = false;
+        inOrder &= (readOrder[k] < readOrder[k+1]);
       if (inOrder) {
         Advancer[] newAdvancers = new Advancer[fieldAdvs.length];
         for (int k = 0, rf2 = 0, df = ra.firstDefault; k < readOrder.length; k++) {
-          if (rf2 < df) newAdvancers[k] = fieldAdvs[rf2++];
-          else  newAdvancers[k] = fieldAdvs[df++];
-          readOrder[k] = k;
+          if (readOrder[rf2] < readOrder[df]) newAdvancers[k] = fieldAdvs[rf2++];
+          else newAdvancers[k] = fieldAdvs[df++];
         }
-        newAdvancers = fieldAdvs;
+        for (int k = 0; k < readOrder.length; k++) readOrder[k] = k;
+        fieldAdvs = newAdvancers;
       }
 
-      return new Record(fieldAdvs, readOrder, inOrder);
+      return new Record(fieldAdvs, finalSkips, readOrder, inOrder);
     }
   }