You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2019/05/01 19:11:12 UTC
[avro] 02/14: First draft of advancer code.
This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch fast-decoder-thiru
in repository https://gitbox.apache.org/repos/asf/avro.git
commit c602938347a6e1ba96da11ff7c3e08f8a7eb05ab
Author: rstata <rs...@yahoo.com>
AuthorDate: Mon Apr 29 23:26:46 2019 -0700
First draft of advancer code.
---
.../java/org/apache/avro/specific/Advancer.java | 757 +++++++++++++++++++++
1 file changed, 757 insertions(+)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java
new file mode 100644
index 0000000..75d2615
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.specific;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Resolver;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
+/** An "Advancer" is a tree of objects that apply resolution logic
+ * while reading values out of a {@link Decoder}.
+ *
+ * An Advancer tree is created by calling {@link advancerFor} on a
+ * {@link Resolver.Action} object. The resulting tree mimics the
+ * reader schema of that Action object.
+ *
+ * A decoder for that reader schema is meant to traverse the schema
+ * in a depth-first fashion. When it hits a leaf of type
+ * <code>Xyz</code>, it should call corresponding
+ * <code>nextXyx</code> on the Advancer. For example, if the reader
+ * hits a lead indicating that an integer should be read, it should
+ * call {@link nextInt}, as in <code>a.nextInt(in)</code>, where
+ * <code>a</code> is the advancer being traversed, and
+ * <code>in</code> is the Decoder being read from.
+ *
+ * When traversing an Array or Map in the reader schema, the decoder
+ * should call {@link getElementAdvancer} to retrieve the advancer
+ * object for the contained element-schema, value-schema, or non-null
+ * schema respectively. ({@link next} cannot be called on {@link
+ * Advancer.Record} objects -- decoders must decode them field by
+ * field.)
+ *
+ * For unions, the decoder should call {@link nextIndex} to fetch the
+ * branch and then {@link getBranchAdvancer} to get the advancer of
+ * that branch. (Calling {@link next} on a union will read the
+ * index, pick the right advancer based on the index, and then read
+ * and return the actual value.)
+ *
+ * Traversing an record is more involved. The decoder should call
+ * {@link getRecordAdvancer} and proceed as described in the
+ * documentation for {@link Advancer.Record}. ({@link next} cannot
+ * be called on {@link Advancer.Record} objects -- decoders must
+ * decode them field by field.)
+ **/
+abstract class Advancer {
+ protected Exception exception() {
+ throw new UnsupportedOperationException();
+ }
+
+ //// API methods of Advancer. Used by decoding methods to
+ //// read values out of Decoder, applying resolution logic
+ //// in the process. In the base class, these do throw
+ //// a not-supported exception. Specific subclasses implement
+ //// certain ones, e.g., IntFast (the Advancer used when
+ //// an integer is read with no promotion) overrides just
+ //// readInt.
+
+ public Object next(Decoder in) throws IOException { exception(); }
+ public Object nextNull(Decoder in) throws IOException { exception(); }
+ public boolean nextBoolean(Decoder in) throws IOException { exception(); }
+ public int nextInt(Decoder in) throws IOException { exception(); }
+ public long nextLong(Decoder in) throws IOException { exception(); }
+ public float nextFloat(Decoder in) throws IOException { exception(); }
+ public double nextDouble(Decoder in) throws IOException { exception(); }
+ public int nextEnum(Decoder in) throws IOException { exception(); }
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException { exception(); }
+ public String nextString(Decoder in) throws IOException { exception(); }
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { exception(); }
+
+ public byte[] nextFixed(Decoder in, byte[] bytes, int start, int length) throws IOException {
+ exception();
+ }
+
+ public byte[] nextFixed(Decoder in, byte[] bytes) throws IOException {
+ return nextFixed(in, bytes, 0, bytes.length);
+ }
+
+ /** Access to contained advancer (for Array and Map types). */
+ public Advancer getElementAdvancer(Decoder in) throws IOException {
+ exception();
+ }
+
+ /** Get index for a union. */
+ public int nextIndex(Decoder in) throws IOException { exception(); }
+
+ /** Access to contained advancer for unions. You must call {@link
+ * nextIndex} before calling this method. */
+ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException {
+ exception();
+ }
+
+ /** Access to contained advancer (for Array, Map, and Union types). */
+ public Record getRecordAdvancer(Decoder in) throws IOException {
+ exception();
+ }
+
+
+ ////// Here's the builder for Advancer trees. The subclasses used by
+ ////// this implementation are found below.
+
+ /** Build an {@link Advancer} tree that for a given {@link
+ * Resolver.Action} tree. */
+ public static Advancer from(Resolver.Action a) {
+ switch (a.type) {
+ case DO_NOTHING:
+ switch (a.reader.getType()) {
+ case NULL: return NullFast.instance;
+ case BOOLEAN: return BooleanFast.instance;
+ case INT: return IntFast.instance;
+ case LONG: return LongFast.instance;
+ case FLOAT: return FloatFast.instance;
+ case DOUBLE: return DoubleFast.instance;
+ case STRING: return StringFast.instance;
+ case BYTES: return BytesFast.instance;
+ case FIXED: return new FixedFast(a.writer.getFixedSize());
+ default:
+ throw new IllegalArgumentException("Unexpected schema for DoNothing:" + a.reader);
+ }
+ case PROMOTE:
+ switch (((Resolver.Promote)a).promotion) {
+ case INT2LONG: return LongFromInt.instance;
+ case INT2FLOAT: return FloatFromInt.instance;
+ case INT2DOUBLE: return DoubleFromInt.instance;
+ case LONG2FLOAT: return FloatFromLong.instance;
+ case LONG2DOUBLE: return DoubleFromLong.instance;
+ case FLOAT2DOUBLE: return DoubleFromFloat.instance;
+ case STRING2BYTES: return BytesFromString.instance;
+ case BYTES2STRING: return StringFromBytes.instance;
+ default:
+ throw new IllegalArgumentException("Unexpected promotion:" + a);
+ }
+ case ENUM:
+ Resolver.EnumAdjust e = (Resolver.EnumAdjust)a;
+ if (e.noAdjustmentsNeeded) return EnumFast.instance;
+ else return new EnumWithAdjustments(e.adjustments);
+
+ case CONTAINER:
+ return new Container(Advancer.from(((Resolver.Container)a).elementAction));
+
+ case RECORD:
+ return Advancer.Record.from((Resolver.RecordAdjust)a);
+
+ case WRITER_UNION:
+ Resolver.WriterUnion wu = (Resolver.WriterUnion)a;
+ Advancer[] branches = new Advancer[wu.actions.length];
+ for (int i = 0; i < branches.length; i++)
+ branches[i] = Advancer.from(wu.actions[i]);
+ if (wu.unionEquiv) return new EquivUnion(branches);
+ return new WriterUnion(branches);
+
+ case READER_UNION:
+ Resolver.ReaderUnion ru = (Resolver.ReaderUnion)a;
+ return new ReaderUnion(ru.firstMatch, Advancer.from(ru.actualAction));
+
+ case ERROR:
+ throw new AvroTypeException(a.toString());
+ case SKIP:
+ throw new RuntimeException("Internal error. Skip should've been consumed.");
+ default:
+ throw new IllegalArgumentException("Unknown action:" + a);
+ }
+ }
+
+ private static Schema[] collectSkips(Resolver.Action[] actions, int start) {
+ Schema[] result = EMPTY_SCHEMA_ARRAY;
+ int j = start;
+ while (j < actions.length && actions[j].type == Resolver.Action.Type.SKIP)
+ j++;
+ if (start < j) {
+ result = new Schema[j - start];
+ for (int k = 0; k < (j - start); k++)
+ result[k] = actions[start + k].writer;
+ }
+ return result;
+ }
+ private static final Schema[] EMPTY_SCHEMA_ARRAY = new Schema[0];
+
+ ////// Subclasses of Advancer -- real work is done in these
+
+ /** All methods of <code>this</code> throw {@link
+ * AvroTypeException} with appropriate message. Used for
+ * throwing resolution errors in a lazy fashion (i.e., as actual
+ * data causes the error to manifest). */
+ private static class Error extends Advancer {
+ String msg;
+ public Error(String msg) { this.msg = msg; }
+ protected Exception exception() {
+ throw new AvroTypeException(msg);
+ }
+ }
+
+ /** Used for Array, Map, and Union. In case of Union, since we only
+ * support "nullable" unions (ie, two-branch unions in which one
+ * branch is null), the element advancer is for the non-null branch
+ * of the union. */
+ private static class Container extends Advancer {
+ private final Advancer elementAdvancer;
+ public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; }
+ }
+
+ //// The following set of subclasses are for when there is no
+ //// resolution logic to be applied. All that needs to be done
+ //// is call the corresponding method on the Decoder.
+
+ private static class NullFast extends Advancer {
+ public static final NullFast instance = new NullFast();
+ private NullFast() { }
+ public Object nextNull(Decoder in) throws IOException {
+ in.readNull();
+ return null;
+ }
+ public Object next(Decoder in) throws IOException { return nextNull(in); }
+ }
+
+ private static class BooleanFast extends Advancer {
+ public static final BooleanFast instance = new BooleanFast();
+ private BooleanFast() { }
+ public boolean nextBoolean(Decoder in) throws IOException {
+ return in.readBoolean();
+ }
+ public Object next(Decoder in) throws IOException { return nextBoolean(in); }
+ }
+
+ private static class IntFast extends Advancer {
+ public static final IntFast instance = new IntFast();
+ private IntFast() { }
+ public int nextInt(Decoder in) throws IOException {
+ return in.readInt();
+ }
+ public Object next(Decoder in) throws IOException { return nextInt(in); }
+ }
+
+ private static class LongFast extends Advancer {
+ public static final LongFast instance = new LongFast();
+ private LongFast() { }
+ public long nextLong(Decoder in) throws IOException {
+ return in.readLong();
+ }
+ public Object next(Decoder in) throws IOException { return nextLong(in); }
+ }
+
+ private static class FloatFast extends Advancer {
+ public static final FloatFast instance = new FloatFast();
+ private FloatFast() { }
+ public float nextFloat(Decoder in) throws IOException {
+ return in.readFloat();
+ }
+ public Object next(Decoder in) throws IOException { return nextFloat(in); }
+ }
+
+ private static class DoubleFast extends Advancer {
+ public static final DoubleFast instance = new DoubleFast();
+ private DoubleFast() { }
+ public double nextDouble(Decoder in) throws IOException {
+ return in.readDouble();
+ }
+ public Object next(Decoder in) throws IOException { return nextDouble(in); }
+ }
+
+ private static class StringFast extends Advancer {
+ public static final StringFast instance = new StringFast();
+ private StringFast() { }
+ public String nextString(Decoder in) throws IOException { return in.readString(); }
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
+ return in.readString(old);
+ }
+ public Object next(Decoder in) throws IOException { return nextString(in); }
+ }
+
+ private static class BytesFast extends Advancer {
+ public static final BytesFast instance = new BytesFast();
+ private BytesFast() { }
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
+ return in.readBytes(old);
+ }
+ public Object next(Decoder in) throws IOException { return nextBytes(in, null); }
+ }
+
+ private static class FixedFast extends Advancer {
+ private final int len;
+ private FixedFast(int len) { this.len = len; }
+ public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException {
+ in.readFixed(bytes, start, len);
+ return bytes;
+ }
+ public Object next(Decoder in) throws IOException {
+ byte[] result = new byte[len];
+ nextFixed(in, new byte[len]);
+ return result;
+ }
+ }
+
+ private static class EnumFast extends Advancer {
+ public static final EnumFast instance = new EnumFast();
+ private EnumFast() { }
+ public int nextEnum(Decoder in) throws IOException { return in.readEnum(); }
+ public Object next(Decoder in) throws IOException { return nextEnum(in); }
+ }
+
+ //// The following set of subclasses apply promotion logic
+ //// to the underlying value read.
+
+ private static class LongFromInt extends Advancer {
+ public static final LongFromInt instance = new LongFromInt();
+ private LongFromInt() { }
+ public long nextLong(Decoder in) throws IOException {
+ return (long) in.readInt();
+ }
+ public Object next(Decoder in) throws IOException { return nextLong(in); }
+ }
+
+ private static class FloatFromInt extends Advancer {
+ public static final FloatFromInt instance = new FloatFromInt();
+ private FloatFromInt() { }
+ public float nextFloat(Decoder in) throws IOException {
+ return (float) in.readInt();
+ }
+ public Object next(Decoder in) throws IOException { return nextFloat(in); }
+ }
+
+ private static class FloatFromLong extends Advancer {
+ public static final FloatFromLong instance = new FloatFromLong();
+ private FloatFromLong() { }
+ public float nextFloat(Decoder in) throws IOException {
+ return (long) in.readLong();
+ }
+ public Object next(Decoder in) throws IOException { return nextFloat(in); }
+ }
+
+ private static class DoubleFromInt extends Advancer {
+ public static final DoubleFromInt instance = new DoubleFromInt();
+ private DoubleFromInt() { }
+ public double nextDouble(Decoder in) throws IOException {
+ return (double) in.readInt();
+ }
+ public Object next(Decoder in) throws IOException { return nextDouble(in); }
+ }
+
+ private static class DoubleFromLong extends Advancer {
+ public static final DoubleFromLong instance = new DoubleFromLong();
+ private DoubleFromLong() { }
+ public double nextDouble(Decoder in) throws IOException {
+ return (double) in.readLong();
+ }
+ public Object next(Decoder in) throws IOException { return nextDouble(in); }
+ }
+
+ private static class DoubleFromFloat extends Advancer {
+ public static final DoubleFromFloat instance = new DoubleFromFloat();
+ private DoubleFromFloat() { }
+ public double nextDouble(Decoder in) throws IOException {
+ return (double) in.readFloat();
+ }
+ public Object next(Decoder in) throws IOException { return nextDouble(in); }
+ }
+
+ private static class BytesFromString extends Advancer {
+ public static final BytesFromString instance = new BytesFromString();
+ private BytesFromString() { }
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
+ Utf8 s = in.readString(null);
+ return ByteBuffer.wrap(s.getBytes(), 0, s.getByteLength());
+ }
+ public Object next(Decoder in) throws IOException { return nextBytes(in, null); }
+ }
+
+ private static class StringFromBytes extends Advancer {
+ public static final StringFromBytes instance = new StringFromBytes();
+ private StringFromBytes() { }
+ public String nextString(Decoder in) throws IOException {
+ return new String(in.readBytes(null).array(), StandardCharsets.UTF_8);
+ }
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
+ return new Utf8(in.readBytes(null).array());
+ }
+ public Object next(Decoder in) throws IOException { return nextString(in); }
+ }
+
+
+ //// This last set of advancers are used when more sophisticated
+ //// adjustmentds are needed
+
+ private static class EnumWithAdjustments extends Advancer {
+ private final int[] adjustments;
+ public EnumWithAdjustments(int[] adjustments) {
+ this.adjustments = adjustments;
+ }
+ public int nextEnum(Decoder in) throws IOException {
+ return adjustments[in.readInt()];
+ }
+ public Object next(Decoder in) throws IOException { return nextEnum(in); }
+ }
+
+ /** In this case, the writer has a union by the reader doesn't, so we
+ * consume the tag ourself and call the corresponding advancer. */
+ private static class WriterUnion extends Advancer {
+ private Advancer[] branches;
+ public WriterUnion(Advancer[] branches) { this.branches = branches; }
+
+ private final Advancer b(Decoder in) throws IOException
+ { return branches[in.readIndex()]; }
+
+ public Object next(Decoder in) throws IOException { return b(in).next(in); }
+ public Object nextNull(Decoder in) throws IOException { return b(in).nextNull(in); }
+ public boolean nextBoolean(Decoder in) throws IOException { return b(in).nextBoolean(in); }
+ public int nextInt(Decoder in) throws IOException { return b(in).nextInt(in); }
+ public long nextLong(Decoder in) throws IOException { return b(in).nextLong(in); }
+ public float nextFloat(Decoder in) throws IOException { return b(in).nextFloat(in); }
+ public double nextDouble(Decoder in) throws IOException { return b(in).nextDouble(in); }
+ public int nextEnum(Decoder in) throws IOException { return b(in).nextEnum(in); }
+ public String nextString(Decoder in) throws IOException { return b(in).nextString(in); }
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException
+ { return b(in).nextString(in, old); }
+
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException
+ { return b(in).nextBytes(in, old); }
+
+ public byte[] nextFixed(Decoder in, byte[] bytes, int start, int length) throws IOException
+ { return b(in).nextFixed(in, bytes, start, length); }
+
+ public Advancer getElementAdvancer(Decoder in) throws IOException
+ { return b(in).getElementAdvancer(in); }
+
+ public int nextIndex(Decoder in) throws IOException { return b(in).nextIndex(in); }
+ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
+ { return b(in).getBranchAdvancer(in, branch); }
+
+ public Record getRecordAdvancer(Decoder in) throws IOException
+ { return b(in).getRecordAdvancer(in); }
+ }
+
+ /** In this case, reader and writer have the same union, so let the decoder
+ * consume it as a regular union. */
+ private static class EquivUnion extends Advancer {
+ private final Advancer[] branches;
+ public EquivUnion(Advancer[] branches) { this.branches = branches; }
+
+ public int nextIndex(Decoder in) throws IOException { return in.readIndex(); }
+ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException {
+ return branches[branch];
+ }
+ public Object next(Decoder in) throws IOException {
+ return branches[in.readIndex()].next(in);
+ }
+ }
+
+ private static class ReaderUnion extends Advancer {
+ private int branch;
+ private Advancer advancer;
+ public ReaderUnion(int b, Advancer a) { branch = b; advancer = a; }
+ public int nextIndex(Decoder in) { return branch; }
+ public Advancer getBranchAdvancer(Decoder in, int b) {
+ if (b != this.branch)
+ throw new IllegalArgumentException("Branch much be " + branch + ", got " + b);
+ return advancer;
+ }
+ public Object next(Decoder in) throws IOException {
+ return advancer.next(in);
+ }
+ }
+
+
+
+
+ //// Records are particularly intricate because we may have to skip
+ //// fields, read fields out of order, and use default values.
+
+ /** Advancer for records. The {@link advancer} array contains an
+ * advancer for each field, ordered according writer (which
+ * determines the order in which data must be read). The {@link
+ * readerOrder} array tells you how those advancers line up with the
+ * reader's fields. Thus, the following is how to read a record:
+ * <pre>
+ * for (int i = 0; i < a.advancers.length; i++)
+ * dataum.set(a.readerOrder[i], a.advancers[i].next());
+ * </pre>
+ * As a convenience, {@link inOrder} is set to true iff the reader
+ * and writer order agrees (i.e., iff <code>readerOrder[i] ==
+ * i</code> for all i). Generated code can use this to optimize this
+ * common case. */
+ public static class Record extends Advancer {
+ public final Advancer[] advancers;
+ public final int[] readerOrder;
+ public final boolean inOrder;
+
+ private Record(Advancer[] advancers, int[] readerOrder, boolean inOrder) {
+ this.advancers = advancers;
+ this.readerOrder = readerOrder;
+ this.inOrder = inOrder;
+ }
+
+ public Record getRecordAdvancer(Decoder in) { return this; }
+
+ protected static Advancer from(Resolver.RecordAdjust ra) {
+ /** Two cases: reader + writer agree on order, vs disagree. */
+ /** This is the complicated case, since skipping is involved. */
+ /** Special subclasses of Advance will encapsulate skipping. */
+
+ // Compute the "readerOrder" argument to Advancer.Record constructor
+ int[] readOrder = new int[ra.readerOrder.length];
+ for (int i = 0; i < readOrder.length; i++) readOrder[i] = ra.readerOrder[i].pos();
+
+ // Compute the "advancers" argument to Advancer.Record constructor
+ Advancer[] fieldAdvs = new Advancer[readOrder.length];
+
+ int i = 0; // Index into ra.fieldActions
+ int rf = 0; // Index into readOrder
+ int nrf = 0; // Index into fieldAdvs
+
+ // Deal with any leading fields to be skipped
+ Schema[] firstSkips = collectSkips(ra.fieldActions, i);
+ if (firstSkips.length != 0) i += firstSkips.length;
+ else firstSkips = null;
+
+ // Deal with fields to be read
+ for ( ; i < ra.fieldActions.length; nrf++, rf++) {
+ Advancer fieldAdv = Advancer.from(ra.fieldActions[i]);
+ i++;
+ Schema[] toSkip = collectSkips(ra.fieldActions, i);
+ if (toSkip.length != 0) {
+ fieldAdv = new RecordField(fieldAdv, toSkip);
+ i += toSkip.length;
+ }
+ if (firstSkips != null) {
+ fieldAdv = new RecordFieldWithBefore(firstSkips, fieldAdv);
+ firstSkips = null;
+ }
+ fieldAdvs[nrf] = fieldAdv;
+ }
+
+ // If reader and writer orders agree, sort fieldAdvs by reader
+ // order (i.e., move defaults into the correct place), to allow
+ // decoders to have an optimized path for the common case of a
+ // record's field order not changing.
+ boolean inOrder = true;
+ for (int k = 0; k < ra.firstDefault-1; k++)
+ if (readOrder[k] > readOrder[k+1]) inOrder = false;
+ if (inOrder) {
+ Advancer[] newAdvancers = new Advancer[fieldAdvs.length];
+ for (int k = 0, rf2 = 0, df = ra.firstDefault; k < readOrder.length; k++) {
+ if (rf2 < df) newAdvancers[k] = fieldAdvs[rf2++];
+ else newAdvancers[k] = fieldAdvs[df++];
+ readOrder[k] = k;
+ }
+ newAdvancers = fieldAdvs;
+ }
+
+ return new Record(fieldAdvs, readOrder, inOrder);
+ }
+ }
+
+ private static class RecordField extends Advancer {
+ private final Advancer field;
+ private final Schema[] after;
+ public RecordField(Advancer field, Schema[] after) {
+ this.field = field;
+ this.after = after;
+ }
+
+ public Object next(Decoder in) throws IOException
+ { Object r = field.next(in); ignore(after, in); return r; }
+
+ public Object nextNull(Decoder in) throws IOException
+ { field.nextNull(in); ignore(after, in); return null; }
+
+ public boolean nextBoolean(Decoder in) throws IOException
+ { boolean r = field.nextBoolean(in); ignore(after, in); return r; }
+
+ public int nextInt(Decoder in) throws IOException
+ { int r = field.nextInt(in); ignore(after, in); return r; }
+
+ public long nextLong(Decoder in) throws IOException
+ { long r = field.nextLong(in); ignore(after, in); return r; }
+
+ public float nextFloat(Decoder in) throws IOException
+ { float r = field.nextFloat(in); ignore(after, in); return r; }
+
+ public double nextDouble(Decoder in) throws IOException
+ { double r = field.nextDouble(in); ignore(after, in); return r; }
+
+ public int nextEnum(Decoder in) throws IOException
+ { int r = field.nextEnum(in); ignore(after, in); return r; }
+
+ public String nextString(Decoder in) throws IOException
+ { String r = field.nextString(in); ignore(after, in); return r; }
+
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException {
+ Utf8 r = field.nextString(in,old);
+ ignore(after, in);
+ return r;
+ }
+
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException {
+ ByteBuffer r = field.nextBytes(in,old);
+ ignore(after, in);
+ return r;
+ }
+
+ public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException {
+ byte[] r = field.nextFixed(in, bytes, start, len);
+ ignore(after, in);
+ return r;
+ }
+
+ // TODO: THIS DOESN'T WORK!!
+ public Advancer getElementAdvancer(Decoder in) throws IOException
+ { Advancer r = field.getElementAdvancer(in); ignore(after, in); return r; }
+
+ // TODO: THIS DOESN'T WORK!!
+ public int nextIndex(Decoder in) throws IOException
+ { int r = field.nextIndex(in); ignore(after, in); return r; }
+
+ // TODO: THIS DOESN'T WORK!!
+ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
+ { Advancer r = field.getBranchAdvancer(in, branch); ignore(after, in); return r; }
+
+ // TODO: THIS DOESN'T WORK!!
+ public Record getRecordAdvancer(Decoder in) throws IOException
+ { Record r = field.getRecordAdvancer(in); ignore(after, in); return r; }
+ }
+
+ private static class RecordFieldWithBefore extends Advancer {
+ private final Schema[] before;
+ private final Advancer field;
+ public RecordFieldWithBefore(Schema[] before, Advancer field) {
+ this.before = before;
+ this.field = field;
+ }
+
+ public Object next(Decoder in) throws IOException
+ { ignore(before, in); return field.next(in); }
+
+ public Object nextNull(Decoder in) throws IOException
+ { ignore(before, in); return field.nextNull(in); }
+
+ public boolean nextBoolean(Decoder in) throws IOException
+ { ignore(before, in); return field.nextBoolean(in); }
+
+ public int nextInt(Decoder in) throws IOException
+ { ignore(before, in); return field.nextInt(in); }
+
+ public long nextLong(Decoder in) throws IOException
+ { ignore(before, in); return field.nextLong(in); }
+
+ public float nextFloat(Decoder in) throws IOException
+ { ignore(before, in); return field.nextFloat(in); }
+
+ public double nextDouble(Decoder in) throws IOException
+ { ignore(before, in); return field.nextDouble(in); }
+
+ public int nextEnum(Decoder in) throws IOException
+ { ignore(before, in); return field.nextEnum(in); }
+
+ public String nextString(Decoder in) throws IOException
+ { ignore(before, in); return field.nextString(in); }
+
+ public Utf8 nextString(Decoder in, Utf8 old) throws IOException
+ { ignore(before, in); return field.nextString(in, old); }
+
+ public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException
+ { ignore(before, in); return field.nextBytes(in, old); }
+
+ public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException
+ { ignore(before, in); return field.nextFixed(in, bytes, start, len); }
+
+ public Advancer getElementAdvancer(Decoder in) throws IOException
+ { ignore(before, in); return field.getElementAdvancer(in); }
+
+ public int nextIndex(Decoder in) throws IOException
+ { ignore(before, in); return field.nextIndex(in); }
+
+ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException
+ { ignore(before, in); return field.getBranchAdvancer(in, branch); }
+
+ public Record getRecordAdvancer(Decoder in) throws IOException
+ { ignore(before, in); return field.getRecordAdvancer(in); }
+ }
+
+
+ private static class Default extends Advancer {
+ protected final Object val;
+ private Default(Object val) { this.val = val; }
+
+ public Object next(Decoder in) { return val; }
+ public Object nextNull(Decoder in) { return val; }
+ public boolean nextBoolean(Decoder in) { return (Boolean) val; }
+ public int nextInt(Decoder in) { return (Integer) val; }
+ public long nextLong(Decoder in) { return (Long) val; }
+ public float nextFloat(Decoder in) { return (Float) val; }
+ public double nextDouble(Decoder in) { return (Double) val; }
+ public int nextEnum(Decoder in) { return (Integer) val; }
+
+ // TODO -- finish for the rest of the types
+ }
+
+ private static void ignore(Schema[] toIgnore, Decoder in) throws IOException {
+ for (Schema s: toIgnore) skip(s, in);
+ }
+
+ // Probably belongs someplace else, although Decoder doesn't reference
+ // Schema, and Schema doesn't reference Decoder, and I'd hate to create
+ // new dependencies...
+ public static void skip(Schema s, Decoder in) throws IOException {
+ switch (s.getType()) {
+ case NULL: in.readNull(); break;
+ case BOOLEAN: in.readBoolean(); break;
+ case INT: in.readInt(); break;
+ case LONG: in.readLong(); break;
+ case FLOAT: in.readFloat(); break;
+ case DOUBLE: in.readDouble(); break;
+ case STRING: in.skipString(); break;
+ case BYTES: in.skipBytes(); break;
+ case FIXED: in.skipFixed(s.getFixedSize()); break;
+ case ENUM: in.readEnum(); break;
+ case UNION: skip(s.getTypes().get(in.readInt()), in); break;
+ case RECORD:
+ for (Schema.Field f: s.getFields())
+ skip(f.schema(), in);
+ break;
+ case ARRAY:
+ for (long i = in.skipArray(); i != 0; i = in.skipArray())
+ for (long j = 0; j < i; j++)
+ skip(s.getElementType(), in);
+ break;
+ case MAP:
+ for (long k = in.skipArray(); k != 0; k = in.skipArray())
+ for (long l = 0; l < k; l++) {
+ in.skipString(); // Key
+ skip(s.getValueType(), in);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type for schema: " + s);
+ }
+ }
+}