You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/26 23:42:21 UTC

svn commit: r1517707 [7/17] - in /hive/branches/tez: ./ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/apache/hadoop/hive/co...

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Aug 26 21:42:12 2013
@@ -27,10 +27,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -43,6 +51,9 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 
 class RecordReaderImpl implements RecordReader {
+
+  private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+
   private final FSDataInputStream file;
   private final long firstRow;
   private final List<StripeInformation> stripes =
@@ -50,17 +61,25 @@ class RecordReaderImpl implements Record
   private OrcProto.StripeFooter stripeFooter;
   private final long totalRowCount;
   private final CompressionCodec codec;
+  private final List<OrcProto.Type> types;
   private final int bufferSize;
   private final boolean[] included;
   private final long rowIndexStride;
   private long rowInStripe = 0;
-  private int currentStripe = 0;
+  private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
+  private final SearchArgument sarg;
+  // the leaf predicates for the sarg
+  private final List<PredicateLeaf> sargLeaves;
+  // an array, the same length as sargLeaves, that maps each leaf to its column id
+  private final int[] filterColumns;
+  // flags recording which row groups are included (i.e. not skipped)
+  private boolean[] includedRowGroups = null;
 
   RecordReaderImpl(Iterable<StripeInformation> stripes,
                    FileSystem fileSystem,
@@ -70,12 +89,27 @@ class RecordReaderImpl implements Record
                    CompressionCodec codec,
                    int bufferSize,
                    boolean[] included,
-                   long strideRate
+                   long strideRate,
+                   SearchArgument sarg,
+                   String[] columnNames
                   ) throws IOException {
     this.file = fileSystem.open(path);
     this.codec = codec;
+    this.types = types;
     this.bufferSize = bufferSize;
     this.included = included;
+    this.sarg = sarg;
+    if (sarg != null) {
+      sargLeaves = sarg.getLeaves();
+      filterColumns = new int[sargLeaves.size()];
+      for(int i=0; i < filterColumns.length; ++i) {
+        String colName = sargLeaves.get(i).getColumnName();
+        filterColumns[i] = findColumns(columnNames, colName);
+      }
+    } else {
+      sargLeaves = null;
+      filterColumns = null;
+    }
     long rows = 0;
     long skippedRows = 0;
     for(StripeInformation stripe: stripes) {
@@ -92,9 +126,17 @@ class RecordReaderImpl implements Record
     reader = createTreeReader(path, 0, types, included);
     indexes = new OrcProto.RowIndex[types.size()];
     rowIndexStride = strideRate;
-    if (this.stripes.size() > 0) {
-      readStripe();
+    advanceToNextRow(0L);
+  }
+
+  private static int findColumns(String[] columnNames,
+                                 String columnName) {
+    for(int i=0; i < columnNames.length; ++i) {
+      if (columnName.equals(columnNames[i])) {
+        return i;
+      }
     }
+    return -1;
   }
 
   private static final class PositionProviderImpl implements PositionProvider {
@@ -129,6 +171,21 @@ class RecordReaderImpl implements Record
       }
     }
 
+    IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+        InStream in,
+        boolean signed) throws IOException {
+      switch (kind) {
+      case DIRECT_V2:
+      case DICTIONARY_V2:
+        return new RunLengthIntegerReaderV2(in, signed);
+      case DIRECT:
+      case DICTIONARY:
+        return new RunLengthIntegerReader(in, signed);
+      default:
+        throw new IllegalArgumentException("Unknown encoding " + kind);
+      }
+    }
+
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encoding
                     ) throws IOException {
@@ -265,20 +322,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class ShortTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     ShortTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -309,20 +375,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class IntTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     IntTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -353,20 +428,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class LongTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     LongTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -491,13 +575,22 @@ class RecordReaderImpl implements Record
 
   private static class BinaryTreeReader extends TreeReader{
     private InStream stream;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     BinaryTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
@@ -505,9 +598,8 @@ class RecordReaderImpl implements Record
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
-      lengths = new RunLengthIntegerReader(streams.get(new
-          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
-          false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
     }
 
     @Override
@@ -554,22 +646,33 @@ class RecordReaderImpl implements Record
   }
 
   private static class TimestampTreeReader extends TreeReader{
-    private RunLengthIntegerReader data;
-    private RunLengthIntegerReader nanos;
+    private IntegerReader data = null;
+    private IntegerReader nanos = null;
 
     TimestampTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA)), true);
-      nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.SECONDARY)), false);
+      data = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.DATA)), true);
+      nanos = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.SECONDARY)), false);
     }
 
     @Override
@@ -624,20 +727,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class DateTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     DateTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -669,20 +781,29 @@ class RecordReaderImpl implements Record
 
   private static class DecimalTreeReader extends TreeReader{
     private InStream valueStream;
-    private RunLengthIntegerReader scaleStream;
+    private IntegerReader scaleStream = null;
 
     DecimalTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
     ) throws IOException {
       super.startStripe(streams, encodings);
       valueStream = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
-      scaleStream = new RunLengthIntegerReader(streams.get(
+      scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
           new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
     }
 
@@ -713,18 +834,156 @@ class RecordReaderImpl implements Record
     }
   }
 
