You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:23:01 UTC

[23/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
new file mode 100644
index 0000000..6c8ecfd
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -0,0 +1,2093 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.OrcProto;
+
+/**
+ * Factory for creating ORC tree readers.
+ */
+public class TreeReaderFactory {
+
+  public abstract static class TreeReader {
+    protected final int columnId;
+    protected BitFieldReader present = null;
+    protected boolean valuePresent = false;
+    protected int vectorColumnCount;
+
+    TreeReader(int columnId) throws IOException {
+      this(columnId, null);
+    }
+
+    protected TreeReader(int columnId, InStream in) throws IOException {
+      this.columnId = columnId;
+      if (in == null) {
+        present = null;
+        valuePresent = true;
+      } else {
+        present = new BitFieldReader(in, 1);
+      }
+      vectorColumnCount = -1;
+    }
+
+    void setVectorColumnCount(int vectorColumnCount) {
+      this.vectorColumnCount = vectorColumnCount;
+    }
+
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+        InStream in,
+        boolean signed, boolean skipCorrupt) throws IOException {
+      switch (kind) {
+        case DIRECT_V2:
+        case DICTIONARY_V2:
+          return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
+        case DIRECT:
+        case DICTIONARY:
+          return new RunLengthIntegerReader(in, signed);
+        default:
+          throw new IllegalArgumentException("Unknown encoding " + kind);
+      }
+    }
+
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      checkEncoding(stripeFooter.getColumnsList().get(columnId));
+      InStream in = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.PRESENT));
+      if (in == null) {
+        present = null;
+        valuePresent = true;
+      } else {
+        present = new BitFieldReader(in, 1);
+      }
+    }
+
+    /**
+     * Seek to the given position.
+     *
+     * @param index the indexes loaded from the file
+     * @throws IOException
+     */
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        present.seek(index);
+      }
+    }
+
+    protected long countNonNulls(long rows) throws IOException {
+      if (present != null) {
+        long result = 0;
+        for (long c = 0; c < rows; ++c) {
+          if (present.next() == 1) {
+            result += 1;
+          }
+        }
+        return result;
+      } else {
+        return rows;
+      }
+    }
+
+    abstract void skipRows(long rows) throws IOException;
+
+    /**
+     * Called at the top level to read into the given batch.
+     * @param batch the batch to read into
+     * @param batchSize the number of rows to read
+     * @throws IOException
+     */
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      batch.cols[0].reset();
+      batch.cols[0].ensureSize(batchSize, false);
+      nextVector(batch.cols[0], null, batchSize);
+    }
+
+    /**
+     * Populates the isNull vector array in the previousVector object based on
+     * the present stream values. This function is called from all the child
+     * readers, and they all set the values based on isNull field value.
+     *
+     * @param previous The columnVector object whose isNull value is populated
+     * @param isNull Whether the each value was null at a higher level. If
+     *               isNull is null, all values are non-null.
+     * @param batchSize      Size of the column vector
+     * @throws IOException
+     */
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      if (present != null || isNull != null) {
+        // Set noNulls and isNull vector of the ColumnVector based on
+        // present stream
+        previous.noNulls = true;
+        boolean allNull = true;
+        for (int i = 0; i < batchSize; i++) {
+          if (isNull == null || !isNull[i]) {
+            if (present != null && present.next() != 1) {
+              previous.noNulls = false;
+              previous.isNull[i] = true;
+            } else {
+              previous.isNull[i] = false;
+              allNull = false;
+            }
+          } else {
+            previous.noNulls = false;
+            previous.isNull[i] = true;
+          }
+        }
+        previous.isRepeating = !previous.noNulls && allNull;
+      } else {
+        // There is no present stream, this means that all the values are
+        // present.
+        previous.noNulls = true;
+        for (int i = 0; i < batchSize; i++) {
+          previous.isNull[i] = false;
+        }
+      }
+    }
+
+    public BitFieldReader getPresent() {
+      return present;
+    }
+  }
+
+  public static class NullTreeReader extends TreeReader {
+
+    public NullTreeReader(int columnId) throws IOException {
+      super(columnId);
+    }
+
+    @Override
+    public void startStripe(Map<StreamName, InStream> streams,
+                            OrcProto.StripeFooter footer) {
+      // PASS
+    }
+
+    @Override
+    void skipRows(long rows) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider position) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider[] position) {
+      // PASS
+    }
+
+    @Override
+    public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+      vector.noNulls = false;
+      vector.isNull[0] = true;
+      vector.isRepeating = true;
+    }
+  }
+
+  public static class BooleanTreeReader extends TreeReader {
+    protected BitFieldReader reader = null;
+
+    BooleanTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      if (data != null) {
+        reader = new BitFieldReader(data, 1);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      reader = new BitFieldReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)), 1);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, batchSize);
+    }
+  }
+
+  public static class ByteTreeReader extends TreeReader {
+    protected RunLengthByteReader reader = null;
+
+    ByteTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      this.reader = new RunLengthByteReader(data);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      reader = new RunLengthByteReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)));
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, result.vector, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  public static class ShortTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    ShortTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    protected ShortTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(name), true, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, result.vector, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  public static class IntTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    IntTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    protected IntTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(name), true, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, result.vector, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  public static class LongTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      this(columnId, null, null, null, skipCorrupt);
+    }
+
+    protected LongTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding,
+        boolean skipCorrupt)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, skipCorrupt);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(name), true, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, result.vector, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  public static class FloatTreeReader extends TreeReader {
+    protected InStream stream;
+    private final SerializationUtils utils;
+
+    FloatTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      this.utils = new SerializationUtils();
+      this.stream = data;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      stream.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+
+      if (hasNulls) {
+        // conditions to ensure bounds checks skips
+        for (int i = 0; batchSize <= result.isNull.length && i < batchSize; i++) {
+          allNulls = allNulls & result.isNull[i];
+        }
+        if (allNulls) {
+          result.vector[0] = Double.NaN;
+          result.isRepeating = true;
+        } else {
+          // some nulls
+          result.isRepeating = false;
+          // conditions to ensure bounds checks skips
+          for (int i = 0; batchSize <= result.isNull.length
+              && batchSize <= result.vector.length && i < batchSize; i++) {
+            if (!result.isNull[i]) {
+              result.vector[i] = utils.readFloat(stream);
+            } else {
+              // If the value is not present then set NaN
+              result.vector[i] = Double.NaN;
+            }
+          }
+        }
+      } else {
+        // no nulls & > 1 row (check repeating)
+        boolean repeating = (batchSize > 1);
+        final float f1 = utils.readFloat(stream);
+        result.vector[0] = f1;
+        // conditions to ensure bounds checks skips
+        for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
+          final float f2 = utils.readFloat(stream);
+          repeating = repeating && (f1 == f2);
+          result.vector[i] = f2;
+        }
+        result.isRepeating = repeating;
+      }
+    }
+
+    @Override
+    protected void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for (int i = 0; i < items; ++i) {
+        utils.readFloat(stream);
+      }
+    }
+  }
+
+  public static class DoubleTreeReader extends TreeReader {
+    protected InStream stream;
+    private final SerializationUtils utils;
+
+    DoubleTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      this.utils = new SerializationUtils();
+      this.stream = data;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name =
+          new StreamName(columnId,
+              OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      stream.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+
+      if (hasNulls) {
+        // conditions to ensure bounds checks skips
+        for (int i = 0; i < batchSize && batchSize <= result.isNull.length; i++) {
+          allNulls = allNulls & result.isNull[i];
+        }
+        if (allNulls) {
+          result.vector[0] = Double.NaN;
+          result.isRepeating = true;
+        } else {
+          // some nulls
+          result.isRepeating = false;
+          // conditions to ensure bounds checks skips
+          for (int i = 0; batchSize <= result.isNull.length
+              && batchSize <= result.vector.length && i < batchSize; i++) {
+            if (!result.isNull[i]) {
+              result.vector[i] = utils.readDouble(stream);
+            } else {
+              // If the value is not present then set NaN
+              result.vector[i] = Double.NaN;
+            }
+          }
+        }
+      } else {
+        // no nulls
+        boolean repeating = (batchSize > 1);
+        final double d1 = utils.readDouble(stream);
+        result.vector[0] = d1;
+        // conditions to ensure bounds checks skips
+        for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
+          final double d2 = utils.readDouble(stream);
+          repeating = repeating && (d1 == d2);
+          result.vector[i] = d2;
+        }
+        result.isRepeating = repeating;
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long len = items * 8;
+      while (len > 0) {
+        len -= stream.skip(len);
+      }
+    }
+  }
+
+  public static class BinaryTreeReader extends TreeReader {
+    protected InStream stream;
+    protected IntegerReader lengths = null;
+    protected final LongColumnVector scratchlcv;
+
+    BinaryTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null);
+    }
+
+    protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        checkEncoding(encoding);
+        this.lengths = createIntegerReader(encoding.getKind(), length, false, false);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      stream.seek(index);
+      lengths.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long lengthToSkip = 0;
+      for (int i = 0; i < items; ++i) {
+        lengthToSkip += lengths.next();
+      }
+      while (lengthToSkip > 0) {
+        lengthToSkip -= stream.skip(lengthToSkip);
+      }
+    }
+  }
+
+  public static class TimestampTreeReader extends TreeReader {
+    protected IntegerReader data = null;
+    protected IntegerReader nanos = null;
+    private final boolean skipCorrupt;
+    private Map<String, Long> baseTimestampMap;
+    private long base_timestamp;
+    private final TimeZone readerTimeZone;
+    private TimeZone writerTimeZone;
+    private boolean hasSameTZRules;
+
+    TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      this(columnId, null, null, null, null, skipCorrupt);
+    }
+
+    protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
+        InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
+        throws IOException {
+      super(columnId, presentStream);
+      this.skipCorrupt = skipCorrupt;
+      this.baseTimestampMap = new HashMap<>();
+      this.readerTimeZone = TimeZone.getDefault();
+      this.writerTimeZone = readerTimeZone;
+      this.hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
+      this.base_timestamp = getBaseTimestamp(readerTimeZone.getID());
+      if (encoding != null) {
+        checkEncoding(encoding);
+
+        if (dataStream != null) {
+          this.data = createIntegerReader(encoding.getKind(), dataStream, true, skipCorrupt);
+        }
+
+        if (nanosStream != null) {
+          this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, skipCorrupt);
+        }
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      data = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.DATA)), true, skipCorrupt);
+      nanos = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.SECONDARY)), false, skipCorrupt);
+      base_timestamp = getBaseTimestamp(stripeFooter.getWriterTimezone());
+    }
+
+    private long getBaseTimestamp(String timeZoneId) throws IOException {
+      // to make sure new readers read old files in the same way
+      if (timeZoneId == null || timeZoneId.isEmpty()) {
+        timeZoneId = readerTimeZone.getID();
+      }
+
+      if (!baseTimestampMap.containsKey(timeZoneId)) {
+        writerTimeZone = TimeZone.getTimeZone(timeZoneId);
+        hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(writerTimeZone);
+        try {
+          long epoch =
+              sdf.parse(WriterImpl.BASE_TIMESTAMP_STRING).getTime() / WriterImpl.MILLIS_PER_SECOND;
+          baseTimestampMap.put(timeZoneId, epoch);
+          return epoch;
+        } catch (ParseException e) {
+          throw new IOException("Unable to create base timestamp", e);
+        } finally {
+          sdf.setTimeZone(readerTimeZone);
+        }
+      }
+
+      return baseTimestampMap.get(timeZoneId);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      data.seek(index);
+      nanos.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      TimestampColumnVector result = (TimestampColumnVector) previousVector;
+      super.nextVector(previousVector, isNull, batchSize);
+
+      for (int i = 0; i < batchSize; i++) {
+        if (result.noNulls || !result.isNull[i]) {
+          long millis = data.next() + base_timestamp;
+          int newNanos = parseNanos(nanos.next());
+          if (millis < 0 && newNanos != 0) {
+            millis -= 1;
+          }
+          millis *= WriterImpl.MILLIS_PER_SECOND;
+          long offset = 0;
+          // If reader and writer time zones have different rules, adjust the timezone difference
+          // between reader and writer taking day light savings into account.
+          if (!hasSameTZRules) {
+            offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
+          }
+          long adjustedMillis = millis + offset;
+          // Sometimes the reader timezone might have changed after adding the adjustedMillis.
+          // To account for that change, check for any difference in reader timezone after
+          // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
+          if (!hasSameTZRules &&
+              (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
+            long newOffset =
+                writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
+            adjustedMillis = millis + newOffset;
+          }
+          result.time[i] = adjustedMillis;
+          result.nanos[i] = newNanos;
+          if (result.isRepeating && i != 0 &&
+              (result.time[0] != result.time[i] ||
+                  result.nanos[0] != result.nanos[i])) {
+            result.isRepeating = false;
+          }
+        }
+      }
+    }
+
+    private static int parseNanos(long serialized) {
+      int zeros = 7 & (int) serialized;
+      int result = (int) (serialized >>> 3);
+      if (zeros != 0) {
+        for (int i = 0; i <= zeros; ++i) {
+          result *= 10;
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      data.skip(items);
+      nanos.skip(items);
+    }
+  }
+
+  public static class DateTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    DateTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    protected DateTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(name), true, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, result.vector, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  public static class DecimalTreeReader extends TreeReader {
+    protected InStream valueStream;
+    protected IntegerReader scaleReader = null;
+    private int[] scratchScaleVector;
+
+    private final int precision;
+    private final int scale;
+
+    DecimalTreeReader(int columnId, int precision, int scale) throws IOException {
+      this(columnId, precision, scale, null, null, null, null);
+    }
+
+    protected DecimalTreeReader(int columnId, int precision, int scale, InStream present,
+        InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      this.precision = precision;
+      this.scale = scale;
+      this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
+      this.valueStream = valueStream;
+      if (scaleStream != null && encoding != null) {
+        checkEncoding(encoding);
+        this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, false);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      valueStream = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA));
+      scaleReader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      valueStream.seek(index);
+      scaleReader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final DecimalColumnVector result = (DecimalColumnVector) previousVector;
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[(int) batchSize];
+      }
+      // read the scales
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
+      // Read value entries based on isNull entries
+      if (result.noNulls) {
+        for (int r=0; r < batchSize; ++r) {
+          BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+          HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+          result.set(r, dec);
+        }
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        for (int r=0; r < batchSize; ++r) {
+          if (!result.isNull[r]) {
+            BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+            HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+            result.set(r, dec);
+          }
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for (int i = 0; i < items; i++) {
+        SerializationUtils.readBigInteger(valueStream);
+      }
+      scaleReader.skip(items);
+    }
+  }
+
+  /**
+   * A tree reader that will read string columns. At the start of the
+   * stripe, it creates an internal reader based on whether a direct or
+   * dictionary encoding was used.
+   */
+  public static class StringTreeReader extends TreeReader {
+    protected TreeReader reader;
+
+    StringTreeReader(int columnId) throws IOException {
+      super(columnId);
+    }
+
+    protected StringTreeReader(int columnId, InStream present, InStream data, InStream length,
+        InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (encoding != null) {
+        switch (encoding.getKind()) {
+          case DIRECT:
+          case DIRECT_V2:
+            reader = new StringDirectTreeReader(columnId, present, data, length,
+                encoding.getKind());
+            break;
+          case DICTIONARY:
+          case DICTIONARY_V2:
+            reader = new StringDictionaryTreeReader(columnId, present, data, length, dictionary,
+                encoding);
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported encoding " +
+                encoding.getKind());
+        }
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      reader.checkEncoding(encoding);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      // For each stripe, checks the encoding and initializes the appropriate
+      // reader
+      switch (stripeFooter.getColumnsList().get(columnId).getKind()) {
+        case DIRECT:
+        case DIRECT_V2:
+          reader = new StringDirectTreeReader(columnId);
+          break;
+        case DICTIONARY:
+        case DICTIONARY_V2:
+          reader = new StringDictionaryTreeReader(columnId);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported encoding " +
+              stripeFooter.getColumnsList().get(columnId).getKind());
+      }
+      reader.startStripe(streams, stripeFooter);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      reader.nextVector(previousVector, isNull, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skipRows(items);
+    }
+  }
+
+  // This class collects together very similar methods for reading an ORC vector of byte arrays and
+  // creating the BytesColumnVector.
+  //
+  public static class BytesColumnVectorUtil {
+
+    private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths,
+        LongColumnVector scratchlcv,
+        BytesColumnVector result, final int batchSize) throws IOException {
+      // Read lengths
+      scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
+      lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
+      int totalLength = 0;
+      if (!scratchlcv.isRepeating) {
+        for (int i = 0; i < batchSize; i++) {
+          if (!scratchlcv.isNull[i]) {
+            totalLength += (int) scratchlcv.vector[i];
+          }
+        }
+      } else {
+        if (!scratchlcv.isNull[0]) {
+          totalLength = (int) (batchSize * scratchlcv.vector[0]);
+        }
+      }
+
+      // Read all the strings for this batch
+      byte[] allBytes = new byte[totalLength];
+      int offset = 0;
+      int len = totalLength;
+      while (len > 0) {
+        int bytesRead = stream.read(allBytes, offset, len);
+        if (bytesRead < 0) {
+          throw new EOFException("Can't finish byte read from " + stream);
+        }
+        len -= bytesRead;
+        offset += bytesRead;
+      }
+
+      return allBytes;
+    }
+
+    // This method has the common code for reading in bytes into a BytesColumnVector.
+    public static void readOrcByteArrays(InStream stream,
+                                         IntegerReader lengths,
+                                         LongColumnVector scratchlcv,
+                                         BytesColumnVector result,
+                                         final int batchSize) throws IOException {
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
+            result, (int) batchSize);
+
+        // Too expensive to figure out 'repeating' by comparisons.
+        result.isRepeating = false;
+        int offset = 0;
+        if (!scratchlcv.isRepeating) {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
+              offset += scratchlcv.vector[i];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
+          }
+        } else {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
+              offset += scratchlcv.vector[0];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * A reader for string columns that are direct encoded in the current
+   * stripe.
+   */
+  public static class StringDirectTreeReader extends TreeReader {
+    private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+    protected InStream stream;
+    protected HadoopShims.TextReaderShim data;
+    protected IntegerReader lengths;
+    private final LongColumnVector scratchlcv;
+
+    StringDirectTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null);
+    }
+
+    protected StringDirectTreeReader(int columnId, InStream present, InStream data,
+        InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException {
+      super(columnId, present);
+      this.scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        this.lengths = createIntegerReader(encoding, length, false, false);
+        this.data = SHIMS.getTextReaderShim(this.stream);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+      data = SHIMS.getTextReaderShim(this.stream);
+      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+          false, false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      stream.seek(index);
+      // don't seek data stream
+      lengths.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
+          result, batchSize);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long lengthToSkip = 0;
+      for (int i = 0; i < items; ++i) {
+        lengthToSkip += lengths.next();
+      }
+
+      while (lengthToSkip > 0) {
+        lengthToSkip -= stream.skip(lengthToSkip);
+      }
+    }
+
+    public IntegerReader getLengths() {
+      return lengths;
+    }
+
+    public InStream getStream() {
+      return stream;
+    }
+  }
+
+  /**
+   * A reader for string columns that are dictionary encoded in the current
+   * stripe.
+   */
+  public static class StringDictionaryTreeReader extends TreeReader {
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+    private DynamicByteArray dictionaryBuffer;
+    private int[] dictionaryOffsets;
+    protected IntegerReader reader;
+
+    private byte[] dictionaryBufferInBytesCache = null;
+    private final LongColumnVector scratchlcv;
+
+    StringDictionaryTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null, null);
+    }
+
+    protected StringDictionaryTreeReader(int columnId, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      scratchlcv = new LongColumnVector();
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding.getKind(), data, false, false);
+      }
+
+      if (dictionary != null && encoding != null) {
+        readDictionaryStream(dictionary);
+      }
+
+      if (length != null && encoding != null) {
+        readDictionaryLengthStream(length, encoding);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+
+      // read the dictionary blob
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DICTIONARY_DATA);
+      InStream in = streams.get(name);
+      readDictionaryStream(in);
+
+      // read the lengths
+      name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
+      in = streams.get(name);
+      readDictionaryLengthStream(in, stripeFooter.getColumnsList().get(columnId));
+
+      // set up the row reader
+      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(name), false, false);
+    }
+
+    private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      int dictionarySize = encoding.getDictionarySize();
+      if (in != null) { // Guard against empty LENGTH stream.
+        IntegerReader lenReader = createIntegerReader(encoding.getKind(), in, false, false);
+        int offset = 0;
+        if (dictionaryOffsets == null ||
+            dictionaryOffsets.length < dictionarySize + 1) {
+          dictionaryOffsets = new int[dictionarySize + 1];
+        }
+        for (int i = 0; i < dictionarySize; ++i) {
+          dictionaryOffsets[i] = offset;
+          offset += (int) lenReader.next();
+        }
+        dictionaryOffsets[dictionarySize] = offset;
+        in.close();
+      }
+
+    }
+
+    private void readDictionaryStream(InStream in) throws IOException {
+      if (in != null) { // Guard against empty dictionary stream.
+        if (in.available() > 0) {
+          dictionaryBuffer = new DynamicByteArray(64, in.available());
+          dictionaryBuffer.readAll(in);
+          // Since its start of strip invalidate the cache.
+          dictionaryBufferInBytesCache = null;
+        }
+        in.close();
+      } else {
+        dictionaryBuffer = null;
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      reader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
+      int offset;
+      int length;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+
+      if (dictionaryBuffer != null) {
+
+        // Load dictionaryBuffer into cache.
+        if (dictionaryBufferInBytesCache == null) {
+          dictionaryBufferInBytesCache = dictionaryBuffer.get();
+        }
+
+        // Read string offsets
+        scratchlcv.isNull = result.isNull;
+        scratchlcv.ensureSize((int) batchSize, false);
+        reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
+        if (!scratchlcv.isRepeating) {
+
+          // The vector has non-repeating strings. Iterate thru the batch
+          // and set strings one by one
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+              length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+              result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+            } else {
+              // If the value is null then set offset and length to zero (null string)
+              result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+            }
+          }
+        } else {
+          // If the value is repeating then just set the first value in the
+          // vector and set the isRepeating flag to true. No need to iterate thru and
+          // set all the elements to the same value
+          offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+          length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+          result.setRef(0, dictionaryBufferInBytesCache, offset, length);
+        }
+        result.isRepeating = scratchlcv.isRepeating;
+      } else {
+        if (dictionaryOffsets == null) {
+          // Entire stripe contains null strings.
+          result.isRepeating = true;
+          result.noNulls = false;
+          result.isNull[0] = true;
+          result.setRef(0, EMPTY_BYTE_ARRAY, 0, 0);
+        } else {
+          // stripe contains nulls and empty strings
+          for (int i = 0; i < batchSize; i++) {
+            if (!result.isNull[i]) {
+              result.setRef(i, EMPTY_BYTE_ARRAY, 0, 0);
+            }
+          }
+        }
+      }
+    }
+
+    int getDictionaryEntryLength(int entry, int offset) {
+      final int length;
+      // if it isn't the last entry, subtract the offsets otherwise use
+      // the buffer length.
+      if (entry < dictionaryOffsets.length - 1) {
+        length = dictionaryOffsets[entry + 1] - offset;
+      } else {
+        length = dictionaryBuffer.size() - offset;
+      }
+      return length;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+
+    public IntegerReader getReader() {
+      return reader;
+    }
+  }
+
+  public static class CharTreeReader extends StringTreeReader {
+    int maxLength;
+
+    CharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
+      this.maxLength = maxLength;
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      // Get the vector of strings from StringTreeReader, then make a 2nd pass to
+      // adjust down the length (right trim and truncate) if necessary.
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
+      int adjustedDownLen;
+      if (result.isRepeating) {
+        if (result.noNulls || !result.isNull[0]) {
+          adjustedDownLen = StringExpr
+              .rightTrimAndTruncate(result.vector[0], result.start[0], result.length[0], maxLength);
+          if (adjustedDownLen < result.length[0]) {
+            result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
+          }
+        }
+      } else {
+        if (result.noNulls) {
+          for (int i = 0; i < batchSize; i++) {
+            adjustedDownLen = StringExpr
+                .rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
+                    maxLength);
+            if (adjustedDownLen < result.length[i]) {
+              result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+            }
+          }
+        } else {
+          for (int i = 0; i < batchSize; i++) {
+            if (!result.isNull[i]) {
+              adjustedDownLen = StringExpr
+                  .rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
+                      maxLength);
+              if (adjustedDownLen < result.length[i]) {
+                result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  public static class VarcharTreeReader extends StringTreeReader {
+    int maxLength;
+
+    VarcharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
+      this.maxLength = maxLength;
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      // Get the vector of strings from StringTreeReader, then make a 2nd pass to
+      // adjust down the length (truncate) if necessary.
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
+
+      int adjustedDownLen;
+      if (result.isRepeating) {
+        if (result.noNulls || !result.isNull[0]) {
+          adjustedDownLen = StringExpr
+              .truncate(result.vector[0], result.start[0], result.length[0], maxLength);
+          if (adjustedDownLen < result.length[0]) {
+            result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
+          }
+        }
+      } else {
+        if (result.noNulls) {
+          for (int i = 0; i < batchSize; i++) {
+            adjustedDownLen = StringExpr
+                .truncate(result.vector[i], result.start[i], result.length[i], maxLength);
+            if (adjustedDownLen < result.length[i]) {
+              result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+            }
+          }
+        } else {
+          for (int i = 0; i < batchSize; i++) {
+            if (!result.isNull[i]) {
+              adjustedDownLen = StringExpr
+                  .truncate(result.vector[i], result.start[i], result.length[i], maxLength);
+              if (adjustedDownLen < result.length[i]) {
+                result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  protected static class StructTreeReader extends TreeReader {
+    protected final TreeReader[] fields;
+
+    protected StructTreeReader(int columnId,
+                               TypeDescription readerSchema,
+                               SchemaEvolution evolution,
+                               boolean[] included,
+                               boolean skipCorrupt) throws IOException {
+      super(columnId);
+
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      this.fields = new TreeReader[childrenTypes.size()];
+      for (int i = 0; i < fields.length; ++i) {
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt);
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      for (TreeReader kid : fields) {
+        if (kid != null) {
+          kid.seek(index);
+        }
+      }
+    }
+
+    @Override
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      for(int i=0; i < fields.length &&
+          (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+        batch.cols[i].reset();
+        batch.cols[i].ensureSize((int) batchSize, false);
+        fields[i].nextVector(batch.cols[i], null, batchSize);
+      }
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize);
+      StructColumnVector result = (StructColumnVector) previousVector;
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
+
+        // Read all the members of struct as column vectors
+        boolean[] mask = result.noNulls ? null : result.isNull;
+        for (int f = 0; f < fields.length; f++) {
+          if (fields[f] != null) {
+            fields[f].nextVector(result.fields[f], mask, batchSize);
+          }
+        }
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      for (TreeReader field : fields) {
+        if (field != null) {
+          field.startStripe(streams, stripeFooter);
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for (TreeReader field : fields) {
+        if (field != null) {
+          field.skipRows(items);
+        }
+      }
+    }
+  }
+
+  public static class UnionTreeReader extends TreeReader {
+    protected final TreeReader[] fields;
+    protected RunLengthByteReader tags;
+
+    protected UnionTreeReader(int fileColumn,
+                              TypeDescription readerSchema,
+                              SchemaEvolution evolution,
+                              boolean[] included,
+                              boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      int fieldCount = childrenTypes.size();
+      this.fields = new TreeReader[fieldCount];
+      for (int i = 0; i < fieldCount; ++i) {
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt);
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      tags.seek(index[columnId]);
+      for (TreeReader kid : fields) {
+        kid.seek(index);
+      }
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      UnionColumnVector result = (UnionColumnVector) previousVector;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
+        tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
+            batchSize);
+        boolean[] ignore = new boolean[(int) batchSize];
+        for (int f = 0; f < result.fields.length; ++f) {
+          // build the ignore list for this tag
+          for (int r = 0; r < batchSize; ++r) {
+            ignore[r] = (!result.noNulls && result.isNull[r]) ||
+                result.tags[r] != f;
+          }
+          fields[f].nextVector(result.fields[f], ignore, batchSize);
+        }
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)));
+      for (TreeReader field : fields) {
+        if (field != null) {
+          field.startStripe(streams, stripeFooter);
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long[] counts = new long[fields.length];
+      for (int i = 0; i < items; ++i) {
+        counts[tags.next()] += 1;
+      }
+      for (int i = 0; i < counts.length; ++i) {
+        fields[i].skipRows(counts[i]);
+      }
+    }
+  }
+
+  public static class ListTreeReader extends TreeReader {
+    protected final TreeReader elementReader;
+    protected IntegerReader lengths = null;
+
+    protected ListTreeReader(int fileColumn,
+                             TypeDescription readerSchema,
+                             SchemaEvolution evolution,
+                             boolean[] included,
+                             boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription elementType = readerSchema.getChildren().get(0);
+      elementReader = createTreeReader(elementType, evolution, included,
+          skipCorrupt);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      elementReader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      ListColumnVector result = (ListColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      // if we have some none-null values, then read them
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the list doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.child.ensureSize(result.childCount, false);
+        elementReader.nextVector(result.child, null, result.childCount);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false, false);
+      if (elementReader != null) {
+        elementReader.startStripe(streams, stripeFooter);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for (long i = 0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      elementReader.skipRows(childSkip);
+    }
+  }
+
+  public static class MapTreeReader extends TreeReader {
+    protected final TreeReader keyReader;
+    protected final TreeReader valueReader;
+    protected IntegerReader lengths = null;
+
+    protected MapTreeReader(int fileColumn,
+                            TypeDescription readerSchema,
+                            SchemaEvolution evolution,
+                            boolean[] included,
+                            boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription keyType = readerSchema.getChildren().get(0);
+      TypeDescription valueType = readerSchema.getChildren().get(1);
+      keyReader = createTreeReader(keyType, evolution, included, skipCorrupt);
+      valueReader = createTreeReader(valueType, evolution, included, skipCorrupt);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      keyReader.seek(index);
+      valueReader.seek(index);
+    }
+
+    @Override
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      MapColumnVector result = (MapColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the map doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.keys.ensureSize(result.childCount, false);
+        result.values.ensureSize(result.childCount, false);
+        keyReader.nextVector(result.keys, null, result.childCount);
+        valueReader.nextVector(result.values, null, result.childCount);
+      }
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+        OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false, false);
+      if (keyReader != null) {
+        keyReader.startStripe(streams, stripeFooter);
+      }
+      if (valueReader != null) {
+        valueReader.startStripe(streams, stripeFooter);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for (long i = 0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      keyReader.skipRows(childSkip);
+      valueReader.skipRows(childSkip);
+    }
+  }
+
+  public static TreeReader createTreeReader(TypeDescription readerType,
+                                            SchemaEvolution evolution,
+                                            boolean[] included,
+                                            boolean skipCorrupt
+                                            ) throws IOException {
+    TypeDescription fileType = evolution.getFileType(readerType);
+    if (fileType == null ||
+        (included != null && !included[readerType.getId()])) {
+      return new NullTreeReader(0);
+    }
+    TypeDescription.Category readerTypeCategory = readerType.getCategory();
+    if (!fileType.getCategory().equals(readerTypeCategory) &&
+        (readerTypeCategory != TypeDescription.Category.STRUCT &&
+         readerTypeCategory != TypeDescription.Category.MAP &&
+         readerTypeCategory != TypeDescription.Category.LIST &&
+         readerTypeCategory != TypeDescription.Category.UNION)) {
+      // We only convert complex children.
+      return ConvertTreeReaderFactory.createConvertTreeReader(readerType, evolution,
+          included, skipCorrupt);
+    }
+    switch (readerTypeCategory) {
+      case BOOLEAN:
+        return new BooleanTreeReader(fileType.getId());
+      case BYTE:
+        return new ByteTreeReader(fileType.getId());
+      case DOUBLE:
+        return new DoubleTreeReader(fileType.getId());
+      case FLOAT:
+        return new FloatTreeReader(fileType.getId());
+      case SHORT:
+        return new ShortTreeReader(fileType.getId());
+      case INT:
+        return new IntTreeReader(fileType.getId());
+      case LONG:
+        return new LongTreeReader(fileType.getId(), skipCorrupt);
+      case STRING:
+        return new StringTreeReader(fileType.getId());
+      case CHAR:
+        return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
+      case VARCHAR:
+        return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
+      case BINARY:
+        return new BinaryTreeReader(fileType.getId());
+      case TIMESTAMP:
+        return new TimestampTreeReader(fileType.getId(), skipCorrupt);
+      case DATE:
+        return new DateTreeReader(fileType.getId());
+      case DECIMAL:
+        return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
+            readerType.getScale());
+      case STRUCT:
+        return new StructTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
+      case LIST:
+        return new ListTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
+      case MAP:
+        return new MapTreeReader(fileType.getId(), readerType, evolution,
+            included, skipCorrupt);
+      case UNION:
+        return new UnionTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
+      default:
+        throw new IllegalArgumentException("Unsupported type " +
+            readerTypeCategory);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/ZeroCopyShims.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ZeroCopyShims.java b/orc/src/java/org/apache/orc/impl/ZeroCopyShims.java
new file mode 100644
index 0000000..de02c8b
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/ZeroCopyShims.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+class ZeroCopyShims {
+  private static final class ByteBufferPoolAdapter implements ByteBufferPool {
+    private HadoopShims.ByteBufferPoolShim pool;
+
+    public ByteBufferPoolAdapter(HadoopShims.ByteBufferPoolShim pool) {
+      this.pool = pool;
+    }
+
+    @Override
+    public final ByteBuffer getBuffer(boolean direct, int length) {
+      return this.pool.getBuffer(direct, length);
+    }
+
+    @Override
+    public final void putBuffer(ByteBuffer buffer) {
+      this.pool.putBuffer(buffer);
+    }
+  }
+
+  private static final class ZeroCopyAdapter implements HadoopShims.ZeroCopyReaderShim {
+    private final FSDataInputStream in;
+    private final ByteBufferPoolAdapter pool;
+    private final static EnumSet<ReadOption> CHECK_SUM = EnumSet
+        .noneOf(ReadOption.class);
+    private final static EnumSet<ReadOption> NO_CHECK_SUM = EnumSet
+        .of(ReadOption.SKIP_CHECKSUMS);
+
+    public ZeroCopyAdapter(FSDataInputStream in,
+                           HadoopShims.ByteBufferPoolShim poolshim) {
+      this.in = in;
+      if (poolshim != null) {
+        pool = new ByteBufferPoolAdapter(poolshim);
+      } else {
+        pool = null;
+      }
+    }
+
+    public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
+        throws IOException {
+      EnumSet<ReadOption> options = NO_CHECK_SUM;
+      if (verifyChecksums) {
+        options = CHECK_SUM;
+      }
+      return this.in.read(this.pool, maxLength, options);
+    }
+
+    public final void releaseBuffer(ByteBuffer buffer) {
+      this.in.releaseBuffer(buffer);
+    }
+
+    @Override
+    public final void close() throws IOException {
+      this.in.close();
+    }
+  }
+
+  public static HadoopShims.ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                                                 HadoopShims.ByteBufferPoolShim pool) throws IOException {
+    return new ZeroCopyAdapter(in, pool);
+  }
+
+}