Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:23:03 UTC

[25/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims.java b/orc/src/java/org/apache/orc/impl/HadoopShims.java
index 2980d71..ef7d70f 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShims.java
@@ -18,9 +18,13 @@
 
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.VersionInfo;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 public interface HadoopShims {
@@ -43,6 +47,81 @@ public interface HadoopShims {
    */
   DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
 
+  /**
+   * A hadoop.io ByteBufferPool shim.
+   */
+  public interface ByteBufferPoolShim {
+    /**
+     * Get a new ByteBuffer from the pool.  The pool can provide this by
+     * removing a buffer from its internal cache, or by allocating a
+     * new buffer.
+     *
+     * @param direct     Whether the buffer should be direct.
+     * @param length     The minimum length the buffer will have.
+     * @return           A new ByteBuffer. Its capacity can be less
+     *                   than what was requested, but must be at
+     *                   least 1 byte.
+     */
+    ByteBuffer getBuffer(boolean direct, int length);
+
+    /**
+     * Release a buffer back to the pool.
+     * The pool may choose to put this buffer into its cache or free it.
+     *
+     * @param buffer    a direct bytebuffer
+     */
+    void putBuffer(ByteBuffer buffer);
+  }
+
+  /**
+   * Provides an HDFS ZeroCopyReader shim.
+   * @param in the FSDataInputStream to read from (the cached/mmap buffers are tied to it)
+   * @param pool the ByteBufferPoolShim to allocate fallback buffers from
+   *
+   * @return a ZeroCopyReaderShim, or null if zero-copy reads are not supported
+   */
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+  public interface ZeroCopyReaderShim extends Closeable {
+    /**
+     * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+     * Also advance the underlying stream by the number of bytes read. The data read can be smaller than maxLength.
+     *
+     * @return the ByteBuffer read from the stream
+     */
+    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+    /**
+     * Release a ByteBuffer obtained from a previous call to readBuffer().
+     *
+     */
+    public void releaseBuffer(ByteBuffer buffer);
+
+    /**
+     * Close the underlying stream.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+  }
+  /**
+   * Read data into a Text object in the fastest way possible
+   */
+  public interface TextReaderShim {
+    /**
+     * Read size bytes from the underlying stream into txt.
+     * @param txt the Text object to fill
+     * @param size the number of bytes to read
+     * @throws IOException
+     */
+    void read(Text txt, int size) throws IOException;
+  }
+
+  /**
+   * Wrap a TextReaderShim around an input stream. The reader shim will not
+   * buffer any reads from the underlying stream and will only consume bytes
+   * which are required for TextReaderShim.read() input.
+   */
+  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
 
   class Factory {
     private static HadoopShims SHIMS = null;

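The two shims above are meant to be used together: the pool supplies fallback buffers, while the reader hands out (and takes back) buffers backed by the stream. A minimal sketch of that flow, assuming a trivial Deque-backed pool (SimplePool and readChunk are invented for illustration, not part of this patch):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.orc.impl.HadoopShims;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class ZeroCopyExample {
  // A trivial pool: reuse the most recently released buffer when it fits.
  static class SimplePool implements HadoopShims.ByteBufferPoolShim {
    private final Deque<ByteBuffer> cache = new ArrayDeque<>();

    public ByteBuffer getBuffer(boolean direct, int length) {
      ByteBuffer b = cache.poll();
      if (b != null && b.isDirect() == direct && b.capacity() >= length) {
        b.clear();
        return b;
      }
      return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
    }

    public void putBuffer(ByteBuffer buffer) {
      cache.offer(buffer);
    }
  }

  static void readChunk(HadoopShims shims, FSDataInputStream in, int len) throws IOException {
    HadoopShims.ZeroCopyReaderShim zcr = shims.getZeroCopyReader(in, new SimplePool());
    if (zcr == null) {
      return;                                       // zero-copy reads not supported
    }
    ByteBuffer chunk = zcr.readBuffer(len, false);  // may return fewer than len bytes
    try {
      // ... consume chunk ...
    } finally {
      zcr.releaseBuffer(chunk);                     // always hand the buffer back
      zcr.close();
    }
  }
}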
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 3b9371d..5c53f74 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -18,10 +18,14 @@
 
 package org.apache.orc.impl;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 /**
@@ -59,4 +63,30 @@ public class HadoopShimsCurrent implements HadoopShims {
         return null;
     }
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                              ByteBufferPoolShim pool
+                                              ) throws IOException {
+    return ZeroCopyShims.getZeroCopyReader(in, pool);
+  }
+
+  private final class FastTextReaderShim implements TextReaderShim {
+    private final DataInputStream din;
+
+    public FastTextReaderShim(InputStream in) {
+      this.din = new DataInputStream(in);
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      txt.readWithKnownLength(din, len);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new FastTextReaderShim(in);
+  }
+
 }

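Since the fast path above simply delegates to Text.readWithKnownLength(), a caller only sees the shim interface. A hypothetical use of it (instantiating HadoopShimsCurrent directly, because the factory lookup is outside this hunk):

import org.apache.hadoop.io.Text;
import org.apache.orc.impl.HadoopShims;
import org.apache.orc.impl.HadoopShimsCurrent;

import java.io.ByteArrayInputStream;
import java.io.IOException;

class TextShimExample {
  static String readUtf8(byte[] utf8) throws IOException {
    HadoopShims shims = new HadoopShimsCurrent();
    HadoopShims.TextReaderShim reader =
        shims.getTextReaderShim(new ByteArrayInputStream(utf8));
    Text txt = new Text();
    reader.read(txt, utf8.length);   // fills txt with exactly utf8.length bytes
    return txt.toString();
  }
}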
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
index ac46836..3f65e74 100644
--- a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
+++ b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
@@ -18,19 +18,84 @@
 
 package org.apache.orc.impl;
 
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.InputStream;
+import java.lang.reflect.Method;
 
 /**
  * Shims for versions of Hadoop up to and including 2.2.x
  */
 public class HadoopShims_2_2 implements HadoopShims {
 
+  final boolean zeroCopy;
+  final boolean fastRead;
+
+  HadoopShims_2_2() {
+    boolean zcr = false;
+    try {
+      Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+        HadoopShims_2_2.class.getClassLoader());
+      zcr = true;
+    } catch (ClassNotFoundException ce) {
+    }
+    zeroCopy = zcr;
+    boolean fastRead = false;
+    if (zcr) {
+      for (Method m : Text.class.getMethods()) {
+        if ("readWithKnownLength".equals(m.getName())) {
+          fastRead = true;
+        }
+      }
+    }
+    this.fastRead = fastRead;
+  }
+
   public DirectDecompressor getDirectDecompressor(
       DirectCompressionType codec) {
     return null;
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                              ByteBufferPoolShim pool
+                                              ) throws IOException {
+    if(zeroCopy) {
+      return ZeroCopyShims.getZeroCopyReader(in, pool);
+    }
+    /* not supported */
+    return null;
+  }
+
+  private final class BasicTextReaderShim implements TextReaderShim {
+    private final InputStream in;
+
+    public BasicTextReaderShim(InputStream in) {
+      this.in = in;
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      int offset = 0;
+      byte[] bytes = new byte[len];
+      while (len > 0) {
+        int written = in.read(bytes, offset, len);
+        if (written < 0) {
+          throw new EOFException("Can't finish read from " + in + " read "
+              + (offset) + " bytes out of " + bytes.length);
+        }
+        len -= written;
+        offset += written;
+      }
+      txt.set(bytes);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new BasicTextReaderShim(in);
+  }
 }

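The constructor above detects capabilities by probing the classpath at runtime instead of linking against them at compile time. A standalone sketch of that pattern (CapabilityProbe is a made-up name; the class/method strings are only examples):

import java.lang.reflect.Method;

class CapabilityProbe {
  /** Returns true if the named class is present and exposes a method with the given name. */
  static boolean hasMethod(String className, String methodName) {
    try {
      Class<?> clazz = Class.forName(className, false,
          CapabilityProbe.class.getClassLoader());
      for (Method m : clazz.getMethods()) {
        if (methodName.equals(m.getName())) {
          return true;
        }
      }
    } catch (ClassNotFoundException ignored) {
      // class not on the classpath; treat the capability as absent
    }
    return false;
  }
}

// e.g. hasMethod("org.apache.hadoop.io.Text", "readWithKnownLength")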
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 8bef0f1..3e64d54 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -78,4 +78,5 @@ public interface IntegerReader {
   void nextVector(ColumnVector column,
                   int[] data,
                   int length
-                  ) throws IOException;}
+                  ) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
new file mode 100644
index 0000000..72c7f54
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.Reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+public class OrcAcidUtils {
+  public static final String ACID_STATS = "hive.acid.stats";
+  public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
+  /**
+   * Get the filename of the ORC ACID side file that contains the lengths
+   * of the intermediate footers.
+   * @param main the main ORC filename
+   * @return the name of the side file
+   */
+  public static Path getSideFile(Path main) {
+    return new Path(main + DELTA_SIDE_FILE_SUFFIX);
+  }
+
+  /**
+   * Read the side file to get the last flush length.
+   * @param fs the file system to use
+   * @param deltaFile the path of the delta file
+   * @return the maximum size of the file to use
+   * @throws IOException
+   */
+  public static long getLastFlushLength(FileSystem fs,
+                                        Path deltaFile) throws IOException {
+    Path lengths = getSideFile(deltaFile);
+    long result = Long.MAX_VALUE;
+    try (FSDataInputStream stream = fs.open(lengths)) {
+      result = -1;
+      while (stream.available() > 0) {
+        result = stream.readLong();
+      }
+      return result;
+    } catch (IOException ioe) {
+      return result;
+    }
+  }
+
+  private static final Charset utf8 = Charset.forName("UTF-8");
+  private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
+
+  public static AcidStats parseAcidStats(Reader reader) {
+    if (reader.hasMetadataValue(ACID_STATS)) {
+      try {
+        ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
+        return new AcidStats(utf8Decoder.decode(val).toString());
+      } catch (CharacterCodingException e) {
+        throw new IllegalArgumentException("Bad string encoding for " +
+            ACID_STATS, e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+}

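A short usage sketch for the side-file helpers above (the class and method here are hypothetical, and the example path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

import java.io.IOException;

class SideFileExample {
  static long usableLength(Configuration conf, Path deltaFile) throws IOException {
    FileSystem fs = deltaFile.getFileSystem(conf);
    // The side file lives next to the delta, e.g. .../delta_5_5/bucket_00000_flush_length.
    // Long.MAX_VALUE means "no side file, read the whole delta"; -1 means the side
    // file exists but no footer offset has been recorded yet.
    return OrcAcidUtils.getLastFlushLength(fs, deltaFile);
  }
}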
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
new file mode 100644
index 0000000..2da590e
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -0,0 +1,758 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.FileFormatException;
+import org.apache.orc.FileMetaInfo;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcProto;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
+
+public class ReaderImpl implements Reader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
+
+  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+  protected final FileSystem fileSystem;
+  private final long maxLength;
+  protected final Path path;
+  protected final org.apache.orc.CompressionKind compressionKind;
+  protected final CompressionCodec codec;
+  protected final int bufferSize;
+  private final List<OrcProto.StripeStatistics> stripeStats;
+  private final int metadataSize;
+  protected final List<OrcProto.Type> types;
+  private final TypeDescription schema;
+  private final List<OrcProto.UserMetadataItem> userMetadata;
+  private final List<OrcProto.ColumnStatistics> fileStats;
+  private final List<StripeInformation> stripes;
+  protected final int rowIndexStride;
+  private final long contentLength, numberOfRows;
+
+
+  private long deserializedSize = -1;
+  protected final Configuration conf;
+  private final List<Integer> versionList;
+  private final OrcFile.WriterVersion writerVersion;
+
+  // Same for metastore cache - maintains the same background buffer, but includes postscript.
+  // This will only be set if the file footer/metadata was read from disk.
+  private final ByteBuffer footerMetaAndPsBuffer;
+
+  public static class StripeInformationImpl
+      implements StripeInformation {
+    private final OrcProto.StripeInformation stripe;
+
+    public StripeInformationImpl(OrcProto.StripeInformation stripe) {
+      this.stripe = stripe;
+    }
+
+    @Override
+    public long getOffset() {
+      return stripe.getOffset();
+    }
+
+    @Override
+    public long getLength() {
+      return stripe.getDataLength() + getIndexLength() + getFooterLength();
+    }
+
+    @Override
+    public long getDataLength() {
+      return stripe.getDataLength();
+    }
+
+    @Override
+    public long getFooterLength() {
+      return stripe.getFooterLength();
+    }
+
+    @Override
+    public long getIndexLength() {
+      return stripe.getIndexLength();
+    }
+
+    @Override
+    public long getNumberOfRows() {
+      return stripe.getNumberOfRows();
+    }
+
+    @Override
+    public String toString() {
+      return "offset: " + getOffset() + " data: " + getDataLength() +
+        " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
+        " index: " + getIndexLength();
+    }
+  }
+
+  @Override
+  public long getNumberOfRows() {
+    return numberOfRows;
+  }
+
+  @Override
+  public List<String> getMetadataKeys() {
+    List<String> result = new ArrayList<String>();
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      result.add(item.getName());
+    }
+    return result;
+  }
+
+  @Override
+  public ByteBuffer getMetadataValue(String key) {
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      if (item.hasName() && item.getName().equals(key)) {
+        return item.getValue().asReadOnlyByteBuffer();
+      }
+    }
+    throw new IllegalArgumentException("Can't find user metadata " + key);
+  }
+
+  public boolean hasMetadataValue(String key) {
+    for(OrcProto.UserMetadataItem item: userMetadata) {
+      if (item.hasName() && item.getName().equals(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public org.apache.orc.CompressionKind getCompressionKind() {
+    return compressionKind;
+  }
+
+  @Override
+  public int getCompressionSize() {
+    return bufferSize;
+  }
+
+  @Override
+  public List<StripeInformation> getStripes() {
+    return stripes;
+  }
+
+  @Override
+  public long getContentLength() {
+    return contentLength;
+  }
+
+  @Override
+  public List<OrcProto.Type> getTypes() {
+    return types;
+  }
+
+  @Override
+  public OrcFile.Version getFileVersion() {
+    for (OrcFile.Version version: OrcFile.Version.values()) {
+      if ((versionList != null && !versionList.isEmpty()) &&
+          version.getMajor() == versionList.get(0) &&
+          version.getMinor() == versionList.get(1)) {
+        return version;
+      }
+    }
+    return OrcFile.Version.V_0_11;
+  }
+
+  @Override
+  public OrcFile.WriterVersion getWriterVersion() {
+    return writerVersion;
+  }
+
+  @Override
+  public int getRowIndexStride() {
+    return rowIndexStride;
+  }
+
+  @Override
+  public ColumnStatistics[] getStatistics() {
+    ColumnStatistics[] result = new ColumnStatistics[types.size()];
+    for(int i=0; i < result.length; ++i) {
+      result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
+    }
+    return result;
+  }
+
+  @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  /**
+   * Ensure this is an ORC file to prevent users from trying to read text
+   * files or RC files as ORC files.
+   * @param in the file being read
+   * @param path the filename for error messages
+   * @param psLen the postscript length
+   * @param buffer the tail of the file
+   * @throws IOException
+   */
+  protected static void ensureOrcFooter(FSDataInputStream in,
+                                        Path path,
+                                        int psLen,
+                                        ByteBuffer buffer) throws IOException {
+    int magicLength = OrcFile.MAGIC.length();
+    int fullLength = magicLength + 1;
+    if (psLen < fullLength || buffer.remaining() < fullLength) {
+      throw new FileFormatException("Malformed ORC file " + path +
+          ". Invalid postscript length " + psLen);
+    }
+    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+    byte[] array = buffer.array();
+    // now look for the magic string at the end of the postscript.
+    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+      // If it isn't there, this may be the 0.11.0 version of ORC.
+      // Read the first 3 bytes of the file to check for the header
+      byte[] header = new byte[magicLength];
+      in.readFully(0, header, 0, magicLength);
+      // if it isn't there, this isn't an ORC file
+      if (!Text.decode(header, 0 , magicLength).equals(OrcFile.MAGIC)) {
+        throw new FileFormatException("Malformed ORC file " + path +
+            ". Invalid postscript.");
+      }
+    }
+  }
+
+  /**
+   * Build a version string out of an array.
+   * @param version the version number as a list
+   * @return the human readable form of the version string
+   */
+  private static String versionString(List<Integer> version) {
+    StringBuilder buffer = new StringBuilder();
+    for(int i=0; i < version.size(); ++i) {
+      if (i != 0) {
+        buffer.append('.');
+      }
+      buffer.append(version.get(i));
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Check to see if this ORC file is from a future version and if so,
+   * warn the user that we may not be able to read all of the column encodings.
+   * @param log the logger to write any error message to
+   * @param path the data source path for error messages
+   * @param version the version of hive that wrote the file.
+   */
+  protected static void checkOrcVersion(Logger log, Path path,
+                                        List<Integer> version) {
+    if (version.size() >= 1) {
+      int major = version.get(0);
+      int minor = 0;
+      if (version.size() >= 2) {
+        minor = version.get(1);
+      }
+      if (major > OrcFile.Version.CURRENT.getMajor() ||
+          (major == OrcFile.Version.CURRENT.getMajor() &&
+           minor > OrcFile.Version.CURRENT.getMinor())) {
+        log.warn(path + " was written by a future Hive version " +
+                 versionString(version) +
+                 ". This file may not be readable by this version of Hive.");
+      }
+    }
+  }
+
+  /**
+   * Constructor that lets the user specify additional options.
+   * @param path pathname for file
+   * @param options options for reading
+   * @throws IOException
+   */
+  public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
+    FileSystem fs = options.getFilesystem();
+    if (fs == null) {
+      fs = path.getFileSystem(options.getConfiguration());
+    }
+    this.fileSystem = fs;
+    this.path = path;
+    this.conf = options.getConfiguration();
+    this.maxLength = options.getMaxLength();
+
+    FileMetadata fileMetadata = options.getFileMetadata();
+    if (fileMetadata != null) {
+      this.compressionKind = fileMetadata.getCompressionKind();
+      this.bufferSize = fileMetadata.getCompressionBufferSize();
+      this.codec = WriterImpl.createCodec(compressionKind);
+      this.metadataSize = fileMetadata.getMetadataSize();
+      this.stripeStats = fileMetadata.getStripeStats();
+      this.versionList = fileMetadata.getVersionList();
+      this.writerVersion =
+          OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum());
+      this.types = fileMetadata.getTypes();
+      this.rowIndexStride = fileMetadata.getRowIndexStride();
+      this.contentLength = fileMetadata.getContentLength();
+      this.numberOfRows = fileMetadata.getNumberOfRows();
+      this.fileStats = fileMetadata.getFileStats();
+      this.stripes = fileMetadata.getStripes();
+      this.userMetadata = null; // not cached and not needed here
+      this.footerMetaAndPsBuffer = null;
+    } else {
+      FileMetaInfo footerMetaData;
+      if (options.getFileMetaInfo() != null) {
+        footerMetaData = options.getFileMetaInfo();
+        this.footerMetaAndPsBuffer = null;
+      } else {
+        footerMetaData = extractMetaInfoFromFooter(fs, path,
+            options.getMaxLength());
+        this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
+      }
+      MetaInfoObjExtractor rInfo =
+          new MetaInfoObjExtractor(footerMetaData.compressionType,
+                                   footerMetaData.bufferSize,
+                                   footerMetaData.metadataSize,
+                                   footerMetaData.footerBuffer
+                                   );
+      this.compressionKind = rInfo.compressionKind;
+      this.codec = rInfo.codec;
+      this.bufferSize = rInfo.bufferSize;
+      this.metadataSize = rInfo.metadataSize;
+      this.stripeStats = rInfo.metadata.getStripeStatsList();
+      this.types = rInfo.footer.getTypesList();
+      this.rowIndexStride = rInfo.footer.getRowIndexStride();
+      this.contentLength = rInfo.footer.getContentLength();
+      this.numberOfRows = rInfo.footer.getNumberOfRows();
+      this.userMetadata = rInfo.footer.getMetadataList();
+      this.fileStats = rInfo.footer.getStatisticsList();
+      this.versionList = footerMetaData.versionList;
+      this.writerVersion = footerMetaData.writerVersion;
+      this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
+    }
+    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
+  }
+
+  /**
+   * Get the WriterVersion based on the ORC file postscript.
+   * @param writerVersion the integer writer version
+   * @return the version of the software that produced the file
+   */
+  public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+    for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+      if (version.getId() == writerVersion) {
+        return version;
+      }
+    }
+    return OrcFile.WriterVersion.FUTURE;
+  }
+
+  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(footerAbsPos);
+    bb.limit(footerAbsPos + footerSize);
+    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
+        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+  }
+
+  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(metadataAbsPos);
+    bb.limit(metadataAbsPos + metadataSize);
+    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
+        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+  }
+
+  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+      int psLen, int psAbsOffset) throws IOException {
+    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+    assert bb.hasArray();
+    CodedInputStream in = CodedInputStream.newInstance(
+        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+    checkOrcVersion(LOG, path, ps.getVersionList());
+
+    // Check compression codec.
+    switch (ps.getCompression()) {
+      case NONE:
+        break;
+      case ZLIB:
+        break;
+      case SNAPPY:
+        break;
+      case LZO:
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compression");
+    }
+    return ps;
+  }
+
+  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
+                                                        Path path,
+                                                        long maxFileLength
+                                                        ) throws IOException {
+    FSDataInputStream file = fs.open(path);
+    ByteBuffer buffer = null, fullFooterBuffer = null;
+    OrcProto.PostScript ps = null;
+    OrcFile.WriterVersion writerVersion = null;
+    try {
+      // figure out the size of the file using the option or filesystem
+      long size;
+      if (maxFileLength == Long.MAX_VALUE) {
+        size = fs.getFileStatus(path).getLen();
+      } else {
+        size = maxFileLength;
+      }
+
+      //read last bytes into buffer to get PostScript
+      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+      buffer = ByteBuffer.allocate(readSize);
+      assert buffer.position() == 0;
+      file.readFully((size - readSize),
+          buffer.array(), buffer.arrayOffset(), readSize);
+      buffer.position(0);
+
+      //read the PostScript
+      //get length of PostScript
+      int psLen = buffer.get(readSize - 1) & 0xff;
+      ensureOrcFooter(file, path, psLen, buffer);
+      int psOffset = readSize - 1 - psLen;
+      ps = extractPostScript(buffer, path, psLen, psOffset);
+
+      int footerSize = (int) ps.getFooterLength();
+      int metadataSize = (int) ps.getMetadataLength();
+      writerVersion = extractWriterVersion(ps);
+
+      //check if extra bytes need to be read
+      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+      if (extra > 0) {
+        //more bytes need to be read, seek back to the right place and read extra bytes
+        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+        file.readFully((size - readSize - extra), extraBuf.array(),
+            extraBuf.arrayOffset() + extraBuf.position(), extra);
+        extraBuf.position(extra);
+        //append with already read bytes
+        extraBuf.put(buffer);
+        buffer = extraBuf;
+        buffer.position(0);
+        fullFooterBuffer = buffer.slice();
+        buffer.limit(footerSize + metadataSize);
+      } else {
+        //footer is already in the bytes in buffer, just adjust position, length
+        buffer.position(psOffset - footerSize - metadataSize);
+        fullFooterBuffer = buffer.slice();
+        buffer.limit(psOffset);
+      }
+
+      // remember position for later TODO: what later? this comment is useless
+      buffer.mark();
+    } finally {
+      try {
+        file.close();
+      } catch (IOException ex) {
+        LOG.error("Failed to close the file after another error", ex);
+      }
+    }
+
+    return new FileMetaInfo(
+        ps.getCompression().toString(),
+        (int) ps.getCompressionBlockSize(),
+        (int) ps.getMetadataLength(),
+        buffer,
+        ps.getVersionList(),
+        writerVersion,
+        fullFooterBuffer
+        );
+  }
+
+  protected static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) {
+    return (ps.hasWriterVersion()
+        ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
+  }
+
+  protected static List<StripeInformation> convertProtoStripesToStripes(
+      List<OrcProto.StripeInformation> stripes) {
+    List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
+    for (OrcProto.StripeInformation info : stripes) {
+      result.add(new StripeInformationImpl(info));
+    }
+    return result;
+  }
+
+  /**
+   * MetaInfoObjExtractor - creates the values for ReaderImpl's fields from their
+   *  serialized form.
+   * Because those fields are final, they must be initialized in the constructor
+   *  and cannot be set from a helper method, so this helper class is used instead.
+   *
+   */
+  private static class MetaInfoObjExtractor{
+    final org.apache.orc.CompressionKind compressionKind;
+    final CompressionCodec codec;
+    final int bufferSize;
+    final int metadataSize;
+    final OrcProto.Metadata metadata;
+    final OrcProto.Footer footer;
+
+    MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize, 
+        ByteBuffer footerBuffer) throws IOException {
+
+      this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr.toUpperCase());
+      this.bufferSize = bufferSize;
+      this.codec = WriterImpl.createCodec(compressionKind);
+      this.metadataSize = metadataSize;
+
+      int position = footerBuffer.position();
+      int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
+
+      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
+      this.footer = extractFooter(
+          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);
+
+      footerBuffer.position(position);
+    }
+  }
+
+  @Override
+  public ByteBuffer getSerializedFileFooter() {
+    return footerMetaAndPsBuffer;
+  }
+
+  @Override
+  public RecordReader rows() throws IOException {
+    return rows(new Options());
+  }
+
+  @Override
+  public RecordReader rows(Options options) throws IOException {
+    LOG.info("Reading ORC rows from " + path + " with " + options);
+    boolean[] include = options.getInclude();
+    // if included columns is null, then include all columns
+    if (include == null) {
+      include = new boolean[types.size()];
+      Arrays.fill(include, true);
+      options.include(include);
+    }
+    return new RecordReaderImpl(this, options);
+  }
+
+
+  @Override
+  public long getRawDataSize() {
+    // if the deserializedSize is not computed, then compute it, else
+    // return the already computed size. since we are reading from the footer
+    // we don't have to compute deserialized size repeatedly
+    if (deserializedSize == -1) {
+      List<Integer> indices = Lists.newArrayList();
+      for (int i = 0; i < fileStats.size(); ++i) {
+        indices.add(i);
+      }
+      deserializedSize = getRawDataSizeFromColIndices(indices);
+    }
+    return deserializedSize;
+  }
+
+  @Override
+  public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+    return getRawDataSizeFromColIndices(colIndices, types, fileStats);
+  }
+
+  public static long getRawDataSizeFromColIndices(
+      List<Integer> colIndices, List<OrcProto.Type> types,
+      List<OrcProto.ColumnStatistics> stats) {
+    long result = 0;
+    for (int colIdx : colIndices) {
+      result += getRawDataSizeOfColumn(colIdx, types, stats);
+    }
+    return result;
+  }
+
+  private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
+      List<OrcProto.ColumnStatistics> stats) {
+    OrcProto.ColumnStatistics colStat = stats.get(colIdx);
+    long numVals = colStat.getNumberOfValues();
+    OrcProto.Type type = types.get(colIdx);
+
+    switch (type.getKind()) {
+    case BINARY:
+      // old orc format doesn't support binary statistics. checking for binary
+      // statistics is not required as protocol buffers takes care of it.
+      return colStat.getBinaryStatistics().getSum();
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      // old orc format doesn't support sum for string statistics. checking for
+      // existence is not required as protocol buffers takes care of it.
+
+      // ORC strings are deserialized to java strings. so use java data model's
+      // string size
+      numVals = numVals == 0 ? 1 : numVals;
+      int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
+      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
+    case TIMESTAMP:
+      return numVals * JavaDataModel.get().lengthOfTimestamp();
+    case DATE:
+      return numVals * JavaDataModel.get().lengthOfDate();
+    case DECIMAL:
+      return numVals * JavaDataModel.get().lengthOfDecimal();
+    case DOUBLE:
+    case LONG:
+      return numVals * JavaDataModel.get().primitive2();
+    case FLOAT:
+    case INT:
+    case SHORT:
+    case BOOLEAN:
+    case BYTE:
+      return numVals * JavaDataModel.get().primitive1();
+    default:
+      LOG.debug("Unknown primitive category: " + type.getKind());
+      break;
+    }
+
+    return 0;
+  }
+
+  @Override
+  public long getRawDataSizeOfColumns(List<String> colNames) {
+    List<Integer> colIndices = getColumnIndicesFromNames(colNames);
+    return getRawDataSizeFromColIndices(colIndices);
+  }
+
+  private List<Integer> getColumnIndicesFromNames(List<String> colNames) {
+    // top level struct
+    OrcProto.Type type = types.get(0);
+    List<Integer> colIndices = Lists.newArrayList();
+    List<String> fieldNames = type.getFieldNamesList();
+    int fieldIdx;
+    for (String colName : colNames) {
+      if (fieldNames.contains(colName)) {
+        fieldIdx = fieldNames.indexOf(colName);
+      } else {
+        String s = "Cannot find field for: " + colName + " in ";
+        for (String fn : fieldNames) {
+          s += fn + ", ";
+        }
+        LOG.warn(s);
+        continue;
+      }
+
+      // a single field may span multiple columns. find start and end column
+      // index for the requested field
+      int idxStart = type.getSubtypes(fieldIdx);
+
+      int idxEnd;
+
+      // if the specified field is the last one, then the end index will be the
+      // last column index
+      if (fieldIdx + 1 > fieldNames.size() - 1) {
+        idxEnd = getLastIdx() + 1;
+      } else {
+        idxEnd = type.getSubtypes(fieldIdx + 1);
+      }
+
+      // if start index and end index are same then the field is a primitive
+      // field else complex field (like map, list, struct, union)
+      if (idxStart == idxEnd) {
+        // simple field
+        colIndices.add(idxStart);
+      } else {
+        // complex fields spans multiple columns
+        for (int i = idxStart; i < idxEnd; i++) {
+          colIndices.add(i);
+        }
+      }
+    }
+    return colIndices;
+  }
+
+  private int getLastIdx() {
+    Set<Integer> indices = new HashSet<>();
+    for (OrcProto.Type type : types) {
+      indices.addAll(type.getSubtypesList());
+    }
+    return Collections.max(indices);
+  }
+
+  @Override
+  public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+    return stripeStats;
+  }
+
+  @Override
+  public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
+    return fileStats;
+  }
+
+  @Override
+  public List<StripeStatistics> getStripeStatistics() {
+    List<StripeStatistics> result = new ArrayList<>();
+    for (OrcProto.StripeStatistics ss : stripeStats) {
+      result.add(new StripeStatistics(ss.getColStatsList()));
+    }
+    return result;
+  }
+
+  public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
+    return userMetadata;
+  }
+
+  @Override
+  public List<Integer> getVersionList() {
+    return versionList;
+  }
+
+  @Override
+  public int getMetadataSize() {
+    return metadataSize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("ORC Reader(");
+    buffer.append(path);
+    if (maxLength != -1) {
+      buffer.append(", ");
+      buffer.append(maxLength);
+    }
+    buffer.append(")");
+    return buffer.toString();
+  }
+}

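For context, the reader is normally obtained through OrcFile rather than by constructing ReaderImpl directly. A hedged end-to-end sketch (assuming the usual OrcFile.createReader/readerOptions entry points and the batch-oriented RecordReader API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

import java.io.IOException;

class ReaderExample {
  static long countRows(Configuration conf, Path path) throws IOException {
    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows();              // ReaderImpl.rows() above
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    long count = 0;
    while (rows.nextBatch(batch)) {                 // assumes boolean nextBatch(batch)
      count += batch.size;
    }
    rows.close();
    return count;
  }
}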
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
new file mode 100644
index 0000000..36a802e
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -0,0 +1,1215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TimestampColumnStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.BloomFilterIO;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.ql.util.TimestampUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcProto;
+
+public class RecordReaderImpl implements RecordReader {
+  static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
+  private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
+  private static final Object UNKNOWN_VALUE = new Object();
+  protected final Path path;
+  private final long firstRow;
+  private final List<StripeInformation> stripes =
+      new ArrayList<StripeInformation>();
+  private OrcProto.StripeFooter stripeFooter;
+  private final long totalRowCount;
+  private final CompressionCodec codec;
+  protected final TypeDescription schema;
+  private final List<OrcProto.Type> types;
+  private final int bufferSize;
+  private final boolean[] included;
+  private final long rowIndexStride;
+  private long rowInStripe = 0;
+  private int currentStripe = -1;
+  private long rowBaseInStripe = 0;
+  private long rowCountInStripe = 0;
+  private final Map<StreamName, InStream> streams =
+      new HashMap<StreamName, InStream>();
+  DiskRangeList bufferChunks = null;
+  private final TreeReaderFactory.TreeReader reader;
+  private final OrcProto.RowIndex[] indexes;
+  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+  private final SargApplier sargApp;
+  // an array about which row groups aren't skipped
+  private boolean[] includedRowGroups = null;
+  private final DataReader dataReader;
+
+  /**
+   * Given a list of column names, find the given column and return the index.
+   *
+   * @param columnNames the list of potential column names
+   * @param columnName  the column name to look for
+   * @param rootColumn  offset the result with the rootColumn
+   * @return the column number or -1 if the column wasn't found
+   */
+  static int findColumns(String[] columnNames,
+                         String columnName,
+                         int rootColumn) {
+    for(int i=0; i < columnNames.length; ++i) {
+      if (columnName.equals(columnNames[i])) {
+        return i + rootColumn;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Find the mapping from predicate leaves to columns.
+   * @param sargLeaves the search argument that we need to map
+   * @param columnNames the names of the columns
+   * @param rootColumn the offset of the top level row, which offsets the
+   *                   result
+   * @return an array mapping the sarg leaves to concrete column numbers
+   */
+  public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
+                             String[] columnNames,
+                             int rootColumn) {
+    int[] result = new int[sargLeaves.size()];
+    Arrays.fill(result, -1);
+    for(int i=0; i < result.length; ++i) {
+      String colName = sargLeaves.get(i).getColumnName();
+      result[i] = findColumns(columnNames, colName, rootColumn);
+    }
+    return result;
+  }
+
+  protected RecordReaderImpl(ReaderImpl fileReader,
+                             Reader.Options options) throws IOException {
+    SchemaEvolution treeReaderSchema;
+    this.included = options.getInclude();
+    included[0] = true;
+    if (options.getSchema() == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Schema on read not provided -- using file schema " +
+            fileReader.getSchema());
+      }
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
+    } else {
+
+      // Now that we are creating a record reader for a file, validate that the schema to read
+      // is compatible with the file schema.
+      //
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+          options.getSchema(),included);
+    }
+    this.schema = treeReaderSchema.getReaderSchema();
+    this.path = fileReader.path;
+    this.codec = fileReader.codec;
+    this.types = fileReader.types;
+    this.bufferSize = fileReader.bufferSize;
+    this.rowIndexStride = fileReader.rowIndexStride;
+    SearchArgument sarg = options.getSearchArgument();
+    if (sarg != null && rowIndexStride != 0) {
+      sargApp = new SargApplier(
+          sarg, options.getColumnNames(), rowIndexStride, types,
+          included.length);
+    } else {
+      sargApp = null;
+    }
+    long rows = 0;
+    long skippedRows = 0;
+    long offset = options.getOffset();
+    long maxOffset = options.getMaxOffset();
+    for(StripeInformation stripe: fileReader.getStripes()) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) {
+        skippedRows += stripe.getNumberOfRows();
+      } else if (stripeStart < maxOffset) {
+        this.stripes.add(stripe);
+        rows += stripe.getNumberOfRows();
+      }
+    }
+
+    Boolean zeroCopy = options.getUseZeroCopy();
+    if (zeroCopy == null) {
+      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+    }
+    if (options.getDataReader() != null) {
+      this.dataReader = options.getDataReader();
+    } else {
+      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+          DataReaderProperties.builder()
+              .withBufferSize(bufferSize)
+              .withCompression(fileReader.compressionKind)
+              .withFileSystem(fileReader.fileSystem)
+              .withPath(fileReader.path)
+              .withTypeCount(types.size())
+              .withZeroCopy(zeroCopy)
+              .build());
+    }
+    this.dataReader.open();
+
+    firstRow = skippedRows;
+    totalRowCount = rows;
+    Boolean skipCorrupt = options.getSkipCorruptRecords();
+    if (skipCorrupt == null) {
+      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
+    }
+
+    reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
+        treeReaderSchema, included, skipCorrupt);
+    indexes = new OrcProto.RowIndex[types.size()];
+    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+    advanceToNextRow(reader, 0L, true);
+  }
+
+  public static final class PositionProviderImpl implements PositionProvider {
+    private final OrcProto.RowIndexEntry entry;
+    private int index;
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+      this.entry = entry;
+      this.index = startPos;
+    }
+
+    @Override
+    public long getNext() {
+      return entry.getPositions(index++);
+    }
+  }
+
+  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+                                                ) throws IOException {
+    return dataReader.readStripeFooter(stripe);
+  }
+
+  enum Location {
+    BEFORE, MIN, MIDDLE, MAX, AFTER
+  }
+
+  /**
+   * Given a point and min and max, determine if the point is before, at the
+   * min, in the middle, at the max, or after the range.
+   * @param point the point to test
+   * @param min the minimum point
+   * @param max the maximum point
+   * @param <T> the type of the comparison
+   * @return the location of the point
+   */
+  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+    int minCompare = point.compareTo(min);
+    if (minCompare < 0) {
+      return Location.BEFORE;
+    } else if (minCompare == 0) {
+      return Location.MIN;
+    }
+    int maxCompare = point.compareTo(max);
+    if (maxCompare > 0) {
+      return Location.AFTER;
+    } else if (maxCompare == 0) {
+      return Location.MAX;
+    }
+    return Location.MIDDLE;
+  }
+
+  /**
+   * Get the maximum value out of an index entry.
+   * @param index
+   *          the index entry
+   * @return the object for the maximum value or null if there isn't one
+   */
+  static Object getMax(ColumnStatistics index) {
+    if (index instanceof IntegerColumnStatistics) {
+      return ((IntegerColumnStatistics) index).getMaximum();
+    } else if (index instanceof DoubleColumnStatistics) {
+      return ((DoubleColumnStatistics) index).getMaximum();
+    } else if (index instanceof StringColumnStatistics) {
+      return ((StringColumnStatistics) index).getMaximum();
+    } else if (index instanceof DateColumnStatistics) {
+      return ((DateColumnStatistics) index).getMaximum();
+    } else if (index instanceof DecimalColumnStatistics) {
+      return ((DecimalColumnStatistics) index).getMaximum();
+    } else if (index instanceof TimestampColumnStatistics) {
+      return ((TimestampColumnStatistics) index).getMaximum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
+        return Boolean.TRUE;
+      } else {
+        return Boolean.FALSE;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the minimum value out of an index entry.
+   * @param index
+   *          the index entry
+   * @return the object for the minimum value or null if there isn't one
+   */
+  static Object getMin(ColumnStatistics index) {
+    if (index instanceof IntegerColumnStatistics) {
+      return ((IntegerColumnStatistics) index).getMinimum();
+    } else if (index instanceof DoubleColumnStatistics) {
+      return ((DoubleColumnStatistics) index).getMinimum();
+    } else if (index instanceof StringColumnStatistics) {
+      return ((StringColumnStatistics) index).getMinimum();
+    } else if (index instanceof DateColumnStatistics) {
+      return ((DateColumnStatistics) index).getMinimum();
+    } else if (index instanceof DecimalColumnStatistics) {
+      return ((DecimalColumnStatistics) index).getMinimum();
+    } else if (index instanceof TimestampColumnStatistics) {
+      return ((TimestampColumnStatistics) index).getMinimum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
+        return Boolean.FALSE;
+      } else {
+        return Boolean.TRUE;
+      }
+    } else {
+      return UNKNOWN_VALUE; // null is not safe here
+    }
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param statsProto the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @param bloomFilter the bloom filter for the column, if any
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
+      PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
+    ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
+    Object minValue = getMin(cs);
+    Object maxValue = getMax(cs);
+    BloomFilterIO bf = null;
+    if (bloomFilter != null) {
+      bf = new BloomFilterIO(bloomFilter);
+    }
+    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param stats the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  public static TruthValue evaluatePredicate(ColumnStatistics stats,
+                                             PredicateLeaf predicate,
+                                             BloomFilterIO bloomFilter) {
+    Object minValue = getMin(stats);
+    Object maxValue = getMax(stats);
+    return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
+  }
+
+  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+      Object max, boolean hasNull, BloomFilterIO bloomFilter) {
+    // if we didn't have any values, everything must have been null
+    if (min == null) {
+      if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+        return TruthValue.YES;
+      } else {
+        return TruthValue.NULL;
+      }
+    } else if (min == UNKNOWN_VALUE) {
+      return TruthValue.YES_NO_NULL;
+    }
+
+    TruthValue result;
+    Object baseObj = predicate.getLiteral();
+    try {
+      // Predicate object and stats objects are converted to the type of the predicate object.
+      Object minValue = getBaseObjectForComparison(predicate.getType(), min);
+      Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
+      Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
+
+      result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
+      if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
+        result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
+      }
+      // in case of a failed conversion, return the default YES_NO_NULL truth value
+    } catch (Exception e) {
+      if (LOG.isWarnEnabled()) {
+        final String statsType = min == null ?
+            (max == null ? "null" : max.getClass().getSimpleName()) :
+            min.getClass().getSimpleName();
+        final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
+        final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
+            " Skipping ORC PPD." +
+            " Exception: " + e.getMessage() +
+            " StatsType: " + statsType +
+            " PredicateType: " + predicateType;
+        LOG.warn(reason);
+        LOG.debug(reason, e);
+      }
+      if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
+        result = TruthValue.YES_NO;
+      } else {
+        result = TruthValue.YES_NO_NULL;
+      }
+    }
+    return result;
+  }
+
+  private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
+      TruthValue result, BloomFilterIO bloomFilter) {
+    // evaluate bloom filter only when
+    // 1) Bloom filter is available
+    // 2) Min/Max evaluation yield YES or MAYBE
+    // 3) Predicate is EQUALS or IN list
+    if (bloomFilter != null
+        && result != TruthValue.NO_NULL && result != TruthValue.NO
+        && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
+            || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+            || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
+      return true;
+    }
+    return false;
+  }
+
+  private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
+      Object minValue,
+      Object maxValue,
+      boolean hasNull) {
+    Location loc;
+
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO;
+        } else {
+          return TruthValue.YES_NO;
+        }
+      case EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (minValue.equals(maxValue) && loc == Location.MIN) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case LESS_THAN:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.AFTER) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE || loc == Location.MIN) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case LESS_THAN_EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.AFTER || loc == Location.MAX) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case IN:
+        if (minValue.equals(maxValue)) {
+          // for a single value, look through the IN list to see if it contains that value
+          for (Object arg : predicate.getLiteralList()) {
+            predObj = getBaseObjectForComparison(predicate.getType(), arg);
+            loc = compareToRange((Comparable) predObj, minValue, maxValue);
+            if (loc == Location.MIN) {
+              return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+            }
+          }
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          // are all of the values outside of the range?
+          for (Object arg : predicate.getLiteralList()) {
+            predObj = getBaseObjectForComparison(predicate.getType(), arg);
+            loc = compareToRange((Comparable) predObj, minValue, maxValue);
+            if (loc == Location.MIN || loc == Location.MIDDLE ||
+                loc == Location.MAX) {
+              return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+            }
+          }
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        }
+      case BETWEEN:
+        List<Object> args = predicate.getLiteralList();
+        Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
+
+        loc = compareToRange((Comparable) predObj1, minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.MIN) {
+          Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
+
+          Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
+          if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+            return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+          } else if (loc2 == Location.BEFORE) {
+            return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+          } else {
+            return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+          }
+        } else if (loc == Location.AFTER) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case IS_NULL:
+        // min = null condition above handles the all-nulls YES case
+        return hasNull ? TruthValue.YES_NO : TruthValue.NO;
+      default:
+        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
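
As an illustration of the min/max pruning above, here is a standalone sketch (not part of the patch, plain Java only) of how the EQUALS branch decides a truth value from a row group's min/max statistics; the *_NULL variants are then added when the column has nulls, as in the code above.

public class MinMaxPruneExample {
  // Mirrors the EQUALS case: outside [min, max] is a definite miss, min == max == literal is a
  // definite hit, anything else is "maybe" and the row group has to be read.
  static String equalsTruth(long literal, long min, long max) {
    if (literal < min || literal > max) {
      return "NO";          // row group cannot contain the value
    } else if (min == max && literal == min) {
      return "YES";         // every row in the group has exactly this value
    } else {
      return "YES_NO";      // the group may or may not contain the value
    }
  }

  public static void main(String[] args) {
    System.out.println(equalsTruth(5, 10, 20));  // NO      -> row group skipped
    System.out.println(equalsTruth(7, 7, 7));    // YES
    System.out.println(equalsTruth(15, 10, 20)); // YES_NO  -> must be read
  }
}
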
+
+  private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
+      final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        // null safe equals does not return a *_NULL variant, so pass hasNull as false
+        return checkInBloomFilter(bloomFilter, predObj, false);
+      case EQUALS:
+        return checkInBloomFilter(bloomFilter, predObj, hasNull);
+      case IN:
+        for (Object arg : predicate.getLiteralList()) {
+          // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
+          Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
+          TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
+          if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
+            return result;
+          }
+        }
+        return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+      default:
+        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
+
+  private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
+    TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+
+    if (predObj instanceof Long) {
+      if (bf.testLong(((Long) predObj).longValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Double) {
+      if (bf.testDouble(((Double) predObj).doubleValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof String || predObj instanceof Text ||
+        predObj instanceof HiveDecimalWritable ||
+        predObj instanceof BigDecimal) {
+      if (bf.testString(predObj.toString())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Timestamp) {
+      if (bf.testLong(((Timestamp) predObj).getTime())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Date) {
+      if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else {
+      // if the predicate object is null and hasNull says there are no nulls, return NO
+      if (predObj == null && !hasNull) {
+        result = TruthValue.NO;
+      } else {
+        result = TruthValue.YES_NO_NULL;
+      }
+    }
+
+    if (result == TruthValue.YES_NO_NULL && !hasNull) {
+      result = TruthValue.YES_NO;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Bloom filter evaluation: " + result.toString());
+    }
+
+    return result;
+  }
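
The hasNull handling above is easy to miss: a bloom filter probe can only ever produce a "maybe", never a definite "yes". A minimal standalone sketch (hypothetical, not ORC's BloomFilterIO API) of how the truth value is chosen around a probe:

public class BloomTruthExample {
  // 'mightContain' stands in for bf.testLong/testDouble/testString; bloom filters can report
  // false positives but never false negatives, so a hit only means the group must be read.
  static String bloomTruth(boolean mightContain, boolean hasNull) {
    if (!mightContain) {
      return hasNull ? "NO_NULL" : "NO";        // definite miss: the row group can be skipped
    }
    return hasNull ? "YES_NO_NULL" : "YES_NO";  // possible hit: the row group must be read
  }

  public static void main(String[] args) {
    System.out.println(bloomTruth(false, true)); // NO_NULL
    System.out.println(bloomTruth(true, false)); // YES_NO
  }
}
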
+
+  private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
+    if (obj == null) {
+      return null;
+    }
+    switch (type) {
+      case BOOLEAN:
+        if (obj instanceof Boolean) {
+          return obj;
+        } else {
+          // will only be true if the string conversion yields "true"; all other values are
+          // considered false
+          return Boolean.valueOf(obj.toString());
+        }
+      case DATE:
+        if (obj instanceof Date) {
+          return obj;
+        } else if (obj instanceof String) {
+          return Date.valueOf((String) obj);
+        } else if (obj instanceof Timestamp) {
+          return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
+        }
+        // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
+        break;
+      case DECIMAL:
+        if (obj instanceof Boolean) {
+          return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
+              HiveDecimal.ONE : HiveDecimal.ZERO);
+        } else if (obj instanceof Integer) {
+          return new HiveDecimalWritable(((Integer) obj).intValue());
+        } else if (obj instanceof Long) {
+          return new HiveDecimalWritable(((Long) obj));
+        } else if (obj instanceof Float || obj instanceof Double ||
+            obj instanceof String) {
+          return new HiveDecimalWritable(obj.toString());
+        } else if (obj instanceof BigDecimal) {
+          return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
+        } else if (obj instanceof HiveDecimal) {
+          return new HiveDecimalWritable((HiveDecimal) obj);
+        } else if (obj instanceof HiveDecimalWritable) {
+          return obj;
+        } else if (obj instanceof Timestamp) {
+          return new HiveDecimalWritable(Double.toString(
+              TimestampUtils.getDouble((Timestamp) obj)));
+        }
+        break;
+      case FLOAT:
+        if (obj instanceof Number) {
+          // widening conversion
+          return ((Number) obj).doubleValue();
+        } else if (obj instanceof HiveDecimal) {
+          return ((HiveDecimal) obj).doubleValue();
+        } else if (obj instanceof String) {
+          return Double.valueOf(obj.toString());
+        } else if (obj instanceof Timestamp) {
+          return TimestampUtils.getDouble((Timestamp) obj);
+        } else if (obj instanceof BigDecimal) {
+          return ((BigDecimal) obj).doubleValue();
+        }
+        break;
+      case LONG:
+        if (obj instanceof Number) {
+          // widening conversion
+          return ((Number) obj).longValue();
+        } else if (obj instanceof HiveDecimal) {
+          return ((HiveDecimal) obj).longValue();
+        } else if (obj instanceof String) {
+          return Long.valueOf(obj.toString());
+        }
+        break;
+      case STRING:
+        if (obj != null) {
+          return (obj.toString());
+        }
+        break;
+      case TIMESTAMP:
+        if (obj instanceof Timestamp) {
+          return obj;
+        } else if (obj instanceof Integer) {
+          return new Timestamp(((Number) obj).longValue());
+        } else if (obj instanceof Float) {
+          return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
+        } else if (obj instanceof Double) {
+          return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
+        } else if (obj instanceof HiveDecimal) {
+          return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
+        } else if (obj instanceof HiveDecimalWritable) {
+          return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
+        } else if (obj instanceof Date) {
+          return new Timestamp(((Date) obj).getTime());
+        }
+        // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
+        // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
+        // is also config driven. The filter operator changes its promotion based on config:
+        // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
+        break;
+      default:
+        break;
+    }
+
+    throw new IllegalArgumentException(String.format(
+        "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
+            .getSimpleName(), type));
+  }
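
To make the normalization above concrete: both the statistics min/max and the predicate literal are converted to the predicate's type before any comparison, and a conversion failure propagates up to the conservative fallback in evaluatePredicateRange. A standalone sketch (hypothetical helper, plain Java) of the same idea for the LONG case:

public class NormalizeExample {
  // Mirrors the LONG branch above: numbers are widened, strings are parsed,
  // anything else is rejected and would trigger the conservative truth value.
  static Long toLongForComparison(Object obj) {
    if (obj instanceof Number) {
      return ((Number) obj).longValue();   // widening conversion
    } else if (obj instanceof String) {
      return Long.valueOf((String) obj);   // parse the literal
    }
    throw new IllegalArgumentException("cannot convert " + obj.getClass() + " to LONG");
  }

  public static void main(String[] args) {
    System.out.println(toLongForComparison(42));     // 42
    System.out.println(toLongForComparison("1000")); // 1000
  }
}
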
+
+  public static class SargApplier {
+    public final static boolean[] READ_ALL_RGS = null;
+    public final static boolean[] READ_NO_RGS = new boolean[0];
+
+    private final SearchArgument sarg;
+    private final List<PredicateLeaf> sargLeaves;
+    private final int[] filterColumns;
+    private final long rowIndexStride;
+    // a flag per column: true for each column index that appears in filterColumns
+    private final boolean[] sargColumns;
+
+    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
+        List<OrcProto.Type> types, int includedCount) {
+      this.sarg = sarg;
+      sargLeaves = sarg.getLeaves();
+      filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
+      this.rowIndexStride = rowIndexStride;
+      // included will not be null; the row reader options fill the array with true values when it is null
+      sargColumns = new boolean[includedCount];
+      for (int i : filterColumns) {
+        // a filter column may have index -1, e.g. when the SARG references a partition column
+        if (i > 0) {
+          sargColumns[i] = true;
+        }
+      }
+    }
+
+    /**
+     * Pick the row groups that we need to load from the current stripe.
+     *
+     * @return an array with a boolean for each row group, READ_ALL_RGS (null) if all of the
+     * row groups must be read, or READ_NO_RGS if returnNone is set and none are needed
+     * @throws IOException
+     */
+    public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
+        OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
+      long rowsInStripe = stripe.getNumberOfRows();
+      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      boolean hasSelected = false, hasSkipped = false;
+      for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+        for (int pred = 0; pred < leafValues.length; ++pred) {
+          int columnIx = filterColumns[pred];
+          if (columnIx != -1) {
+            if (indexes[columnIx] == null) {
+              throw new AssertionError("Index is not populated for " + columnIx);
+            }
+            OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
+            if (entry == null) {
+              throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
+            }
+            OrcProto.ColumnStatistics stats = entry.getStatistics();
+            OrcProto.BloomFilter bf = null;
+            if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) {
+              bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
+            }
+            leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Stats = " + stats);
+              LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
+            }
+          } else {
+            // the column is a virtual column
+            leafValues[pred] = TruthValue.YES_NO_NULL;
+          }
+        }
+        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        hasSelected = hasSelected || result[rowGroup];
+        hasSkipped = hasSkipped || (!result[rowGroup]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+              (rowIndexStride * (rowGroup + 1) - 1) + " is " +
+              (result[rowGroup] ? "" : "not ") + "included.");
+        }
+      }
+
+      return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
+    }
+  }
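
The return convention of pickRowGroups is worth spelling out: null (READ_ALL_RGS) means "read the whole stripe", a zero-length array (READ_NO_RGS) means "skip the whole stripe", and otherwise each entry flags one row group of rowIndexStride rows. A minimal sketch of how a caller might interpret the result (hypothetical consumer, not the reader's actual code):

public class PickedRowGroupsExample {
  static void describe(boolean[] picked, long rowIndexStride) {
    if (picked == null) {                      // SargApplier.READ_ALL_RGS
      System.out.println("read the whole stripe");
    } else if (picked.length == 0) {           // SargApplier.READ_NO_RGS
      System.out.println("skip the whole stripe");
    } else {
      for (int rg = 0; rg < picked.length; ++rg) {
        if (picked[rg]) {
          System.out.println("read row group " + rg + " (rows " + (rg * rowIndexStride)
              + ".." + ((rg + 1) * rowIndexStride - 1) + ")");
        }
      }
    }
  }

  public static void main(String[] args) {
    describe(null, 10000);                             // read the whole stripe
    describe(new boolean[]{false, true, true}, 10000); // row groups 1 and 2 only
  }
}
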
+
+  /**
+   * Pick the row groups that we need to load from the current stripe.
+   *
+   * @return an array with a boolean for each row group or null if all of the
+   * row groups must be read.
+   * @throws IOException
+   */
+  protected boolean[] pickRowGroups() throws IOException {
+    // if we don't have a sarg or indexes, we read everything
+    if (sargApp == null) {
+      return null;
+    }
+    readRowIndex(currentStripe, included, sargApp.sargColumns);
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
+  }
+
+  private void clearStreams() {
+    // explicit close of all streams to de-ref ByteBuffers
+    for (InStream is : streams.values()) {
+      is.close();
+    }
+    if (bufferChunks != null) {
+      if (dataReader.isTrackingDiskRanges()) {
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) {
+            continue;
+          }
+          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+        }
+      }
+    }
+    bufferChunks = null;
+    streams.clear();
+  }
+
+  /**
+   * Read the current stripe into memory.
+   *
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = beginReadStripe();
+    includedRowGroups = pickRowGroups();
+
+    // move forward to the first unskipped row
+    if (includedRowGroups != null) {
+      while (rowInStripe < rowCountInStripe &&
+          !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+        rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+      }
+    }
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null && includedRowGroups == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+      reader.startStripe(streams, stripeFooter);
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for (int i = 0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for (int i = 0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRangeList toRead = new DiskRangeList(start, end);
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   *
+   * @param streamList        the list of streams available
+   * @param indexes           the indexes that have been loaded
+   * @param includedColumns   which columns are needed
+   * @param includedRowGroups which row groups are needed
+   * @param isCompressed      does the file have generic compression
+   * @param encodings         the encodings for each column
+   * @param types             the types of the columns
+   * @param compressionSize   the compression block size
+   * @param doMergeBuffers   whether adjacent disk ranges may be merged into a single read
+   * @return the list of disk ranges that will be loaded
+   */
+  static DiskRangeList planReadPartialDataStreams(List<OrcProto.Stream> streamList,
+      OrcProto.RowIndex[] indexes,
+      boolean[] includedColumns,
+      boolean[] includedRowGroups,
+      boolean isCompressed,
+      List<OrcProto.ColumnEncoding> encodings,
+      List<OrcProto.Type> types,
+      int compressionSize,
+      boolean doMergeBuffers) {
+    long offset = 0;
+    // figure out which columns have a present stream
+    boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
+    CreateHelper list = new CreateHelper();
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      // since stream kind is optional, first check if it exists
+      if (stream.hasKind() &&
+          (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
+          (column < includedColumns.length && includedColumns[column])) {
+        // if we aren't filtering or it is a dictionary, load it.
+        if (includedRowGroups == null
+            || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
+          RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+        } else {
+          RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
+              isCompressed, indexes[column], encodings.get(column), types.get(column),
+              compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
+        }
+      }
+      offset += length;
+    }
+    return list.extract();
+  }
+
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+      DiskRangeList ranges,
+      boolean[] includeColumn,
+      CompressionCodec codec,
+      int bufferSize,
+      Map<StreamName, InStream> streams) throws IOException {
+    long streamOffset = 0;
+    for (OrcProto.Stream streamDesc : streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if ((includeColumn != null &&
+          (column < included.length && !includeColumn[column])) ||
+          streamDesc.hasKind() &&
+              (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
+        streamOffset += streamDesc.getLength();
+        continue;
+      }
+      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+          ranges, streamOffset, streamDesc.getLength());
+      StreamName name = new StreamName(column, streamDesc.getKind());
+      streams.put(name, InStream.create(name.toString(), buffers,
+          streamDesc.getLength(), codec, bufferSize));
+      streamOffset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    DiskRangeList toRead = planReadPartialDataStreams(streamList,
+        indexes, included, includedRowGroups, codec != null,
+        stripeFooter.getColumnsList(), types, bufferSize, true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+    }
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+    }
+
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+  }
+
+  /**
+   * Read the next stripe until we find a row that we don't skip.
+   *
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
+      currentStripe += 1;
+      readStripe();
+    }
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   *
+   * @param nextRow the row we want to go to
+   * @throws IOException
+   */
+  private boolean advanceToNextRow(
+      TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
+      throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+    // check for row skipping
+    if (rowIndexStride != 0 &&
+        includedRowGroups != null &&
+        nextRowInStripe < rowCountInStripe) {
+      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+      if (!includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
+          rowGroup += 1;
+        }
+        if (rowGroup >= includedRowGroups.length) {
+          if (canAdvanceStripe) {
+            advanceStripe();
+          }
+          return canAdvanceStripe;
+        }
+        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+      }
+    }
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
+      }
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+      } else {
+        reader.skipRows(nextRowInStripe - rowInStripe);
+      }
+      rowInStripe = nextRowInStripe;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    try {
+      if (rowInStripe >= rowCountInStripe) {
+        currentStripe += 1;
+        if (currentStripe >= stripes.size()) {
+          batch.size = 0;
+          return false;
+        }
+        readStripe();
+      }
+
+      int batchSize = computeBatchSize(batch.getMaxSize());
+
+      rowInStripe += batchSize;
+      reader.setVectorColumnCount(batch.getDataColumnCount());
+      reader.nextBatch(batch, batchSize);
+      batch.selectedInUse = false;
+      batch.size = batchSize;
+      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+      return batch.size != 0;
+    } catch (IOException e) {
+      // Rethrow exception with file name in log message
+      throw new IOException("Error reading file: " + path, e);
+    }
+  }
+
+  private int computeBatchSize(long targetBatchSize) {
+    final int batchSize;
+    // In case of PPD, the batch size should be aware of row group boundaries. If only a subset
+    // of row groups is selected, the marker position is set to the end of that range (the
+    // selected row groups within the stripe). Computing the batch size from the marker position
+    // makes sure that the batch does not read past the row group boundary.
+    // An illustration of this case is at https://issues.apache.org/jira/browse/HIVE-6287
+    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+      int startRowGroup = (int) (rowInStripe / rowIndexStride);
+      if (!includedRowGroups[startRowGroup]) {
+        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+          startRowGroup += 1;
+        }
+      }
+
+      int endRowGroup = startRowGroup;
+      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+        endRowGroup += 1;
+      }
+
+      final long markerPosition =
+          (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+              : rowCountInStripe;
+      batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
+
+      if (isLogDebugEnabled && batchSize < targetBatchSize) {
+        LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+      }
+    } else {
+      batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
+    }
+    return batchSize;
+  }
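
A worked example of the marker-position arithmetic above (standalone sketch with hypothetical numbers, mirroring only the selected-range walk): with a 10,000-row index stride, only row groups 2 and 3 selected, and the reader positioned at row 39,500 of the stripe, the marker lands at 40,000 and the batch is capped at 500 rows so it never crosses into the unselected row group 4.

public class BatchSizeExample {
  public static void main(String[] args) {
    long rowIndexStride = 10000;
    long rowCountInStripe = 55000;
    long rowInStripe = 39500;                // 500 rows before the end of selected row group 3
    boolean[] includedRowGroups = {false, false, true, true, false, false};
    long targetBatchSize = 1024;

    int startRowGroup = (int) (rowInStripe / rowIndexStride);      // 3, already selected
    int endRowGroup = startRowGroup;
    while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
      endRowGroup += 1;                                            // stops at 4
    }
    long markerPosition = Math.min(endRowGroup * rowIndexStride, rowCountInStripe); // 40000
    long batchSize = Math.min(targetBatchSize, markerPosition - rowInStripe);       // 500
    System.out.println("marker=" + markerPosition + " batchSize=" + batchSize);
  }
}
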
+
+  @Override
+  public void close() throws IOException {
+    clearStreams();
+    dataReader.close();
+  }
+
+  @Override
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  /**
+   * Return the fraction of rows that have been read from the selected
+   * section of the file.
+   *
+   * @return fraction between 0.0 and 1.0 of rows consumed
+   */
+  @Override
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    for (int i = 0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
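
findStripe translates a row number relative to the reader's slice into a stripe index by walking the per-stripe row counts. The same walk over plain row counts (standalone sketch, hypothetical numbers):

public class FindStripeExample {
  static int findStripe(long[] stripeRowCounts, long rowNumber) {
    for (int i = 0; i < stripeRowCounts.length; i++) {
      if (stripeRowCounts[i] > rowNumber) {
        return i;                            // rowNumber falls inside this stripe
      }
      rowNumber -= stripeRowCounts[i];       // otherwise skip past it
    }
    throw new IllegalArgumentException("Seek after the end of reader range");
  }

  public static void main(String[] args) {
    long[] counts = {5000, 5000, 2000};
    System.out.println(findStripe(counts, 4999));  // 0
    System.out.println(findStripe(counts, 5000));  // 1 (first row of the second stripe)
    System.out.println(findStripe(counts, 11999)); // 2 (last row of the slice)
  }
}
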
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               boolean[] sargColumns) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null, sargColumns);
+  }
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               OrcProto.RowIndex[] indexes,
+                               OrcProto.BloomFilterIndex[] bloomFilterIndex,
+                               boolean[] sargColumns) throws IOException {
+    StripeInformation stripe = stripes.get(stripeIndex);
+    OrcProto.StripeFooter stripeFooter = null;
+    // if this is the current stripe, use the cached objects.
+    if (stripeIndex == currentStripe) {
+      stripeFooter = this.stripeFooter;
+      indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+      sargColumns = sargColumns == null ?
+          (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
+    }
+    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
+        bloomFilterIndex);
+  }
+
+  private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
+      throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    reader.seek(index);
+  }
+
+  @Override
+  public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
+
+    // if we aren't to the right row yet, advance in the stripe.
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  private static final String TRANSLATED_SARG_SEPARATOR = "_";
+  public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
+    return rootColumn + TRANSLATED_SARG_SEPARATOR
+        + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+  }
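
The translated SARG column name is simply "<rootColumn>_<indexInSourceTable>", with -1 standing in for "maps to nothing"; mapTranslatedSargColumns below splits it back apart. A quick standalone check of the encoding (same string concatenation, hypothetical values):

public class TranslatedSargColumnExample {
  public static void main(String[] args) {
    System.out.println(0 + "_" + 3);   // "0_3"  -> root type 0, fourth column of the table
    System.out.println(0 + "_" + -1);  // "0_-1" -> the SARG column maps to nothing and is skipped
  }
}
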
+
+  public static int[] mapTranslatedSargColumns(
+      List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
+    int[] result = new int[sargLeaves.size()];
+    OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
+    String lastRootStr = null;
+    for (int i = 0; i < result.length; ++i) {
+      String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
+      assert rootAndIndex.length == 2;
+      String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
+      int index = Integer.parseInt(indexStr);
+      // First, check if the column even maps to anything.
+      if (index == -1) {
+        result[i] = -1;
+        continue;
+      }
+      assert index >= 0;
+      // Then, find the root type if needed.
+      if (!rootStr.equals(lastRootStr)) {
+        lastRoot = types.get(Integer.parseInt(rootStr));
+        lastRootStr = rootStr;
+      }
+      // Subtypes of the root type correspond, in order, to the columns in the table schema
+      // (disregarding schema evolution, which doesn't work at present). Get the index of the
+      // corresponding subtype.
+      result[i] = lastRoot.getSubtypes(index);
+    }
+    return result;
+  }
+}