+  /**
+   * A tree reader that will read string columns. At the start of the
+   * stripe, it creates an internal reader based on whether a direct or
+   * dictionary encoding was used.
+   */
   private static class StringTreeReader extends TreeReader {
-    private DynamicByteArray dictionaryBuffer = null;
-    private int dictionarySize;
-    private int[] dictionaryOffsets;
-    private RunLengthIntegerReader reader;
+    private TreeReader reader;
 
     StringTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      reader.checkEncoding(encoding);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      // For each stripe, checks the encoding and initializes the appropriate
+      // reader
+      switch (encodings.get(columnId).getKind()) {
+        case DIRECT:
+        case DIRECT_V2:
+          reader = new StringDirectTreeReader(path, columnId);
+          break;
+        case DICTIONARY:
+        case DICTIONARY_V2:
+          reader = new StringDictionaryTreeReader(path, columnId);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported encoding " +
+              encodings.get(columnId).getKind());
+      }
+      reader.startStripe(streams, encodings);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      return reader.next(previous);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skipRows(items);
+    }
+  }
+
+  /**
+   * A reader for string columns that are direct encoded in the current
+   * stripe.
+   */
+  private static class StringDirectTreeReader extends TreeReader {
+    private InStream stream;
+    private IntegerReader lengths;
+
+    StringDirectTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+          false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      stream.seek(index[columnId]);
+      lengths.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Text result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new Text();
+        } else {
+          result = (Text) previous;
+        }
+        int len = (int) lengths.next();
+        int offset = 0;
+        byte[] bytes = new byte[len];
+        while (len > 0) {
+          int written = stream.read(bytes, offset, len);
+          if (written < 0) {
+            throw new EOFException("Can't finish byte read from " + stream);
+          }
+          len -= written;
+          offset += written;
+        }
+        result.set(bytes);
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long lengthToSkip = 0;
+      for(int i=0; i < items; ++i) {
+        lengthToSkip += lengths.next();
+      }
+      stream.skip(lengthToSkip);
+    }
+  }
+
+  /**
+   * A reader for string columns that are dictionary encoded in the current
+   * stripe.
+   */
+  private static class StringDictionaryTreeReader extends TreeReader {
+    private DynamicByteArray dictionaryBuffer;
+    private int[] dictionaryOffsets;
+    private IntegerReader reader;
+
+    StringDictionaryTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
             columnId + " of " + path);
       }
@@ -737,7 +996,7 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
 
       // read the dictionary blob
-      dictionarySize = encodings.get(columnId).getDictionarySize();
+      int dictionarySize = encodings.get(columnId).getDictionarySize();
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
       InStream in = streams.get(name);
@@ -752,7 +1011,8 @@ class RecordReaderImpl implements Record
       // read the lengths
       name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
       in = streams.get(name);
-      RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+      IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
+          .getKind(), in, false);
       int offset = 0;
       if (dictionaryOffsets == null ||
           dictionaryOffsets.length < dictionarySize + 1) {
@@ -767,7 +1027,8 @@ class RecordReaderImpl implements Record
 
       // set up the row reader
       name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), false);
+      reader = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(name), false);
     }
 
     @Override
@@ -839,7 +1100,9 @@ class RecordReaderImpl implements Record
     void seek(PositionProvider[] index) throws IOException {
       super.seek(index);
       for(TreeReader kid: fields) {
-        kid.seek(index);
+        if (kid != null) {
+          kid.seek(index);
+        }
       }
     }
 
@@ -885,7 +1148,9 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(TreeReader field: fields) {
-        field.skipRows(items);
+        if (field != null) {
+          field.skipRows(items);
+        }
       }
     }
   }
@@ -965,7 +1230,7 @@ class RecordReaderImpl implements Record
 
   private static class ListTreeReader extends TreeReader {
     private final TreeReader elementReader;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     ListTreeReader(Path path, int columnId,
                    List<OrcProto.Type> types,
@@ -1014,12 +1279,22 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.LENGTH)), false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false);
       if (elementReader != null) {
         elementReader.startStripe(streams, encodings);
       }
@@ -1039,7 +1314,7 @@ class RecordReaderImpl implements Record
   private static class MapTreeReader extends TreeReader {
     private final TreeReader keyReader;
     private final TreeReader valueReader;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     MapTreeReader(Path path,
                   int columnId,
@@ -1092,12 +1367,22 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.LENGTH)), false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false);
       if (keyReader != null) {
         keyReader.startStripe(streams, encodings);
       }
@@ -1173,113 +1458,707 @@ class RecordReaderImpl implements Record
     ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
     file.seek(offset);
     file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
-      codec, bufferSize));
+    return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
+        new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
+        bufferSize));
   }
 
-  private void readStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    long offset = stripe.getOffset();
-    streams.clear();
+  static enum Location {
+    BEFORE, MIN, MIDDLE, MAX, AFTER
+  }
 
