Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:22:48 UTC

[10/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module by making shims for the row-by-row reader. (omalley reviewed by prasanth_j)
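For context when skimming the diff below: the "shims for the row-by-row reader" mentioned above surface in the deleted code as HadoopShims.TextReaderShim, which StringDirectTreeReader obtains via ShimLoader.getHadoopShims().getTextReaderShim(stream) and then uses as data.read(result, len). The snippet that follows is only a rough sketch of that shim pattern, reconstructed from those call sites rather than taken from the actual patch; everything other than the TextReaderShim name and its read(Text, int) usage is hypothetical.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.io.Text;

    // Hypothetical stand-in for org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim,
    // inferred from how the deleted StringDirectTreeReader below calls it.
    interface TextReaderShim {
      // Read the next 'size' bytes from the underlying stream into 'txt'.
      void read(Text txt, int size) throws IOException;
    }

    // A naive implementation that copies through an intermediate buffer. Presumably the
    // real shim exists so that newer Hadoop versions can fill the Text buffer directly
    // (e.g. via Text.readWithKnownLength) and skip this extra copy.
    class BasicTextReaderShim implements TextReaderShim {
      private final DataInputStream in;

      BasicTextReaderShim(InputStream in) {
        this.in = new DataInputStream(in);
      }

      @Override
      public void read(Text txt, int size) throws IOException {
        byte[] bytes = new byte[size];
        in.readFully(bytes);          // throws EOFException if the stream ends early
        txt.set(bytes, 0, size);
      }
    }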

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
deleted file mode 100644
index 6d1c256..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ /dev/null
@@ -1,2525 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.BitFieldReader;
-import org.apache.orc.impl.DynamicByteArray;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.IntegerReader;
-import org.apache.orc.OrcProto;
-import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.impl.RunLengthByteReader;
-import org.apache.orc.impl.RunLengthIntegerReader;
-import org.apache.orc.impl.RunLengthIntegerReaderV2;
-import org.apache.orc.impl.SerializationUtils;
-import org.apache.orc.impl.StreamName;
-
-/**
- * Factory for creating ORC tree readers.
- */
-public class TreeReaderFactory {
-
-  public abstract static class TreeReader {
-    protected final int columnId;
-    protected BitFieldReader present = null;
-    protected boolean valuePresent = false;
-    protected int vectorColumnCount;
-
-    TreeReader(int columnId) throws IOException {
-      this(columnId, null);
-    }
-
-    protected TreeReader(int columnId, InStream in) throws IOException {
-      this.columnId = columnId;
-      if (in == null) {
-        present = null;
-        valuePresent = true;
-      } else {
-        present = new BitFieldReader(in, 1);
-      }
-      vectorColumnCount = -1;
-    }
-
-    void setVectorColumnCount(int vectorColumnCount) {
-      this.vectorColumnCount = vectorColumnCount;
-    }
-
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
-        InStream in,
-        boolean signed, boolean skipCorrupt) throws IOException {
-      switch (kind) {
-        case DIRECT_V2:
-        case DICTIONARY_V2:
-          return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
-        case DIRECT:
-        case DICTIONARY:
-          return new RunLengthIntegerReader(in, signed);
-        default:
-          throw new IllegalArgumentException("Unknown encoding " + kind);
-      }
-    }
-
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      checkEncoding(stripeFooter.getColumnsList().get(columnId));
-      InStream in = streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.PRESENT));
-      if (in == null) {
-        present = null;
-        valuePresent = true;
-      } else {
-        present = new BitFieldReader(in, 1);
-      }
-    }
-
-    /**
-     * Seek to the given position.
-     *
-     * @param index the indexes loaded from the file
-     * @throws IOException
-     */
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    public void seek(PositionProvider index) throws IOException {
-      if (present != null) {
-        present.seek(index);
-      }
-    }
-
-    protected long countNonNulls(long rows) throws IOException {
-      if (present != null) {
-        long result = 0;
-        for (long c = 0; c < rows; ++c) {
-          if (present.next() == 1) {
-            result += 1;
-          }
-        }
-        return result;
-      } else {
-        return rows;
-      }
-    }
-
-    abstract void skipRows(long rows) throws IOException;
-
-    void readValuePresent() throws IOException {
-      if (present != null) {
-        valuePresent = present.next() == 1;
-      }
-    }
-
-    Object next(Object previous) throws IOException {
-      if (present != null) {
-        valuePresent = present.next() == 1;
-      }
-      return previous;
-    }
-
-    /**
-     * Called at the top level to read into the given batch.
-     * @param batch the batch to read into
-     * @param batchSize the number of rows to read
-     * @throws IOException
-     */
-    public void nextBatch(VectorizedRowBatch batch,
-                          int batchSize) throws IOException {
-      batch.cols[0].reset();
-      batch.cols[0].ensureSize(batchSize, false);
-      nextVector(batch.cols[0], null, batchSize);
-    }
-
-    /**
-     * Populates the isNull vector array in the previousVector object based on
-     * the present stream values. This function is called from all the child
-     * readers, and they all set the values based on isNull field value.
-     *
-     * @param previous The columnVector object whose isNull value is populated
-     * @param isNull Whether each value was null at a higher level. If
-     *               isNull is null, all values are non-null.
-     * @param batchSize      Size of the column vector
-     * @throws IOException
-     */
-    public void nextVector(ColumnVector previous,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      if (present != null || isNull != null) {
-        // Set noNulls and isNull vector of the ColumnVector based on
-        // present stream
-        previous.noNulls = true;
-        boolean allNull = true;
-        for (int i = 0; i < batchSize; i++) {
-          if (isNull == null || !isNull[i]) {
-            if (present != null && present.next() != 1) {
-              previous.noNulls = false;
-              previous.isNull[i] = true;
-            } else {
-              previous.isNull[i] = false;
-              allNull = false;
-            }
-          } else {
-            previous.noNulls = false;
-            previous.isNull[i] = true;
-          }
-        }
-        previous.isRepeating = !previous.noNulls && allNull;
-      } else {
-        // There is no present stream, this means that all the values are
-        // present.
-        previous.noNulls = true;
-        for (int i = 0; i < batchSize; i++) {
-          previous.isNull[i] = false;
-        }
-      }
-    }
-
-    public BitFieldReader getPresent() {
-      return present;
-    }
-  }
-
-  public static class NullTreeReader extends TreeReader {
-
-    public NullTreeReader(int columnId) throws IOException {
-      super(columnId);
-    }
-
-    @Override
-    public void startStripe(Map<StreamName, InStream> streams,
-                            OrcProto.StripeFooter footer) {
-      // PASS
-    }
-
-    @Override
-    void skipRows(long rows) {
-      // PASS
-    }
-
-    @Override
-    public void seek(PositionProvider position) {
-      // PASS
-    }
-
-    @Override
-    public void seek(PositionProvider[] position) {
-      // PASS
-    }
-
-    @Override
-    Object next(Object previous) {
-      return null;
-    }
-
-    @Override
-    public void nextVector(ColumnVector vector, boolean[] isNull, final int batchSize) {
-      vector.noNulls = false;
-      vector.isNull[0] = true;
-      vector.isRepeating = true;
-    }
-  }
-
-  public static class BooleanTreeReader extends TreeReader {
-    protected BitFieldReader reader = null;
-
-    BooleanTreeReader(int columnId) throws IOException {
-      this(columnId, null, null);
-    }
-
-    protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
-      super(columnId, present);
-      if (data != null) {
-        reader = new BitFieldReader(data, 1);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      reader = new BitFieldReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA)), 1);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      BooleanWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new BooleanWritable();
-        } else {
-          result = (BooleanWritable) previous;
-        }
-        result.set(reader.next() == 1);
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-    }
-  }
-
-  public static class ByteTreeReader extends TreeReader {
-    protected RunLengthByteReader reader = null;
-
-    ByteTreeReader(int columnId) throws IOException {
-      this(columnId, null, null);
-    }
-
-    protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
-      super(columnId, present);
-      this.reader = new RunLengthByteReader(data);
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      reader = new RunLengthByteReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA)));
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      ByteWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new ByteWritable();
-        } else {
-          result = (ByteWritable) previous;
-        }
-        result.set(reader.next());
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, result.vector, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-  }
-
-  public static class ShortTreeReader extends TreeReader {
-    protected IntegerReader reader = null;
-
-    ShortTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null);
-    }
-
-    protected ShortTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding encoding)
-        throws IOException {
-      super(columnId, present);
-      if (data != null && encoding != null) {
-        checkEncoding(encoding);
-        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(name), true, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      ShortWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new ShortWritable();
-        } else {
-          result = (ShortWritable) previous;
-        }
-        result.set((short) reader.next());
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, result.vector, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-  }
-
-  public static class IntTreeReader extends TreeReader {
-    protected IntegerReader reader = null;
-
-    IntTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null);
-    }
-
-    protected IntTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding encoding)
-        throws IOException {
-      super(columnId, present);
-      if (data != null && encoding != null) {
-        checkEncoding(encoding);
-        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(name), true, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      IntWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new IntWritable();
-        } else {
-          result = (IntWritable) previous;
-        }
-        result.set((int) reader.next());
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, result.vector, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-  }
-
-  public static class LongTreeReader extends TreeReader {
-    protected IntegerReader reader = null;
-
-    LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
-      this(columnId, null, null, null, skipCorrupt);
-    }
-
-    protected LongTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding encoding,
-        boolean skipCorrupt)
-        throws IOException {
-      super(columnId, present);
-      if (data != null && encoding != null) {
-        checkEncoding(encoding);
-        this.reader = createIntegerReader(encoding.getKind(), data, true, skipCorrupt);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(name), true, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      LongWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new LongWritable();
-        } else {
-          result = (LongWritable) previous;
-        }
-        result.set(reader.next());
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, result.vector, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-  }
-
-  public static class FloatTreeReader extends TreeReader {
-    protected InStream stream;
-    private final SerializationUtils utils;
-
-    FloatTreeReader(int columnId) throws IOException {
-      this(columnId, null, null);
-    }
-
-    protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
-      super(columnId, present);
-      this.utils = new SerializationUtils();
-      this.stream = data;
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      stream = streams.get(name);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      stream.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      FloatWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new FloatWritable();
-        } else {
-          result = (FloatWritable) previous;
-        }
-        result.set(utils.readFloat(stream));
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      final boolean hasNulls = !result.noNulls;
-      boolean allNulls = hasNulls;
-
-      if (hasNulls) {
-        // conditions to ensure bounds checks skips
-        for (int i = 0; batchSize <= result.isNull.length && i < batchSize; i++) {
-          allNulls = allNulls & result.isNull[i];
-        }
-        if (allNulls) {
-          result.vector[0] = Double.NaN;
-          result.isRepeating = true;
-        } else {
-          // some nulls
-          result.isRepeating = false;
-          // conditions to ensure bounds checks skips
-          for (int i = 0; batchSize <= result.isNull.length
-              && batchSize <= result.vector.length && i < batchSize; i++) {
-            if (!result.isNull[i]) {
-              result.vector[i] = utils.readFloat(stream);
-            } else {
-              // If the value is not present then set NaN
-              result.vector[i] = Double.NaN;
-            }
-          }
-        }
-      } else {
-        // no nulls & > 1 row (check repeating)
-        boolean repeating = (batchSize > 1);
-        final float f1 = utils.readFloat(stream);
-        result.vector[0] = f1;
-        // conditions to ensure bounds checks skips
-        for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
-          final float f2 = utils.readFloat(stream);
-          repeating = repeating && (f1 == f2);
-          result.vector[i] = f2;
-        }
-        result.isRepeating = repeating;
-      }
-    }
-
-    @Override
-    protected void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      for (int i = 0; i < items; ++i) {
-        utils.readFloat(stream);
-      }
-    }
-  }
-
-  public static class DoubleTreeReader extends TreeReader {
-    protected InStream stream;
-    private final SerializationUtils utils;
-
-    DoubleTreeReader(int columnId) throws IOException {
-      this(columnId, null, null);
-    }
-
-    protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
-      super(columnId, present);
-      this.utils = new SerializationUtils();
-      this.stream = data;
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name =
-          new StreamName(columnId,
-              OrcProto.Stream.Kind.DATA);
-      stream = streams.get(name);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      stream.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      DoubleWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new DoubleWritable();
-        } else {
-          result = (DoubleWritable) previous;
-        }
-        result.set(utils.readDouble(stream));
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      final boolean hasNulls = !result.noNulls;
-      boolean allNulls = hasNulls;
-
-      if (hasNulls) {
-        // conditions to ensure bounds checks skips
-        for (int i = 0; i < batchSize && batchSize <= result.isNull.length; i++) {
-          allNulls = allNulls & result.isNull[i];
-        }
-        if (allNulls) {
-          result.vector[0] = Double.NaN;
-          result.isRepeating = true;
-        } else {
-          // some nulls
-          result.isRepeating = false;
-          // conditions to ensure bounds checks skips
-          for (int i = 0; batchSize <= result.isNull.length
-              && batchSize <= result.vector.length && i < batchSize; i++) {
-            if (!result.isNull[i]) {
-              result.vector[i] = utils.readDouble(stream);
-            } else {
-              // If the value is not present then set NaN
-              result.vector[i] = Double.NaN;
-            }
-          }
-        }
-      } else {
-        // no nulls
-        boolean repeating = (batchSize > 1);
-        final double d1 = utils.readDouble(stream);
-        result.vector[0] = d1;
-        // conditions to ensure bounds checks skips
-        for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
-          final double d2 = utils.readDouble(stream);
-          repeating = repeating && (d1 == d2);
-          result.vector[i] = d2;
-        }
-        result.isRepeating = repeating;
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long len = items * 8;
-      while (len > 0) {
-        len -= stream.skip(len);
-      }
-    }
-  }
-
-  public static class BinaryTreeReader extends TreeReader {
-    protected InStream stream;
-    protected IntegerReader lengths = null;
-    protected final LongColumnVector scratchlcv;
-
-    BinaryTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null, null);
-    }
-
-    protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
-        OrcProto.ColumnEncoding encoding) throws IOException {
-      super(columnId, present);
-      scratchlcv = new LongColumnVector();
-      this.stream = data;
-      if (length != null && encoding != null) {
-        checkEncoding(encoding);
-        this.lengths = createIntegerReader(encoding.getKind(), length, false, false);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      stream = streams.get(name);
-      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      stream.seek(index);
-      lengths.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      BytesWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new BytesWritable();
-        } else {
-          result = (BytesWritable) previous;
-        }
-        int len = (int) lengths.next();
-        result.setSize(len);
-        int offset = 0;
-        while (len > 0) {
-          int written = stream.read(result.getBytes(), offset, len);
-          if (written < 0) {
-            throw new EOFException("Can't finish byte read from " + stream);
-          }
-          len -= written;
-          offset += written;
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final BytesColumnVector result = (BytesColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long lengthToSkip = 0;
-      for (int i = 0; i < items; ++i) {
-        lengthToSkip += lengths.next();
-      }
-      while (lengthToSkip > 0) {
-        lengthToSkip -= stream.skip(lengthToSkip);
-      }
-    }
-  }
-
-  public static class TimestampTreeReader extends TreeReader {
-    protected IntegerReader data = null;
-    protected IntegerReader nanos = null;
-    private final boolean skipCorrupt;
-    private Map<String, Long> baseTimestampMap;
-    private long base_timestamp;
-    private final TimeZone readerTimeZone;
-    private TimeZone writerTimeZone;
-    private boolean hasSameTZRules;
-
-    TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
-      this(columnId, null, null, null, null, skipCorrupt);
-    }
-
-    protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
-        InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
-        throws IOException {
-      super(columnId, presentStream);
-      this.skipCorrupt = skipCorrupt;
-      this.baseTimestampMap = new HashMap<>();
-      this.readerTimeZone = TimeZone.getDefault();
-      this.writerTimeZone = readerTimeZone;
-      this.hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
-      this.base_timestamp = getBaseTimestamp(readerTimeZone.getID());
-      if (encoding != null) {
-        checkEncoding(encoding);
-
-        if (dataStream != null) {
-          this.data = createIntegerReader(encoding.getKind(), dataStream, true, skipCorrupt);
-        }
-
-        if (nanosStream != null) {
-          this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, skipCorrupt);
-        }
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      data = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.DATA)), true, skipCorrupt);
-      nanos = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.SECONDARY)), false, skipCorrupt);
-      base_timestamp = getBaseTimestamp(stripeFooter.getWriterTimezone());
-    }
-
-    private long getBaseTimestamp(String timeZoneId) throws IOException {
-      // to make sure new readers read old files in the same way
-      if (timeZoneId == null || timeZoneId.isEmpty()) {
-        timeZoneId = readerTimeZone.getID();
-      }
-
-      if (!baseTimestampMap.containsKey(timeZoneId)) {
-        writerTimeZone = TimeZone.getTimeZone(timeZoneId);
-        hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        sdf.setTimeZone(writerTimeZone);
-        try {
-          long epoch =
-              sdf.parse(WriterImpl.BASE_TIMESTAMP_STRING).getTime() / WriterImpl.MILLIS_PER_SECOND;
-          baseTimestampMap.put(timeZoneId, epoch);
-          return epoch;
-        } catch (ParseException e) {
-          throw new IOException("Unable to create base timestamp", e);
-        } finally {
-          sdf.setTimeZone(readerTimeZone);
-        }
-      }
-
-      return baseTimestampMap.get(timeZoneId);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      data.seek(index);
-      nanos.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      TimestampWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new TimestampWritable();
-        } else {
-          result = (TimestampWritable) previous;
-        }
-        long millis = (data.next() + base_timestamp) * WriterImpl.MILLIS_PER_SECOND;
-        int newNanos = parseNanos(nanos.next());
-        // fix the rounding when we divided by 1000.
-        if (millis >= 0) {
-          millis += newNanos / WriterImpl.NANOS_PER_MILLI;
-        } else {
-          millis -= newNanos / WriterImpl.NANOS_PER_MILLI;
-        }
-        long offset = 0;
-        // If reader and writer time zones have different rules, adjust the timezone difference
-        // between reader and writer taking day light savings into account.
-        if (!hasSameTZRules) {
-          offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
-        }
-        long adjustedMillis = millis + offset;
-        Timestamp ts = new Timestamp(adjustedMillis);
-        // Sometimes the reader timezone might have changed after adding the adjustedMillis.
-        // To account for that change, check for any difference in reader timezone after
-        // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
-        if (!hasSameTZRules &&
-            (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
-          long newOffset =
-              writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
-          adjustedMillis = millis + newOffset;
-          ts.setTime(adjustedMillis);
-        }
-        ts.setNanos(newNanos);
-        result.set(ts);
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      TimestampColumnVector result = (TimestampColumnVector) previousVector;
-      super.nextVector(previousVector, isNull, batchSize);
-
-      for (int i = 0; i < batchSize; i++) {
-        if (result.noNulls || !result.isNull[i]) {
-          long millis = data.next() + base_timestamp;
-          int newNanos = parseNanos(nanos.next());
-          if (millis < 0 && newNanos != 0) {
-            millis -= 1;
-          }
-          millis *= WriterImpl.MILLIS_PER_SECOND;
-          long offset = 0;
-          // If reader and writer time zones have different rules, adjust the timezone difference
-          // between reader and writer taking day light savings into account.
-          if (!hasSameTZRules) {
-            offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
-          }
-          long adjustedMillis = millis + offset;
-          // Sometimes the reader timezone might have changed after adding the adjustedMillis.
-          // To account for that change, check for any difference in reader timezone after
-          // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
-          if (!hasSameTZRules &&
-              (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
-            long newOffset =
-                writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
-            adjustedMillis = millis + newOffset;
-          }
-          result.time[i] = adjustedMillis;
-          result.nanos[i] = newNanos;
-          if (result.isRepeating && i != 0 &&
-              (result.time[0] != result.time[i] ||
-                  result.nanos[0] != result.nanos[i])) {
-            result.isRepeating = false;
-          }
-        }
-      }
-    }
-
-    private static int parseNanos(long serialized) {
-      int zeros = 7 & (int) serialized;
-      int result = (int) (serialized >>> 3);
-      if (zeros != 0) {
-        for (int i = 0; i <= zeros; ++i) {
-          result *= 10;
-        }
-      }
-      return result;
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      data.skip(items);
-      nanos.skip(items);
-    }
-  }
-
-  public static class DateTreeReader extends TreeReader {
-    protected IntegerReader reader = null;
-
-    DateTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null);
-    }
-
-    protected DateTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding encoding) throws IOException {
-      super(columnId, present);
-      if (data != null && encoding != null) {
-        checkEncoding(encoding);
-        reader = createIntegerReader(encoding.getKind(), data, true, false);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(name), true, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      DateWritable result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new DateWritable();
-        } else {
-          result = (DateWritable) previous;
-        }
-        result.set((int) reader.next());
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final LongColumnVector result = (LongColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      // Read value entries based on isNull entries
-      reader.nextVector(result, result.vector, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-  }
-
-  public static class DecimalTreeReader extends TreeReader {
-    protected InStream valueStream;
-    protected IntegerReader scaleReader = null;
-    private int[] scratchScaleVector;
-
-    private final int precision;
-    private final int scale;
-
-    DecimalTreeReader(int columnId, int precision, int scale) throws IOException {
-      this(columnId, precision, scale, null, null, null, null);
-    }
-
-    protected DecimalTreeReader(int columnId, int precision, int scale, InStream present,
-        InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
-        throws IOException {
-      super(columnId, present);
-      this.precision = precision;
-      this.scale = scale;
-      this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
-      this.valueStream = valueStream;
-      if (scaleStream != null && encoding != null) {
-        checkEncoding(encoding);
-        this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, false);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      valueStream = streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA));
-      scaleReader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      valueStream.seek(index);
-      scaleReader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      final HiveDecimalWritable result;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new HiveDecimalWritable();
-        } else {
-          result = (HiveDecimalWritable) previous;
-        }
-        result.set(HiveDecimal.create(SerializationUtils.readBigInteger
-                (valueStream), (int) scaleReader.next()));
-        return HiveDecimalWritable.enforcePrecisionScale(result, precision,
-            scale);
-      }
-      return null;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final DecimalColumnVector result = (DecimalColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      if (batchSize > scratchScaleVector.length) {
-        scratchScaleVector = new int[(int) batchSize];
-      }
-      scaleReader.nextVector(result, scratchScaleVector, batchSize);
-      // Read value entries based on isNull entries
-      if (result.noNulls) {
-        for (int r=0; r < batchSize; ++r) {
-          BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-          HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
-          result.set(r, dec);
-        }
-      } else if (!result.isRepeating || !result.isNull[0]) {
-        for (int r=0; r < batchSize; ++r) {
-          if (!result.isNull[r]) {
-            BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-            HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
-            result.set(r, dec);
-          }
-        }
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      for (int i = 0; i < items; i++) {
-        SerializationUtils.readBigInteger(valueStream);
-      }
-      scaleReader.skip(items);
-    }
-  }
-
-  /**
-   * A tree reader that will read string columns. At the start of the
-   * stripe, it creates an internal reader based on whether a direct or
-   * dictionary encoding was used.
-   */
-  public static class StringTreeReader extends TreeReader {
-    protected TreeReader reader;
-
-    StringTreeReader(int columnId) throws IOException {
-      super(columnId);
-    }
-
-    protected StringTreeReader(int columnId, InStream present, InStream data, InStream length,
-        InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
-      super(columnId, present);
-      if (encoding != null) {
-        switch (encoding.getKind()) {
-          case DIRECT:
-          case DIRECT_V2:
-            reader = new StringDirectTreeReader(columnId, present, data, length,
-                encoding.getKind());
-            break;
-          case DICTIONARY:
-          case DICTIONARY_V2:
-            reader = new StringDictionaryTreeReader(columnId, present, data, length, dictionary,
-                encoding);
-            break;
-          default:
-            throw new IllegalArgumentException("Unsupported encoding " +
-                encoding.getKind());
-        }
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      reader.checkEncoding(encoding);
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      // For each stripe, checks the encoding and initializes the appropriate
-      // reader
-      switch (stripeFooter.getColumnsList().get(columnId).getKind()) {
-        case DIRECT:
-        case DIRECT_V2:
-          reader = new StringDirectTreeReader(columnId);
-          break;
-        case DICTIONARY:
-        case DICTIONARY_V2:
-          reader = new StringDictionaryTreeReader(columnId);
-          break;
-        default:
-          throw new IllegalArgumentException("Unsupported encoding " +
-              stripeFooter.getColumnsList().get(columnId).getKind());
-      }
-      reader.startStripe(streams, stripeFooter);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      reader.seek(index);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      return reader.next(previous);
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      reader.nextVector(previousVector, isNull, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skipRows(items);
-    }
-  }
-
-  // This class collects together very similar methods for reading an ORC vector of byte arrays and
-  // creating the BytesColumnVector.
-  //
-  public static class BytesColumnVectorUtil {
-
-    private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths,
-        LongColumnVector scratchlcv,
-        BytesColumnVector result, final int batchSize) throws IOException {
-      // Read lengths
-      scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
-      lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
-      int totalLength = 0;
-      if (!scratchlcv.isRepeating) {
-        for (int i = 0; i < batchSize; i++) {
-          if (!scratchlcv.isNull[i]) {
-            totalLength += (int) scratchlcv.vector[i];
-          }
-        }
-      } else {
-        if (!scratchlcv.isNull[0]) {
-          totalLength = (int) (batchSize * scratchlcv.vector[0]);
-        }
-      }
-
-      // Read all the strings for this batch
-      byte[] allBytes = new byte[totalLength];
-      int offset = 0;
-      int len = totalLength;
-      while (len > 0) {
-        int bytesRead = stream.read(allBytes, offset, len);
-        if (bytesRead < 0) {
-          throw new EOFException("Can't finish byte read from " + stream);
-        }
-        len -= bytesRead;
-        offset += bytesRead;
-      }
-
-      return allBytes;
-    }
-
-    // This method has the common code for reading in bytes into a BytesColumnVector.
-    public static void readOrcByteArrays(InStream stream,
-                                         IntegerReader lengths,
-                                         LongColumnVector scratchlcv,
-                                         BytesColumnVector result,
-                                         final int batchSize) throws IOException {
-      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
-            result, (int) batchSize);
-
-        // Too expensive to figure out 'repeating' by comparisons.
-        result.isRepeating = false;
-        int offset = 0;
-        if (!scratchlcv.isRepeating) {
-          for (int i = 0; i < batchSize; i++) {
-            if (!scratchlcv.isNull[i]) {
-              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
-              offset += scratchlcv.vector[i];
-            } else {
-              result.setRef(i, allBytes, 0, 0);
-            }
-          }
-        } else {
-          for (int i = 0; i < batchSize; i++) {
-            if (!scratchlcv.isNull[i]) {
-              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
-              offset += scratchlcv.vector[0];
-            } else {
-              result.setRef(i, allBytes, 0, 0);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * A reader for string columns that are direct encoded in the current
-   * stripe.
-   */
-  public static class StringDirectTreeReader extends TreeReader {
-    protected InStream stream;
-    protected TextReaderShim data;
-    protected IntegerReader lengths;
-    private final LongColumnVector scratchlcv;
-
-    StringDirectTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null, null);
-    }
-
-    protected StringDirectTreeReader(int columnId, InStream present, InStream data,
-        InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException {
-      super(columnId, present);
-      this.scratchlcv = new LongColumnVector();
-      this.stream = data;
-      if (length != null && encoding != null) {
-        this.lengths = createIntegerReader(encoding, length, false, false);
-        this.data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
-          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA);
-      stream = streams.get(name);
-      data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
-      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
-          false, false);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      stream.seek(index);
-      // don't seek data stream
-      lengths.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      Text result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new Text();
-        } else {
-          result = (Text) previous;
-        }
-        int len = (int) lengths.next();
-        data.read(result, len);
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final BytesColumnVector result = (BytesColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
-          result, batchSize);
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long lengthToSkip = 0;
-      for (int i = 0; i < items; ++i) {
-        lengthToSkip += lengths.next();
-      }
-
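-      // InStream.skip may consume fewer bytes than requested, so loop until
-      // the full byte count for the skipped rows has been read past.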
-      while (lengthToSkip > 0) {
-        lengthToSkip -= stream.skip(lengthToSkip);
-      }
-    }
-
-    public IntegerReader getLengths() {
-      return lengths;
-    }
-
-    public InStream getStream() {
-      return stream;
-    }
-  }
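-
-  // Illustration, not tied to any particular file: with the values
-  // ["orc", "", "hive"], a direct-encoded column stores
-  //   DATA   stream: o r c h i v e   (concatenated UTF-8 bytes)
-  //   LENGTH stream: 3, 0, 4
-  // and each value is reconstructed by reading LENGTH bytes from DATA.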
-
-  /**
-   * A reader for string columns that are dictionary encoded in the current
-   * stripe.
-   */
-  public static class StringDictionaryTreeReader extends TreeReader {
-    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-    private DynamicByteArray dictionaryBuffer;
-    private int[] dictionaryOffsets;
-    protected IntegerReader reader;
-
-    private byte[] dictionaryBufferInBytesCache = null;
-    private final LongColumnVector scratchlcv;
-
-    StringDictionaryTreeReader(int columnId) throws IOException {
-      this(columnId, null, null, null, null, null);
-    }
-
-    protected StringDictionaryTreeReader(int columnId, InStream present, InStream data,
-        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
-        throws IOException {
-      super(columnId, present);
-      scratchlcv = new LongColumnVector();
-      if (data != null && encoding != null) {
-        this.reader = createIntegerReader(encoding.getKind(), data, false, false);
-      }
-
-      if (dictionary != null && encoding != null) {
-        readDictionaryStream(dictionary);
-      }
-
-      if (length != null && encoding != null) {
-        readDictionaryLengthStream(length, encoding);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
-          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-
-      // read the dictionary blob
-      StreamName name = new StreamName(columnId,
-          OrcProto.Stream.Kind.DICTIONARY_DATA);
-      InStream in = streams.get(name);
-      readDictionaryStream(in);
-
-      // read the lengths
-      name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
-      in = streams.get(name);
-      readDictionaryLengthStream(in, stripeFooter.getColumnsList().get(columnId));
-
-      // set up the row reader
-      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(name), false, false);
-    }
-
-    private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)
-        throws IOException {
-      int dictionarySize = encoding.getDictionarySize();
-      if (in != null) { // Guard against empty LENGTH stream.
-        IntegerReader lenReader = createIntegerReader(encoding.getKind(), in, false, false);
-        int offset = 0;
-        if (dictionaryOffsets == null ||
-            dictionaryOffsets.length < dictionarySize + 1) {
-          dictionaryOffsets = new int[dictionarySize + 1];
-        }
-        for (int i = 0; i < dictionarySize; ++i) {
-          dictionaryOffsets[i] = offset;
-          offset += (int) lenReader.next();
-        }
-        dictionaryOffsets[dictionarySize] = offset;
-        in.close();
-      }
-    }
-
-    private void readDictionaryStream(InStream in) throws IOException {
-      if (in != null) { // Guard against empty dictionary stream.
-        if (in.available() > 0) {
-          dictionaryBuffer = new DynamicByteArray(64, in.available());
-          dictionaryBuffer.readAll(in);
-          // Since it's the start of a stripe, invalidate the cache.
-          dictionaryBufferInBytesCache = null;
-        }
-        in.close();
-      } else {
-        dictionaryBuffer = null;
-      }
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
-      reader.seek(index);
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      Text result = null;
-      if (valuePresent) {
-        int entry = (int) reader.next();
-        if (previous == null) {
-          result = new Text();
-        } else {
-          result = (Text) previous;
-        }
-        int offset = dictionaryOffsets[entry];
-        int length = getDictionaryEntryLength(entry, offset);
-        // If the column contains only empty strings, the dictionary size is
-        // zero and the buffer is null; in that case just return result,
-        // which defaults to the empty string.
-        if (dictionaryBuffer != null) {
-          dictionaryBuffer.setText(result, offset, length);
-        } else {
-          result.clear();
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final BytesColumnVector result = (BytesColumnVector) previousVector;
-      int offset;
-      int length;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
-      if (dictionaryBuffer != null) {
-
-        // Load dictionaryBuffer into cache.
-        if (dictionaryBufferInBytesCache == null) {
-          dictionaryBufferInBytesCache = dictionaryBuffer.get();
-        }
-
-        // Read string offsets
-        scratchlcv.isNull = result.isNull;
-        scratchlcv.ensureSize((int) batchSize, false);
-        reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
-        if (!scratchlcv.isRepeating) {
-
-          // The vector has non-repeating strings. Iterate through the batch
-          // and set the strings one by one.
-          for (int i = 0; i < batchSize; i++) {
-            if (!scratchlcv.isNull[i]) {
-              offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
-              length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
-              result.setRef(i, dictionaryBufferInBytesCache, offset, length);
-            } else {
-              // If the value is null then set offset and length to zero (null string)
-              result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
-            }
-          }
-        } else {
-          // If the value is repeating, just set the first value in the
-          // vector; the isRepeating flag set below covers the rest, so there
-          // is no need to iterate through and set every element.
-          offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
-          length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
-          result.setRef(0, dictionaryBufferInBytesCache, offset, length);
-        }
-        result.isRepeating = scratchlcv.isRepeating;
-      } else {
-        if (dictionaryOffsets == null) {
-          // Entire stripe contains null strings.
-          result.isRepeating = true;
-          result.noNulls = false;
-          result.isNull[0] = true;
-          result.setRef(0, EMPTY_BYTE_ARRAY, 0, 0);
-        } else {
-          // stripe contains nulls and empty strings
-          for (int i = 0; i < batchSize; i++) {
-            if (!result.isNull[i]) {
-              result.setRef(i, EMPTY_BYTE_ARRAY, 0, 0);
-            }
-          }
-        }
-      }
-    }
-
-    int getDictionaryEntryLength(int entry, int offset) {
-      final int length;
-      // If it isn't the last entry, subtract the adjacent offsets;
-      // otherwise use the buffer length.
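-      // For example (illustrative only): with dictionary entries
-      // ["a", "bc", ""] the offsets are {0, 1, 3, 3}; entry 1 spans
-      // bytes [1, 3) of the buffer, giving length 3 - 1 = 2.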
-      if (entry < dictionaryOffsets.length - 1) {
-        length = dictionaryOffsets[entry + 1] - offset;
-      } else {
-        length = dictionaryBuffer.size() - offset;
-      }
-      return length;
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      reader.skip(countNonNulls(items));
-    }
-
-    public IntegerReader getReader() {
-      return reader;
-    }
-  }
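-
-  // Informal summary of the dictionary read path above: DICTIONARY_DATA holds
-  // the concatenated distinct values, LENGTH supplies each entry's size (from
-  // which dictionaryOffsets is built), and DATA holds per-row indexes into
-  // the dictionary.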
-
-  public static class CharTreeReader extends StringTreeReader {
-    int maxLength;
-
-    CharTreeReader(int columnId, int maxLength) throws IOException {
-      this(columnId, maxLength, null, null, null, null, null);
-    }
-
-    protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
-        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
-      super(columnId, present, data, length, dictionary, encoding);
-      this.maxLength = maxLength;
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      final HiveCharWritable result;
-      if (previous == null) {
-        result = new HiveCharWritable();
-      } else {
-        result = (HiveCharWritable) previous;
-      }
-      // Use the string reader implementation to populate the internal Text value
-      Object textVal = super.next(result.getTextValue());
-      if (textVal == null) {
-        return null;
-      }
-      // result should now hold the value that was read in.
-      // enforce char length
-      result.enforceMaxLength(maxLength);
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      // Get the vector of strings from StringTreeReader, then make a second
-      // pass to right-trim and truncate the lengths where necessary.
-      super.nextVector(previousVector, isNull, batchSize);
-      BytesColumnVector result = (BytesColumnVector) previousVector;
-      int adjustedDownLen;
-      if (result.isRepeating) {
-        if (result.noNulls || !result.isNull[0]) {
-          adjustedDownLen = StringExpr
-              .rightTrimAndTruncate(result.vector[0], result.start[0], result.length[0], maxLength);
-          if (adjustedDownLen < result.length[0]) {
-            result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
-          }
-        }
-      } else {
-        if (result.noNulls) {
-          for (int i = 0; i < batchSize; i++) {
-            adjustedDownLen = StringExpr
-                .rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
-                    maxLength);
-            if (adjustedDownLen < result.length[i]) {
-              result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
-            }
-          }
-        } else {
-          for (int i = 0; i < batchSize; i++) {
-            if (!result.isNull[i]) {
-              adjustedDownLen = StringExpr
-                  .rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
-                      maxLength);
-              if (adjustedDownLen < result.length[i]) {
-                result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
-              }
-            }
-          }
-        }
-      }
-    }
-  }
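-
-  // Note the CHAR/VARCHAR asymmetry between this reader and the one below:
-  // CHAR values are right-trimmed of padding spaces and then truncated to
-  // maxLength, while VARCHAR values are only truncated.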
-
-  public static class VarcharTreeReader extends StringTreeReader {
-    int maxLength;
-
-    VarcharTreeReader(int columnId, int maxLength) throws IOException {
-      this(columnId, maxLength, null, null, null, null, null);
-    }
-
-    protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
-        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
-      super(columnId, present, data, length, dictionary, encoding);
-      this.maxLength = maxLength;
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      final HiveVarcharWritable result;
-      if (previous == null) {
-        result = new HiveVarcharWritable();
-      } else {
-        result = (HiveVarcharWritable) previous;
-      }
-      // Use the string reader implementation to populate the internal Text value
-      Object textVal = super.next(result.getTextValue());
-      if (textVal == null) {
-        return null;
-      }
-      // result should now hold the value that was read in.
-      // enforce varchar length
-      result.enforceMaxLength(maxLength);
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      // Get the vector of strings from StringTreeReader, then make a second
-      // pass to truncate the lengths where necessary.
-      super.nextVector(previousVector, isNull, batchSize);
-      BytesColumnVector result = (BytesColumnVector) previousVector;
-
-      int adjustedDownLen;
-      if (result.isRepeating) {
-        if (result.noNulls || !result.isNull[0]) {
-          adjustedDownLen = StringExpr
-              .truncate(result.vector[0], result.start[0], result.length[0], maxLength);
-          if (adjustedDownLen < result.length[0]) {
-            result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
-          }
-        }
-      } else {
-        if (result.noNulls) {
-          for (int i = 0; i < batchSize; i++) {
-            adjustedDownLen = StringExpr
-                .truncate(result.vector[i], result.start[i], result.length[i], maxLength);
-            if (adjustedDownLen < result.length[i]) {
-              result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
-            }
-          }
-        } else {
-          for (int i = 0; i < batchSize; i++) {
-            if (!result.isNull[i]) {
-              adjustedDownLen = StringExpr
-                  .truncate(result.vector[i], result.start[i], result.length[i], maxLength);
-              if (adjustedDownLen < result.length[i]) {
-                result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  protected static class StructTreeReader extends TreeReader {
-    protected final TreeReader[] fields;
-
-    protected StructTreeReader(int columnId,
-                               TypeDescription readerSchema,
-                               SchemaEvolution evolution,
-                               boolean[] included,
-                               boolean skipCorrupt) throws IOException {
-      super(columnId);
-
-      TypeDescription fileSchema = evolution.getFileType(readerSchema);
-
-      List<TypeDescription> childrenTypes = readerSchema.getChildren();
-      this.fields = new TreeReader[childrenTypes.size()];
-      for (int i = 0; i < fields.length; ++i) {
-        TypeDescription subtype = childrenTypes.get(i);
-        this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt);
-      }
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      super.seek(index);
-      for (TreeReader kid : fields) {
-        if (kid != null) {
-          kid.seek(index);
-        }
-      }
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      OrcStruct result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new OrcStruct(fields.length);
-        } else {
-          result = (OrcStruct) previous;
-
-          // If the input format was initialized with a file that has a
-          // different number of fields, the field count needs to be
-          // updated to the correct number.
-          result.setNumFields(fields.length);
-        }
-        for (int i = 0; i < fields.length; ++i) {
-          if (fields[i] != null) {
-            result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
-          }
-        }
-      }
-      return result;
-    }
-
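-    // Note: the root struct is handled specially below; its children are read
-    // straight into the batch's top-level column vectors rather than into a
-    // StructColumnVector.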
-    @Override
-    public void nextBatch(VectorizedRowBatch batch,
-                          int batchSize) throws IOException {
-      for (int i = 0; i < fields.length &&
-          (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
-        batch.cols[i].reset();
-        batch.cols[i].ensureSize((int) batchSize, false);
-        fields[i].nextVector(batch.cols[i], null, batchSize);
-      }
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      super.nextVector(previousVector, isNull, batchSize);
-      StructColumnVector result = (StructColumnVector) previousVector;
-      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        result.isRepeating = false;
-
-        // Read all the members of struct as column vectors
-        boolean[] mask = result.noNulls ? null : result.isNull;
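-        // Pass the struct's own null mask down so each child reader marks
-        // those rows null and stays aligned with its streams (children of a
-        // null struct have no values written).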
-        for (int f = 0; f < fields.length; f++) {
-          if (fields[f] != null) {
-            fields[f].nextVector(result.fields[f], mask, batchSize);
-          }
-        }
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      for (TreeReader field : fields) {
-        if (field != null) {
-          field.startStripe(streams, stripeFooter);
-        }
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      for (TreeReader field : fields) {
-        if (field != null) {
-          field.skipRows(items);
-        }
-      }
-    }
-  }
-
-  public static class UnionTreeReader extends TreeReader {
-    protected final TreeReader[] fields;
-    protected RunLengthByteReader tags;
-
-    protected UnionTreeReader(int fileColumn,
-                              TypeDescription readerSchema,
-                              SchemaEvolution evolution,
-                              boolean[] included,
-                              boolean skipCorrupt) throws IOException {
-      super(fileColumn);
-      List<TypeDescription> childrenTypes = readerSchema.getChildren();
-      int fieldCount = childrenTypes.size();
-      this.fields = new TreeReader[fieldCount];
-      for (int i = 0; i < fieldCount; ++i) {
-        TypeDescription subtype = childrenTypes.get(i);
-        this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt);
-      }
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      super.seek(index);
-      tags.seek(index[columnId]);
-      for (TreeReader kid : fields) {
-        kid.seek(index);
-      }
-    }
-
-    @Override
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      OrcUnion result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new OrcUnion();
-        } else {
-          result = (OrcUnion) previous;
-        }
-        byte tag = tags.next();
-        Object previousVal = result.getObject();
-        result.set(tag, fields[tag].next(tag == result.getTag() ?
-            previousVal : null));
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      UnionColumnVector result = (UnionColumnVector) previousVector;
-      super.nextVector(result, isNull, batchSize);
-      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        result.isRepeating = false;
-        tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
-            batchSize);
-        boolean[] ignore = new boolean[(int) batchSize];
-        for (int f = 0; f < result.fields.length; ++f) {
-          // build the ignore list for this tag
-          for (int r = 0; r < batchSize; ++r) {
-            ignore[r] = (!result.noNulls && result.isNull[r]) ||
-                result.tags[r] != f;
-          }
-          fields[f].nextVector(result.fields[f], ignore, batchSize);
-        }
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA)));
-      for (TreeReader field : fields) {
-        if (field != null) {
-          field.startStripe(streams, stripeFooter);
-        }
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long[] counts = new long[fields.length];
-      for (int i = 0; i < items; ++i) {
-        counts[tags.next()] += 1;
-      }
-      for (int i = 0; i < counts.length; ++i) {
-        fields[i].skipRows(counts[i]);
-      }
-    }
-  }
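-
-  // Informally: a union column stores one tag byte per row (the DATA stream
-  // read by RunLengthByteReader) naming which child holds that row's value;
-  // each child stream only contains the rows tagged for it.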
-
-  public static class ListTreeReader extends TreeReader {
-    protected final TreeReader elementReader;
-    protected IntegerReader lengths = null;
-
-    protected ListTreeReader(int fileColumn,
-                             TypeDescription readerSchema,
-                             SchemaEvolution evolution,
-                             boolean[] included,
-                             boolean skipCorrupt) throws IOException {
-      super(fileColumn);
-      TypeDescription elementType = readerSchema.getChildren().get(0);
-      elementReader = createTreeReader(elementType, evolution, included,
-          skipCorrupt);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      super.seek(index);
-      lengths.seek(index[columnId]);
-      elementReader.seek(index);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      List<Object> result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new ArrayList<>();
-        } else {
-          result = (ArrayList<Object>) previous;
-        }
-        int prevLength = result.size();
-        int length = (int) lengths.next();
-        // extend the list to the new length
-        for (int i = prevLength; i < length; ++i) {
-          result.add(null);
-        }
-        // read the new elements into the array
-        for (int i = 0; i < length; i++) {
-          result.set(i, elementReader.next(i < prevLength ?
-              result.get(i) : null));
-        }
-        // remove any extra elements
-        for (int i = prevLength - 1; i >= length; --i) {
-          result.remove(i);
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previous,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      ListColumnVector result = (ListColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
-      // if we have some non-null values, then read them
-      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        lengths.nextVector(result, result.lengths, batchSize);
-        // even with repeating lengths, the list doesn't repeat
-        result.isRepeating = false;
-        // build the offsets vector and figure out how many children to read
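-        // e.g. (illustrative) lengths {2, 0, 3} for three non-null rows give
-        // offsets {0, 2, 2} and childCount 5, so five child values are read.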
-        result.childCount = 0;
-        for (int r = 0; r < batchSize; ++r) {
-          if (result.noNulls || !result.isNull[r]) {
-            result.offsets[r] = result.childCount;
-            result.childCount += result.lengths[r];
-          }
-        }
-        result.child.ensureSize(result.childCount, false);
-        elementReader.nextVector(result.child, null, result.childCount);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false, false);
-      if (elementReader != null) {
-        elementReader.startStripe(streams, stripeFooter);
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long childSkip = 0;
-      for (long i = 0; i < items; ++i) {
-        childSkip += lengths.next();
-      }
-      elementReader.skipRows(childSkip);
-    }
-  }
-
-  public static class MapTreeReader extends TreeReader {
-    protected final TreeReader keyReader;
-    protected final TreeReader valueReader;
-    protected IntegerReader lengths = null;
-
-    protected MapTreeReader(int fileColumn,
-                            TypeDescription readerSchema,
-                            SchemaEvolution evolution,
-                            boolean[] included,
-                            boolean skipCorrupt) throws IOException {
-      super(fileColumn);
-      TypeDescription keyType = readerSchema.getChildren().get(0);
-      TypeDescription valueType = readerSchema.getChildren().get(1);
-      keyReader = createTreeReader(keyType, evolution, included, skipCorrupt);
-      valueReader = createTreeReader(valueType, evolution, included, skipCorrupt);
-    }
-
-    @Override
-    void seek(PositionProvider[] index) throws IOException {
-      super.seek(index);
-      lengths.seek(index[columnId]);
-      keyReader.seek(index);
-      valueReader.seek(index);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    Object next(Object previous) throws IOException {
-      super.next(previous);
-      Map<Object, Object> result = null;
-      if (valuePresent) {
-        if (previous == null) {
-          result = new LinkedHashMap<>();
-        } else {
-          result = (LinkedHashMap<Object, Object>) previous;
-        }
-        // for now just clear and create new objects
-        result.clear();
-        int length = (int) lengths.next();
-        // read the new elements into the array
-        for (int i = 0; i < length; i++) {
-          result.put(keyReader.next(null), valueReader.next(null));
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public void nextVector(ColumnVector previous,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      MapColumnVector result = (MapColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
-      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        lengths.nextVector(result, result.lengths, batchSize);
-        // even with repeating lengths, the map doesn't repeat
-        result.isRepeating = false;
-        // build the offsets vector and figure out how many children to read
-        result.childCount = 0;
-        for (int r = 0; r < batchSize; ++r) {
-          if (result.noNulls || !result.isNull[r]) {
-            result.offsets[r] = result.childCount;
-            result.childCount += result.lengths[r];
-          }
-        }
-        result.keys.ensureSize(result.childCount, false);
-        result.values.ensureSize(result.childCount, false);
-        keyReader.nextVector(result.keys, null, result.childCount);
-        valueReader.nextVector(result.values, null, result.childCount);
-      }
-    }
-
-    @Override
-    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
-          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId);
-      }
-    }
-
-    @Override
-    void startStripe(Map<StreamName, InStream> streams,
-        OrcProto.StripeFooter stripeFooter
-    ) throws IOException {
-      super.startStripe(streams, stripeFooter);
-      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
-          streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false, false);
-      if (keyReader != null) {
-        keyReader.startStripe(streams, stripeFooter);
-      }
-      if (valueReader != null) {
-        valueReader.startStripe(streams, stripeFooter);
-      }
-    }
-
-    @Override
-    void skipRows(long items) throws IOException {
-      items = countNonNulls(items);
-      long childSkip = 0;
-      for (long i = 0; i < items; ++i) {
-        childSkip += lengths.next();
-      }
-      keyReader.skipRows(childSkip);
-      valueReader.skipRows(childSkip);
-    }
-  }
-
-  public static TreeReader createTreeReader(TypeDescription readerType,
-                                            SchemaEvolution evolution,
-                                            boolean[] included,
-                                            boolean skipCorrupt
-                                            ) throws IOException {
-    TypeDescription fileType = evolution.getFileType(readerType);
-    if (fileType == null ||
-        (included != null && !included[readerType.getId()])) {
-      return new NullTreeReader(0);
-    }
-    TypeDescription.Category readerTypeCategory = readerType.getCategory();
-    if (!fileType.getCategory().equals(readerTypeCategory) &&
-        (readerTypeCategory != TypeDescription.Category.STRUCT &&
-         readerTypeCategory != TypeDescription.Category.MAP &&
-         readerTypeCategory != TypeDescription.Category.LIST &&
-         readerTypeCategory != TypeDescription.Category.UNION)) {
-      // Only primitive readers are converted here; complex types fall
-      // through so that their children are converted as the factory recurses.
-      return ConvertTreeReaderFactory.createConvertTreeReader(readerType, evolution,
-          included, skipCorrupt);
-    }
-    switch (readerTypeCategory) {
-      case BOOLEAN:
-        return new BooleanTreeReader(fileType.getId());
-      case BYTE:
-        return new ByteTreeReader(fileType.getId());
-      case DOUBLE:
-        return new DoubleTreeReader(fileType.getId());
-      case FLOAT:
-        return new FloatTreeReader(fileType.getId());
-      case SHORT:
-        return new ShortTreeReader(fileType.getId());
-      case INT:
-        return new IntTreeReader(fileType.getId());
-      case LONG:
-        return new LongTreeReader(fileType.getId(), skipCorrupt);
-      case STRING:
-        return new StringTreeReader(fileType.getId());
-      case CHAR:
-        return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
-      case VARCHAR:
-        return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
-      case BINARY:
-        return new BinaryTreeReader(fileType.getId());
-      case TIMESTAMP:
-        return new TimestampTreeReader(fileType.getId(), skipCorrupt);
-      case DATE:
-        return new DateTreeReader(fileType.getId());
-      case DECIMAL:
-        return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
-            readerType.getScale());
-      case STRUCT:
-        return new StructTreeReader(fileType.getId(), readerType,
-            evolution, included, skipCorrupt);
-      case LIST:
-        return new ListTreeReader(fileType.getId(), readerType,
-            evolution, included, skipCorrupt);
-      case MAP:
-        return new MapTreeReader(fileType.getId(), readerType, evolution,
-            included, skipCorrupt);
-      case UNION:
-        return new UnionTreeReader(fileType.getId(), readerType,
-            evolution, included, skipCorrupt);
-      default:
-        throw new IllegalArgumentException("Unsupported type " +
-            readerTypeCategory);
-    }
-  }
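-
-  // Informal usage sketch (call site names assumed, not taken from this file):
-  //   TreeReader root = createTreeReader(readerSchema, evolution, included, false);
-  //   root.startStripe(streams, stripeFooter);                  // once per stripe
-  //   Object row = root.next(null);                             // row-by-row API
-  //   root.nextBatch(batch, VectorizedRowBatch.DEFAULT_SIZE);   // vectorized API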
-}