-    // if we aren't projecting columns, just read the whole stripe
-    if (included == null) {
-      byte[] buffer =
-        new byte[(int) (stripe.getDataLength())];
-      file.seek(offset + stripe.getIndexLength());
-      file.readFully(buffer, 0, buffer.length);
-      int sectionOffset = 0;
-      for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
-        if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
-          int sectionLength = (int) section.getLength();
-          ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
-              sectionLength);
-          StreamName name = new StreamName(section.getColumn(),
-              section.getKind());
-          streams.put(name,
-              InStream.create(name.toString(), sectionBuffer, codec,
-                  bufferSize));
-          sectionOffset += sectionLength;
-        }
+  /**
+   * Given a point and min and max, determine if the point is before, at the
+   * min, in the middle, at the max, or after the range.
+   * @param point the point to test
+   * @param min the minimum point
+   * @param max the maximum point
+   * @param <T> the type of the comparison
+   * @return the location of the point
+   */
+  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+    int minCompare = point.compareTo(min);
+    if (minCompare < 0) {
+      return Location.BEFORE;
+    } else if (minCompare == 0) {
+      return Location.MIN;
+    }
+    int maxCompare = point.compareTo(max);
+    if (maxCompare > 0) {
+      return Location.AFTER;
+    } else if (maxCompare == 0) {
+      return Location.MAX;
+    }
+    return Location.MIDDLE;
+  }
+
+  /**
+   * Get the minimum value out of an index entry.
+   * @param index the index entry
+   * @return the object for the minimum value or null if there isn't one
+   */
+  static Object getMin(OrcProto.ColumnStatistics index) {
+    if (index.hasIntStatistics()) {
+      OrcProto.IntegerStatistics stat = index.getIntStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
       }
-    } else {
-      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-      // the index of the current section
-      int currentSection = 0;
-      while (currentSection < streamList.size() &&
-          StreamName.getArea(streamList.get(currentSection).getKind()) !=
-              StreamName.Area.DATA) {
-        currentSection += 1;
-      }
-      // byte position of the current section relative to the stripe start
-      long sectionOffset = stripe.getIndexLength();
-      while (currentSection < streamList.size()) {
-        int bytes = 0;
-
-        // find the first section that shouldn't be read
-        int excluded=currentSection;
-        while (excluded < streamList.size() &&
-               included[streamList.get(excluded).getColumn()]) {
-          bytes += streamList.get(excluded).getLength();
-          excluded += 1;
-        }
-
-        // actually read the bytes as a big chunk
-        if (bytes != 0) {
-          byte[] buffer = new byte[bytes];
-          file.seek(offset + sectionOffset);
-          file.readFully(buffer, 0, bytes);
-          sectionOffset += bytes;
-
-          // create the streams for the sections we just read
-          bytes = 0;
-          while (currentSection < excluded) {
-            OrcProto.Stream section = streamList.get(currentSection);
-            StreamName name =
-              new StreamName(section.getColumn(), section.getKind());
-            this.streams.put(name,
-                InStream.create(name.toString(),
-                    ByteBuffer.wrap(buffer, bytes,
-                        (int) section.getLength()), codec, bufferSize));
-            currentSection += 1;
-            bytes += section.getLength();
+    }
+    if (index.hasStringStatistics()) {
+      OrcProto.StringStatistics stat = index.getStringStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
+      }
+    }
+    if (index.hasDoubleStatistics()) {
+      OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the maximum value out of an index entry.
+   * @param index the index entry
+   * @return the object for the maximum value or null if there isn't one
+   */
+  static Object getMax(OrcProto.ColumnStatistics index) {
+    if (index.hasIntStatistics()) {
+      OrcProto.IntegerStatistics stat = index.getIntStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    if (index.hasStringStatistics()) {
+      OrcProto.StringStatistics stat = index.getStringStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    if (index.hasDoubleStatistics()) {
+      OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param index the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index,
+                               PredicateLeaf predicate) {
+    Object minValue = getMin(index);
+    // if we didn't have any values, everything must have been null
+    if (minValue == null) {
+      if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+        return TruthValue.YES;
+      } else {
+        return TruthValue.NULL;
+      }
+    }
+    Object maxValue = getMax(index);
+    Location loc;
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO;
+        } else {
+          return TruthValue.YES_NO;
+        }
+      case EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (minValue.equals(maxValue) && loc == Location.MIN) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case LESS_THAN:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.AFTER) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE || loc == Location.MIN) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case LESS_THAN_EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.AFTER || loc == Location.MAX) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case IN:
+        if (minValue.equals(maxValue)) {
+          // for a single value, look through to see if that value is in the
+          // set
+          for(Object arg: predicate.getLiteralList()) {
+            loc = compareToRange((Comparable) arg, minValue, maxValue);
+            if (loc == Location.MIN) {
+              return TruthValue.YES_NULL;
+            }
           }
+          return TruthValue.NO_NULL;
+        } else {
+          // are all of the values outside of the range?
+          for(Object arg: predicate.getLiteralList()) {
+            loc = compareToRange((Comparable) arg, minValue, maxValue);
+            if (loc == Location.MIN || loc == Location.MIDDLE ||
+                loc == Location.MAX) {
+              return TruthValue.YES_NO_NULL;
+            }
+          }
+          return TruthValue.NO_NULL;
         }
-
-        // skip forward until we get back to a section that we need
-        while (currentSection < streamList.size() &&
-               !included[streamList.get(currentSection).getColumn()]) {
-          sectionOffset += streamList.get(currentSection).getLength();
-          currentSection += 1;
+      case BETWEEN:
+        List<Object> args = predicate.getLiteralList();
+        loc = compareToRange((Comparable) args.get(0), minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.MIN) {
+          Location loc2 = compareToRange((Comparable) args.get(1), minValue,
+              maxValue);
+          if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+            return TruthValue.YES_NULL;
+          } else if (loc2 == Location.BEFORE) {
+            return TruthValue.NO_NULL;
+          } else {
+            return TruthValue.YES_NO_NULL;
+          }
+        } else if (loc == Location.AFTER) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
         }
+      case IS_NULL:
+        return TruthValue.YES_NO;
+      default:
+        return TruthValue.YES_NO_NULL;
+    }
+  }
+
+  /**
+   * Pick the row groups that we need to load from the current stripe.
+   * @return an array with a boolean for each row group or null if all of the
+   *    row groups must be read.
+   * @throws IOException
+   */
+  private boolean[] pickRowGroups() throws IOException {
+    // if we don't have a sarg or indexes, we read everything
+    if (sarg == null || rowIndexStride == 0) {
+      return null;
+    }
+    readRowIndex();
+    long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
+    int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
+        rowIndexStride);
+    boolean[] result = new boolean[groupsInStripe];
+    TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+    for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+      for(int pred=0; pred < leafValues.length; ++pred) {
+        OrcProto.ColumnStatistics stats =
+            indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+        leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stats = " + stats);
+          LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+              leafValues[pred]);
+        }
+      }
+      result[rowGroup] = sarg.evaluate(leafValues).isNotNeeded();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+            (rowIndexStride * (rowGroup+1) - 1) + " is " +
+            (result[rowGroup] ? "" : "not ") + "included.");
       }
     }
-    reader.startStripe(streams, stripeFooter.getColumnsList());
-    rowInStripe = 0;
+
+    // if we found something to skip, use the array. otherwise, return null.
+    for(boolean b: result) {
+      if (!b) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Read the current stripe into memory.
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    streams.clear();
+    // setup the position in the stripe
     rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
     rowBaseInStripe = 0;
     for(int i=0; i < currentStripe; ++i) {
       rowBaseInStripe += stripes.get(i).getNumberOfRows();
     }
+    // reset all of the indexes
     for(int i=0; i < indexes.length; ++i) {
       indexes[i] = null;
     }
+    includedRowGroups = pickRowGroups();
+
+    // move forward to the first unskipped row
+    if (includedRowGroups != null) {
+      while (rowInStripe < rowCountInStripe &&
+             !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+        rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+      }
+    }
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null && includedRowGroups == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+      reader.startStripe(streams, stripeFooter.getColumnsList());
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry((int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private void readAllDataStreams(StripeInformation stripe
+                                  ) throws IOException {
+    byte[] buffer =
+      new byte[(int) (stripe.getDataLength())];
+    file.seek(stripe.getOffset() + stripe.getIndexLength());
+    file.readFully(buffer, 0, buffer.length);
+    int sectionOffset = 0;
+    for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+      if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+        int sectionLength = (int) section.getLength();
+        ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+            sectionLength);
+        StreamName name = new StreamName(section.getColumn(),
+            section.getKind());
+        streams.put(name,
+            InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
+                new long[]{0}, sectionLength, codec, bufferSize));
+        sectionOffset += sectionLength;
+      }
+    }
+  }
+
+  /**
+   * The sections of the stripe that we need to read.
+   */
+  static class DiskRange {
+    /** the first address we need to read. */
+    long offset;
+    /** the first address afterwards. */
+    long end;
+
+    DiskRange(long offset, long end) {
+      this.offset = offset;
+      this.end = end;
+      if (end < offset) {
+        throw new IllegalArgumentException("invalid range " + this);
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || other.getClass() != getClass()) {
+        return false;
+      }
+      DiskRange otherR = (DiskRange) other;
+      return otherR.offset == offset && otherR.end == end;
+    }
+
+    @Override
+    public String toString() {
+      return "range start: " + offset + " end: " + end;
+    }
+  }
+
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS =
+      BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS =
+    BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset in the index positions for the column that the given
+   * stream starts.
+   * @param encoding the encoding of the column
+   * @param type the type of the column
+   * @param stream the kind of the stream
+   * @param isCompressed is the file compressed
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the number of positions that will be used for that stream
+   */
+  static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
+                              OrcProto.Type.Kind type,
+                              OrcProto.Stream.Kind stream,
+                              boolean isCompressed,
+                              boolean hasNulls) {
+    if (stream == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    int compressionValue = isCompressed ? 1 : 0;
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (type) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case STRING:
+        if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        } else {
+          if (stream == OrcProto.Stream.Kind.DATA) {
+            return base;
+          } else {
+            return base + BYTE_STREAM_POSITIONS + compressionValue;
+          }
+        }
+      case BINARY:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case DECIMAL:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  // for uncompressed streams, what is the most overlap with the following set
+  // of rows (long vint literal group).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @return is this part of a dictionary?
+   */
+  static boolean isDictionary(OrcProto.Stream.Kind kind,
+                              OrcProto.ColumnEncoding encoding) {
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+      (kind == OrcProto.Stream.Kind.LENGTH &&
+       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   * @param streamList the list of streams available
+   * @param indexes the indexes that have been loaded
+   * @param includedColumns which columns are needed
+   * @param includedRowGroups which row groups are needed
+   * @param isCompressed does the file have generic compression
+   * @param encodings the encodings for each column
+   * @param types the types of the columns
+   * @param compressionSize the compression block size
+   * @return the list of disk ranges that will be loaded
+   */
+  static List<DiskRange> planReadPartialDataStreams
+      (List<OrcProto.Stream> streamList,
+       OrcProto.RowIndex[] indexes,
+       boolean[] includedColumns,
+       boolean[] includedRowGroups,
+       boolean isCompressed,
+       List<OrcProto.ColumnEncoding> encodings,
+       List<OrcProto.Type> types,
+       int compressionSize) {
+    List<DiskRange> result = new ArrayList<DiskRange>();
+    long offset = 0;
+    // figure out which columns have a present stream
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    for(OrcProto.Stream stream: streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
+          includedColumns[column]) {
+        // if we aren't filtering or it is a dictionary, load it.
+        if (includedRowGroups == null ||
+            isDictionary(streamKind, encodings.get(column))) {
+          result.add(new DiskRange(offset, offset + length));
+        } else {
+          for(int group=0; group < includedRowGroups.length; ++group) {
+            if (includedRowGroups[group]) {
+              int posn = getIndexPosition(encodings.get(column).getKind(),
+                  types.get(column).getKind(), stream.getKind(), isCompressed,
+                  hasNull[column]);
+              long start = indexes[column].getEntry(group).getPositions(posn);
+              // figure out the worst case last location
+              long end = (group == includedRowGroups.length - 1) ?
+                  length : Math.min(length,
+                                    indexes[column].getEntry(group + 1)
+                                        .getPositions(posn)
+                                        + (isCompressed ?
+                                            (OutStream.HEADER_SIZE
+                                              + compressionSize) :
+                                            WORST_UNCOMPRESSED_SLOP));
+              result.add(new DiskRange(offset + start, offset + end));
+            }
+          }
+        }
+      }
+      offset += length;
+    }
+    return result;
+  }
+
+  /**
+   * Update the disk ranges to collapse adjacent or overlapping ranges. It
+   * assumes that the ranges are sorted.
+   * @param ranges the list of disk ranges to merge
+   */
+  static void mergeDiskRanges(List<DiskRange> ranges) {
+    DiskRange prev = null;
+    for(int i=0; i < ranges.size(); ++i) {
+      DiskRange current = ranges.get(i);
+      if (prev != null && overlap(prev.offset, prev.end,
+          current.offset, current.end)) {
+        prev.offset = Math.min(prev.offset, current.offset);
+        prev.end = Math.max(prev.end, current.end);
+        ranges.remove(i);
+        i -= 1;
+      } else {
+        prev = current;
+      }
+    }
+  }
+
+  /**
+   * Read the list of ranges from the file.
+   * @param file the file to read
+   * @param base the base of the stripe
+   * @param ranges the disk ranges within the stripe to read
+   * @return the bytes read for each disk range, which is the same length as
+   *    ranges
+   * @throws IOException
+   */
+  static byte[][] readDiskRanges(FSDataInputStream file,
+                                 long base,
+                                 List<DiskRange> ranges) throws IOException {
+    byte[][] result = new byte[ranges.size()][];
+    int i = 0;
+    for(DiskRange range: ranges) {
+      int len = (int) (range.end - range.offset);
+      result[i] = new byte[len];
+      file.seek(base + range.offset);
+      file.readFully(result[i]);
+      i += 1;
+    }
+    return result;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param ranges ranges to stringify
+   * @return the resulting string
+   */
+  static String stringifyDiskRanges(List<DiskRange> ranges) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    for(int i=0; i < ranges.size(); ++i) {
+      if (i != 0) {
+        buffer.append(", ");
+      }
+      buffer.append(ranges.get(i).toString());
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  static void createStreams(List<OrcProto.Stream> streamDescriptions,
+                            List<DiskRange> ranges,
+                            byte[][] bytes,
+                            boolean[] includeColumn,
+                            CompressionCodec codec,
+                            int bufferSize,
+                            Map<StreamName, InStream> streams
+                           ) throws IOException {
+    long offset = 0;
+    for(OrcProto.Stream streamDesc: streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if (includeColumn[column] &&
+          StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
+        long length = streamDesc.getLength();
+        int first = -1;
+        int last = -2;
+        for(int i=0; i < bytes.length; ++i) {
+          DiskRange range = ranges.get(i);
+          if (overlap(offset, offset+length, range.offset, range.end)) {
+            if (first == -1) {
+              first = i;
+            }
+            last = i;
+          }
+        }
+        ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
+        long[] offsets = new long[last - first + 1];
+        for(int i=0; i < buffers.length; ++i) {
+          DiskRange range = ranges.get(i + first);
+          long start = Math.max(range.offset, offset);
+          long end = Math.min(range.end, offset+length);
+          buffers[i] = ByteBuffer.wrap(bytes[first + i],
+              Math.max(0, (int) (offset - range.offset)), (int) (end - start));
+          offsets[i] = Math.max(0, range.offset - offset);
+        }
+        StreamName name = new StreamName(column, streamDesc.getKind());
+        streams.put(name, InStream.create(name.toString(), buffers, offsets,
+            length, codec, bufferSize));
+      }
+      offset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe
+                                      ) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    List<DiskRange> chunks =
+        planReadPartialDataStreams(streamList,
+            indexes, included, includedRowGroups, codec != null,
+            stripeFooter.getColumnsList(), types, bufferSize);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+    }
+    mergeDiskRanges(chunks);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + stringifyDiskRanges(chunks));
+    }
+    byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
+    createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+        streams);
   }
 
   @Override
   public boolean hasNext() throws IOException {
-    return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+    return rowInStripe < rowCountInStripe;
   }
 
-  @Override
-  public Object next(Object previous) throws IOException {
-    if (rowInStripe >= rowCountInStripe) {
+  /**
+   * Read the next stripe until we find a row that we don't skip.
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
       currentStripe += 1;
       readStripe();
     }
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   * @param nextRow the row we want to go to
+   * @throws IOException
+   */
+  private void advanceToNextRow(long nextRow) throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+    // check for row skipping
+    if (rowIndexStride != 0 &&
+        includedRowGroups != null &&
+        nextRowInStripe < rowCountInStripe) {
+      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+      if (!includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length &&
+               !includedRowGroups[rowGroup]) {
+          rowGroup += 1;
+        }
+        // if we are off the end of the stripe, just move stripes
+        if (rowGroup >= includedRowGroups.length) {
+          advanceStripe();
+          return;
+        }
+        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+      }
+    }
+    if (nextRowInStripe < rowCountInStripe) {
+      if (nextRowInStripe != rowInStripe) {
+        if (rowIndexStride != 0) {
+          int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+          seekToRowEntry(rowGroup);
+          reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+        } else {
+          reader.skipRows(nextRowInStripe - rowInStripe);
+        }
+        rowInStripe = nextRowInStripe;
+      }
+    } else {
+      advanceStripe();
+    }
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    Object result = reader.next(previous);
+    // find the next row
     rowInStripe += 1;
-    return reader.next(previous);
+    advanceToNextRow(rowInStripe + rowBaseInStripe);
+    return result;
   }
 
   @Override
@@ -1303,14 +2182,6 @@ class RecordReaderImpl implements Record
   }
 
   private int findStripe(long rowNumber) {
-    if (rowNumber < 0) {
-      throw new IllegalArgumentException("Seek to a negative row number " +
-          rowNumber);
-    } else if (rowNumber < firstRow) {
-      throw new IllegalArgumentException("Seek before reader range " +
-          rowNumber);
-    }
-    rowNumber -= firstRow;
     for(int i=0; i < stripes.size(); i++) {
       StripeInformation stripe = stripes.get(i);
       if (stripe.getNumberOfRows() > rowNumber) {
@@ -1331,7 +2202,8 @@ class RecordReaderImpl implements Record
           file.seek(offset);
           file.readFully(buffer);
           indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              ByteBuffer.wrap(buffer), codec, bufferSize));
+              new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
+              stream.getLength(), codec, bufferSize));
         }
       }
       offset += stream.getLength();
@@ -1351,19 +2223,25 @@ class RecordReaderImpl implements Record
 
   @Override
   public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+                                         rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+                                         rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
     int rightStripe = findStripe(rowNumber);
     if (rightStripe != currentStripe) {
       currentStripe = rightStripe;
       readStripe();
     }
     readRowIndex();
-    rowInStripe = rowNumber - rowBaseInStripe;
-    if (rowIndexStride != 0) {
-      long entry = rowInStripe / rowIndexStride;
-      seekToRowEntry((int) entry);
-      reader.skipRows(rowInStripe - entry * rowIndexStride);
-    } else {
-      reader.skipRows(rowInStripe);
-    }
+
+    // if we aren't to the right row yet, advance in the stripe.
+    advanceToNextRow(rowNumber);
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Mon Aug 26 21:42:12 2013
@@ -57,7 +57,7 @@ class RunLengthByteReader {
       while (bytes < numLiterals) {
         int result = input.read(literals, bytes, numLiterals - bytes);
         if (result == -1) {
-          throw new EOFException("Reading RLE byte literal got EOF");
+          throw new EOFException("Reading RLE byte literal got EOF in " + this);
         }
         bytes += result;
       }
@@ -108,4 +108,10 @@ class RunLengthByteReader {
       items -= consume;
     }
   }
+
+  @Override
+  public String toString() {
+    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+        used + "/" + numLiterals + " from " + input;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Mon Aug 26 21:42:12 2013
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * A reader that reads a sequence of integers.
  * */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
   private final InStream input;
   private final boolean signed;
   private final long[] literals =
@@ -71,11 +71,13 @@ class RunLengthIntegerReader {
     }
   }
 
-  boolean hasNext() throws IOException {
+  @Override
+  public boolean hasNext() throws IOException {
     return used != numLiterals || input.available() > 0;
   }
 
-  long next() throws IOException {
+  @Override
+  public long next() throws IOException {
     long result;
     if (used == numLiterals) {
       readValues();
@@ -88,7 +90,8 @@ class RunLengthIntegerReader {
     return result;
   }
 
-  void seek(PositionProvider index) throws IOException {
+  @Override
+  public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
     if (consumed != 0) {
@@ -104,7 +107,8 @@ class RunLengthIntegerReader {
     }
   }
 
-  void skip(long numValues) throws IOException {
+  @Override
+  public void skip(long numValues) throws IOException {
     while (numValues > 0) {
       if (used == numLiterals) {
         readValues();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java Mon Aug 26 21:42:12 2013
@@ -25,7 +25,7 @@ import java.io.IOException;
  * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
  * literal vint values follow.
  */
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
   static final int MIN_REPEAT_SIZE = 3;
   static final int MAX_DELTA = 127;
   static final int MIN_DELTA = -128;
@@ -71,12 +71,14 @@ class RunLengthIntegerWriter {
     }
   }
 
-  void flush() throws IOException {
+  @Override
+  public void flush() throws IOException {
     writeValues();
     output.flush();
   }
 
-  void write(long value) throws IOException {
+  @Override
+  public void write(long value) throws IOException {
     if (numLiterals == 0) {
       literals[numLiterals++] = value;
       tailRunLength = 1;
@@ -130,8 +132,10 @@ class RunLengthIntegerWriter {
     }
   }
 
-  void getPosition(PositionRecorder recorder) throws IOException {
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
     output.getPosition(recorder);
     recorder.addPosition(numLiterals);
   }
+
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Aug 26 21:42:12 2013
@@ -185,4 +185,279 @@ final class SerializationUtils {
     result = result.shiftRight(1);
     return result;
   }
+
+  enum FixedBitSizes {
+    ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+    THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+    TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+    TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+  }
+
+  /**
+   * Count the number of bits required to encode the given value
+   * @param value - value to examine
+   * @return closest supported fixed bit width required to store the value
+   */
+  static int findClosestNumBits(long value) {
+    int count = 0;
+    while (value > 0) {
+      count++;
+      value = value >>> 1;
+    }
+    return getClosestFixedBits(count);
+  }
+
+  /**
+   * zigzag encode the given value
+   * @param val
+   * @return zigzag encoded value
+   */
+  static long zigzagEncode(long val) {
+    return (val << 1) ^ (val >> 63);
+  }
+
+  /**
+   * zigzag decode the given value
+   * @param val
+   * @return zigzag decoded value
+   */
+  static long zigzagDecode(long val) {
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Compute the bits required to represent pth percentile value
+   * @param data - array
+   * @param p - percentile value (>0.0 to <=1.0)
+   * @return pth percentile bits
+   */
+  static int percentileBits(long[] data, double p) {
+    if ((p > 1.0) || (p <= 0.0)) {
+      return -1;
+    }
+
+    // histogram that stores the encoded bit requirement for each value.
+    // maximum number of bits that can be encoded is 32 (refer FixedBitSizes)
+    int[] hist = new int[32];
+
+    // compute the histogram
+    for(long l : data) {
+      int idx = encodeBitWidth(findClosestNumBits(l));
+      hist[idx] += 1;
+    }
+
+    int len = data.length;
+    int perLen = (int) (len * (1.0 - p));
+
+    // return the bits required by pth percentile length
+    for(int i = hist.length - 1; i >= 0; i--) {
+      perLen -= hist[i];
+      if (perLen < 0) {
+        return decodeBitWidth(i);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Read n bytes from the stream in big endian order and convert to long
+   * @param input - the stream to read n bytes from
+   * @return long value
+   */
+  static long bytesToLongBE(InStream input, int n) throws IOException {
+    long out = 0;
+    long val = 0;
+    while (n > 0) {
+      n--;
+      // store it in a long and then shift else integer overflow will occur
+      val = input.read();
+      out |= (val << (n * 8));
+    }
+    return out;
+  }
+
+  /**
+   * Calculate the number of bytes required
+   * @param n - number of values
+   * @param numBits - bit width
+   * @return number of bytes required
+   */
+  static int getTotalBytesRequired(int n, int numBits) {
+    return (n * numBits + 7) / 8;
+  }
+
+  /**
+   * For a given fixed bit this function will return the closest available fixed
+   * bit
+   * @param n
+   * @return closest valid fixed bit
+   */
+  static int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+
+    if (n >= 1 && n <= 24) {
+      return n;
+    } else if (n > 24 && n <= 26) {
+      return 26;
+    } else if (n > 26 && n <= 28) {
+      return 28;
+    } else if (n > 28 && n <= 30) {
+      return 30;
+    } else if (n > 30 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Finds the closest available fixed bit width match and returns its encoded
+   * value (ordinal)
+   * @param n - fixed bit width to encode
+   * @return encoded fixed bit width
+   */
+  static int encodeBitWidth(int n) {
+    n = getClosestFixedBits(n);
+
+    if (n >= 1 && n <= 24) {
+      return n - 1;
+    } else if (n > 24 && n <= 26) {
+      return FixedBitSizes.TWENTYSIX.ordinal();
+    } else if (n > 26 && n <= 28) {
+      return FixedBitSizes.TWENTYEIGHT.ordinal();
+    } else if (n > 28 && n <= 30) {
+      return FixedBitSizes.THIRTY.ordinal();
+    } else if (n > 30 && n <= 32) {
+      return FixedBitSizes.THIRTYTWO.ordinal();
+    } else if (n > 32 && n <= 40) {
+      return FixedBitSizes.FORTY.ordinal();
+    } else if (n > 40 && n <= 48) {
+      return FixedBitSizes.FORTYEIGHT.ordinal();
+    } else if (n > 48 && n <= 56) {
+      return FixedBitSizes.FIFTYSIX.ordinal();
+    } else {
+      return FixedBitSizes.SIXTYFOUR.ordinal();
+    }
+  }
+
+  /**
+   * Decodes the ordinal fixed bit value to actual fixed bit width value
+   * @param n - encoded fixed bit width
+   * @return decoded fixed bit width
+   */
+  static int decodeBitWidth(int n) {
+    if (n >= FixedBitSizes.ONE.ordinal()
+        && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+      return n + 1;
+    } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+      return 26;
+    } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+      return 28;
+    } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+      return 30;
+    } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+      return 32;
+    } else if (n == FixedBitSizes.FORTY.ordinal()) {
+      return 40;
+    } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+      return 48;
+    } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Bitpack and write the input values to underlying output stream
+   * @param input - values to write
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param output - output stream
+   * @throws IOException
+   */
+  static void writeInts(long[] input, int offset, int len, int bitSize,
+                        OutputStream output) throws IOException {
+    if (input == null || input.length < 1 || offset < 0 || len < 1
+        || bitSize < 1) {
+      return;
+    }
+
+    int bitsLeft = 8;
+    byte current = 0;
+    for(int i = offset; i < (offset + len); i++) {
+      long value = input[i];
+      int bitsToWrite = bitSize;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1L << bitsToWrite) - 1;
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    // flush
+    if (bitsLeft != 8) {
+      output.write(current);
+      current = 0;
+      bitsLeft = 8;
+    }
+  }
+
+  /**
+   * Read bitpacked integers from input stream
+   * @param buffer - input buffer
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param input - input stream
+   * @throws IOException
+   */
+  static void readInts(long[] buffer, int offset, int len, int bitSize,
+                       InStream input) throws IOException {
+    int bitsLeft = 0;
+    int current = 0;
+
+    for(int i = offset; i < (offset + len); i++) {
+      long result = 0;
+      int bitsLeftToRead = bitSize;
+      while (bitsLeftToRead > bitsLeft) {
+        result <<= bitsLeft;
+        result |= current & ((1 << bitsLeft) - 1);
+        bitsLeftToRead -= bitsLeft;
+        current = input.read();
+        bitsLeft = 8;
+      }
+
+      // handle the left over bits
+      if (bitsLeftToRead > 0) {
+        result <<= bitsLeftToRead;
+        bitsLeft -= bitsLeftToRead;
+        result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+      }
+      buffer[i] = result;
+    }
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Mon Aug 26 21:42:12 2013
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.io.Text;
-
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.io.Text;
+
 /**
  * A red-black tree that stores strings. The strings are stored as UTF-8 bytes
  * and an offset for each entry.
@@ -147,7 +147,7 @@ class StringRedBlackTree extends RedBlac
 
   /**
    * Visit all of the nodes in the tree in sorted order.
-   * @param visitor the action to be applied to each ndoe
+   * @param visitor the action to be applied to each node
    * @throws IOException
    */
   public void visit(Visitor visitor) throws IOException {
@@ -163,6 +163,17 @@ class StringRedBlackTree extends RedBlac
     keyOffsets.clear();
   }
 
+  public void getText(Text result, int originalPosition) {
+    int offset = keyOffsets.get(originalPosition);
+    int length;
+    if (originalPosition + 1 == keyOffsets.size()) {
+      length = byteArray.size() - offset;
+    } else {
+      length = keyOffsets.get(originalPosition + 1) - offset;
+    }
+    byteArray.setText(result, offset, length);
+  }
+
   /**
    * Get the size of the character data in the table.
    * @return the bytes used by the table