Posted to commits@hive.apache.org by om...@apache.org on 2013/08/12 17:03:31 UTC

svn commit: r1513155 [1/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/ ql/...

Author: omalley
Date: Mon Aug 12 15:03:30 2013
New Revision: 1513155

URL: http://svn.apache.org/r1513155
Log:
HIVE-4123 Improved ORC integer RLE version 2. (Prasanth Jayachandran via 
omalley)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
    hive/trunk/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
    hive/trunk/ql/src/test/resources/orc-file-dump.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 12 15:03:30 2013
@@ -502,6 +502,9 @@ public class HiveConf extends Configurat
 
     // Maximum fraction of heap that can be used by ORC file writers
     HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+    // use the 0.11 version of RLE encoding. If this conf is not defined or
+    // any other value is specified, ORC will use the new RLE encoding.
+    HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", "0.11"),
 
     HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
 
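As a minimal usage sketch (illustrative only; it assumes the standard HiveConf
getVar/setVar API): writing "0.11" keeps the original RLE, and per the comment
above any other value, or an unset key, selects the new RLE v2 encoding.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class OrcWriteFormatExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Pin the ORC writer to the 0.11 (RLE v1) file format.
        conf.setVar(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT, "0.11");
        System.out.println(conf.getVar(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT));
      }
    }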

Modified: hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java (original)
+++ hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java Mon Aug 12 15:03:30 2013
@@ -5695,10 +5695,14 @@ public final class OrcProto {
         implements com.google.protobuf.ProtocolMessageEnum {
       DIRECT(0, 0),
       DICTIONARY(1, 1),
+      DIRECT_V2(2, 2),
+      DICTIONARY_V2(3, 3),
       ;
       
       public static final int DIRECT_VALUE = 0;
       public static final int DICTIONARY_VALUE = 1;
+      public static final int DIRECT_V2_VALUE = 2;
+      public static final int DICTIONARY_V2_VALUE = 3;
       
       
       public final int getNumber() { return value; }
@@ -5707,6 +5711,8 @@ public final class OrcProto {
         switch (value) {
           case 0: return DIRECT;
           case 1: return DICTIONARY;
+          case 2: return DIRECT_V2;
+          case 3: return DICTIONARY_V2;
           default: return null;
         }
       }
@@ -5737,7 +5743,7 @@ public final class OrcProto {
       }
       
       private static final Kind[] VALUES = {
-        DIRECT, DICTIONARY, 
+        DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2, 
       };
       
       public static Kind valueOf(
@@ -11117,42 +11123,42 @@ public final class OrcProto {
       "eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004",
       "\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" +
       "H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" +
-      "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" +
+      "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" +
       "\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" +
       "he.hadoop.hive.ql.io.orc.ColumnEncoding." +
-      "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" +
-      "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" +
-      "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" +
-      "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." +
-      "org.apache.hadoop.hive.ql.io.orc.ColumnE",
-      "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" +
-      "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" +
-      "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" +
-      "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" +
-      "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" +
-      "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" +
-      "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" +
-      "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" +
-      "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" +
-      " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength",
-      "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" +
-      "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" +
-      "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" +
-      "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" +
-      "che.hadoop.hive.ql.io.orc.StripeInformat" +
-      "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" +
-      "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" +
-      "rg.apache.hadoop.hive.ql.io.orc.UserMeta" +
-      "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" +
-      "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i",
-      "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" +
-      "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " +
-      "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" +
-      "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" +
-      "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" +
-      "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" +
-      "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" +
-      "\020\003"
+      "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" +
+      "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" +
+      "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" +
+      "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
+      ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap",
+      "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
+      "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
+      "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
+      "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" +
+      "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
+      "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
+      "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
+      "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
+      "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" +
+      "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022",
+      "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" +
+      "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" +
+      "em\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" +
+      "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" +
+      "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" +
+      "doop.hive.ql.io.orc.StripeInformation\0225\n" +
+      "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" +
+      ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" +
+      "che.hadoop.hive.ql.io.orc.UserMetadataIt" +
+      "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007",
+      " \003(\01322.org.apache.hadoop.hive.ql.io.orc." +
+      "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" +
+      "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" +
+      "\013compression\030\002 \001(\01621.org.apache.hadoop.h" +
+      "ive.ql.io.orc.CompressionKind\022\034\n\024compres" +
+      "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" +
+      "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" +
+      "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

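A quick round-trip sketch of the regenerated enum (illustrative; it relies only
on the valueOf(int) and getNumber() members shown above):

    // The two new wire values map to the V2 encoding kinds and back.
    OrcProto.ColumnEncoding.Kind kind = OrcProto.ColumnEncoding.Kind.valueOf(2);
    assert kind == OrcProto.ColumnEncoding.Kind.DIRECT_V2;
    assert OrcProto.ColumnEncoding.Kind.DICTIONARY_V2.getNumber() == 3;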
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading integers.
+ */
+interface IntegerReader {
+
+  /**
+   * Seek to the position provided by index.
+   * @param index the position provider to seek to
+   * @throws IOException
+   */
+  void seek(PositionProvider index) throws IOException;
+
+  /**
+   * Skip the specified number of values.
+   * @param numValues the number of values to skip
+   * @throws IOException
+   */
+  void skip(long numValues) throws IOException;
+
+  /**
+   * Check if there are any more values left.
+   * @return true if there are more values to read
+   * @throws IOException
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * Return the next available value.
+   * @return the next value
+   * @throws IOException
+   */
+  long next() throws IOException;
+}
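A minimal consumer sketch for the new interface (illustrative; it assumes code
in the same package, since IntegerReader is package-private, and sumValues is a
hypothetical helper):

    // Drain a reader and sum its values; the caller never needs to know
    // whether a v1 or v2 implementation sits behind the interface.
    static long sumValues(IntegerReader reader) throws java.io.IOException {
      long total = 0;
      while (reader.hasNext()) {
        total += reader.next();
      }
      return total;
    }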

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+interface IntegerWriter {
+
+  /**
+   * Record the current position in the stream.
+   * @param recorder the recorder that captures the current position
+   * @throws IOException
+   */
+  void getPosition(PositionRecorder recorder) throws IOException;
+
+  /**
+   * Write the integer value.
+   * @param value the value to write
+   * @throws IOException
+   */
+  void write(long value) throws IOException;
+
+  /**
+   * Flush the buffer.
+   * @throws IOException
+   */
+  void flush() throws IOException;
+}
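A matching writer-side sketch under the same assumptions (writeAll is a
hypothetical helper in the same package):

    // Buffer a batch of values, then force the final run out to the stream.
    static void writeAll(IntegerWriter writer, long[] values) throws java.io.IOException {
      for (long value : values) {
        writer.write(value);
      }
      writer.flush();
    }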

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Aug 12 15:03:30 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -130,6 +131,21 @@ class RecordReaderImpl implements Record
       }
     }
 
+    IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+        InStream in,
+        boolean signed) throws IOException {
+      switch (kind) {
+      case DIRECT_V2:
+      case DICTIONARY_V2:
+        return new RunLengthIntegerReaderV2(in, signed);
+      case DIRECT:
+      case DICTIONARY:
+        return new RunLengthIntegerReader(in, signed);
+      default:
+        throw new IllegalArgumentException("Unknown encoding " + kind);
+      }
+    }
+
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encoding
                     ) throws IOException {
@@ -266,20 +282,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class ShortTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     ShortTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -310,20 +335,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class IntTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     IntTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -354,20 +388,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class LongTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     LongTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -492,13 +535,22 @@ class RecordReaderImpl implements Record
 
   private static class BinaryTreeReader extends TreeReader{
     private InStream stream;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     BinaryTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
@@ -506,9 +558,8 @@ class RecordReaderImpl implements Record
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
-      lengths = new RunLengthIntegerReader(streams.get(new
-          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
-          false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
     }
 
     @Override
@@ -555,22 +606,33 @@ class RecordReaderImpl implements Record
   }
 
   private static class TimestampTreeReader extends TreeReader{
-    private RunLengthIntegerReader data;
-    private RunLengthIntegerReader nanos;
+    private IntegerReader data = null;
+    private IntegerReader nanos = null;
 
     TimestampTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.DATA)), true);
-      nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.SECONDARY)), false);
+      data = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.DATA)), true);
+      nanos = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.SECONDARY)), false);
     }
 
     @Override
@@ -625,20 +687,29 @@ class RecordReaderImpl implements Record
   }
 
   private static class DateTreeReader extends TreeReader{
-    private RunLengthIntegerReader reader = null;
+    private IntegerReader reader = null;
 
     DateTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
     }
 
     @Override
@@ -670,20 +741,29 @@ class RecordReaderImpl implements Record
 
   private static class DecimalTreeReader extends TreeReader{
     private InStream valueStream;
-    private RunLengthIntegerReader scaleStream;
+    private IntegerReader scaleStream = null;
 
     DecimalTreeReader(Path path, int columnId) {
       super(path, columnId);
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
     ) throws IOException {
       super.startStripe(streams, encodings);
       valueStream = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
-      scaleStream = new RunLengthIntegerReader(streams.get(
+      scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
           new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
     }
 
@@ -726,12 +806,9 @@ class RecordReaderImpl implements Record
       super(path, columnId);
     }
 
+    @Override
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
-          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
-        throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
-      }
+      reader.checkEncoding(encoding);
     }
 
     @Override
@@ -742,9 +819,11 @@ class RecordReaderImpl implements Record
       // reader
       switch (encodings.get(columnId).getKind()) {
         case DIRECT:
+        case DIRECT_V2:
           reader = new StringDirectTreeReader(path, columnId);
           break;
         case DICTIONARY:
+        case DICTIONARY_V2:
           reader = new StringDictionaryTreeReader(path, columnId);
           break;
         default:
@@ -776,7 +855,7 @@ class RecordReaderImpl implements Record
    */
   private static class StringDirectTreeReader extends TreeReader {
     private InStream stream;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths;
 
     StringDirectTreeReader(Path path, int columnId) {
       super(path, columnId);
@@ -784,7 +863,11 @@ class RecordReaderImpl implements Record
 
     @Override
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      // PASS
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
     }
 
     @Override
@@ -795,8 +878,8 @@ class RecordReaderImpl implements Record
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
-      lengths = new RunLengthIntegerReader(streams.get(new
-          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
           false);
     }
 
@@ -851,7 +934,7 @@ class RecordReaderImpl implements Record
   private static class StringDictionaryTreeReader extends TreeReader {
     private DynamicByteArray dictionaryBuffer;
     private int[] dictionaryOffsets;
-    private RunLengthIntegerReader reader;
+    private IntegerReader reader;
 
     StringDictionaryTreeReader(Path path, int columnId) {
       super(path, columnId);
@@ -859,7 +942,11 @@ class RecordReaderImpl implements Record
 
     @Override
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      // PASS
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
     }
 
     @Override
@@ -884,7 +971,8 @@ class RecordReaderImpl implements Record
       // read the lengths
       name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
       in = streams.get(name);
-      RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+      IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
+          .getKind(), in, false);
       int offset = 0;
       if (dictionaryOffsets == null ||
           dictionaryOffsets.length < dictionarySize + 1) {
@@ -899,7 +987,8 @@ class RecordReaderImpl implements Record
 
       // set up the row reader
       name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
-      reader = new RunLengthIntegerReader(streams.get(name), false);
+      reader = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(name), false);
     }
 
     @Override
@@ -1101,7 +1190,7 @@ class RecordReaderImpl implements Record
 
   private static class ListTreeReader extends TreeReader {
     private final TreeReader elementReader;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     ListTreeReader(Path path, int columnId,
                    List<OrcProto.Type> types,
@@ -1150,12 +1239,22 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.LENGTH)), false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false);
       if (elementReader != null) {
         elementReader.startStripe(streams, encodings);
       }
@@ -1175,7 +1274,7 @@ class RecordReaderImpl implements Record
   private static class MapTreeReader extends TreeReader {
     private final TreeReader keyReader;
     private final TreeReader valueReader;
-    private RunLengthIntegerReader lengths;
+    private IntegerReader lengths = null;
 
     MapTreeReader(Path path,
                   int columnId,
@@ -1228,12 +1327,22 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+          (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.LENGTH)), false);
+      lengths = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(new StreamName(columnId,
+              OrcProto.Stream.Kind.LENGTH)), false);
       if (keyReader != null) {
         keyReader.startStripe(streams, encodings);
       }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig Mon Aug 12 15:03:30 2013
@@ -0,0 +1,1505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+class RecordReaderImpl implements RecordReader {
+  private final FSDataInputStream file;
+  private final long firstRow;
+  private final List<StripeInformation> stripes =
+    new ArrayList<StripeInformation>();
+  private OrcProto.StripeFooter stripeFooter;
+  private final long totalRowCount;
+  private final CompressionCodec codec;
+  private final int bufferSize;
+  private final boolean[] included;
+  private final long rowIndexStride;
+  private long rowInStripe = 0;
+  private int currentStripe = 0;
+  private long rowBaseInStripe = 0;
+  private long rowCountInStripe = 0;
+  private final Map<StreamName, InStream> streams =
+      new HashMap<StreamName, InStream>();
+  private final TreeReader reader;
+  private final OrcProto.RowIndex[] indexes;
+
+  RecordReaderImpl(Iterable<StripeInformation> stripes,
+                   FileSystem fileSystem,
+                   Path path,
+                   long offset, long length,
+                   List<OrcProto.Type> types,
+                   CompressionCodec codec,
+                   int bufferSize,
+                   boolean[] included,
+                   long strideRate
+                  ) throws IOException {
+    this.file = fileSystem.open(path);
+    this.codec = codec;
+    this.bufferSize = bufferSize;
+    this.included = included;
+    long rows = 0;
+    long skippedRows = 0;
+    for(StripeInformation stripe: stripes) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) {
+        skippedRows += stripe.getNumberOfRows();
+      } else if (stripeStart < offset + length) {
+        this.stripes.add(stripe);
+        rows += stripe.getNumberOfRows();
+      }
+    }
+    firstRow = skippedRows;
+    totalRowCount = rows;
+    reader = createTreeReader(path, 0, types, included);
+    indexes = new OrcProto.RowIndex[types.size()];
+    rowIndexStride = strideRate;
+    if (this.stripes.size() > 0) {
+      readStripe();
+    }
+  }
+
+  private static final class PositionProviderImpl implements PositionProvider {
+    private final OrcProto.RowIndexEntry entry;
+    private int index = 0;
+
+    PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this.entry = entry;
+    }
+
+    @Override
+    public long getNext() {
+      return entry.getPositions(index++);
+    }
+  }
+
+  private abstract static class TreeReader {
+    protected final Path path;
+    protected final int columnId;
+    private BitFieldReader present = null;
+    protected boolean valuePresent = false;
+
+    TreeReader(Path path, int columnId) {
+      this.path = path;
+      this.columnId = columnId;
+    }
+
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encoding
+                    ) throws IOException {
+      checkEncoding(encoding.get(columnId));
+      InStream in = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.PRESENT));
+      if (in == null) {
+        present = null;
+        valuePresent = true;
+      } else {
+        present = new BitFieldReader(in, 1);
+      }
+    }
+
+    /**
+     * Seek to the given position.
+     * @param index the indexes loaded from the file
+     * @throws IOException
+     */
+    void seek(PositionProvider[] index) throws IOException {
+      if (present != null) {
+        present.seek(index[columnId]);
+      }
+    }
+
+    protected long countNonNulls(long rows) throws IOException {
+      if (present != null) {
+        long result = 0;
+        for(long c=0; c < rows; ++c) {
+          if (present.next() == 1) {
+            result += 1;
+          }
+        }
+        return result;
+      } else {
+        return rows;
+      }
+    }
+
+    abstract void skipRows(long rows) throws IOException;
+
+    Object next(Object previous) throws IOException {
+      if (present != null) {
+        valuePresent = present.next() == 1;
+      }
+      return previous;
+    }
+  }
+
+  private static class BooleanTreeReader extends TreeReader{
+    private BitFieldReader reader = null;
+
+    BooleanTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                     ) throws IOException {
+      super.startStripe(streams, encodings);
+      reader = new BitFieldReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)), 1);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      BooleanWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new BooleanWritable();
+        } else {
+          result = (BooleanWritable) previous;
+        }
+        result.set(reader.next() == 1);
+      }
+      return result;
+    }
+  }
+
+  private static class ByteTreeReader extends TreeReader{
+    private RunLengthByteReader reader = null;
+
+    ByteTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      reader = new RunLengthByteReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)));
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      ByteWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new ByteWritable();
+        } else {
+          result = (ByteWritable) previous;
+        }
+        result.set(reader.next());
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class ShortTreeReader extends TreeReader{
+    private RunLengthIntegerReader reader = null;
+
+    ShortTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      ShortWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new ShortWritable();
+        } else {
+          result = (ShortWritable) previous;
+        }
+        result.set((short) reader.next());
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class IntTreeReader extends TreeReader{
+    private RunLengthIntegerReader reader = null;
+
+    IntTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      IntWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new IntWritable();
+        } else {
+          result = (IntWritable) previous;
+        }
+        result.set((int) reader.next());
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class LongTreeReader extends TreeReader{
+    private RunLengthIntegerReader reader = null;
+
+    LongTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      LongWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new LongWritable();
+        } else {
+          result = (LongWritable) previous;
+        }
+        result.set(reader.next());
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class FloatTreeReader extends TreeReader{
+    private InStream stream;
+
+    FloatTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      stream.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      FloatWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new FloatWritable();
+        } else {
+          result = (FloatWritable) previous;
+        }
+        result.set(SerializationUtils.readFloat(stream));
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for(int i=0; i < items; ++i) {
+        SerializationUtils.readFloat(stream);
+      }
+    }
+  }
+
+  private static class DoubleTreeReader extends TreeReader{
+    private InStream stream;
+
+    DoubleTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name =
+        new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      stream.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      DoubleWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new DoubleWritable();
+        } else {
+          result = (DoubleWritable) previous;
+        }
+        result.set(SerializationUtils.readDouble(stream));
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      stream.skip(items * 8);
+    }
+  }
+
+  private static class BinaryTreeReader extends TreeReader{
+    private InStream stream;
+    private RunLengthIntegerReader lengths;
+
+    BinaryTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+      lengths = new RunLengthIntegerReader(streams.get(new
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+          false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      stream.seek(index[columnId]);
+      lengths.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      BytesWritable result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new BytesWritable();
+        } else {
+          result = (BytesWritable) previous;
+        }
+        int len = (int) lengths.next();
+        result.setSize(len);
+        int offset = 0;
+        while (len > 0) {
+          int written = stream.read(result.getBytes(), offset, len);
+          if (written < 0) {
+            throw new EOFException("Can't finish byte read from " + stream);
+          }
+          len -= written;
+          offset += written;
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long lengthToSkip = 0;
+      for(int i=0; i < items; ++i) {
+        lengthToSkip += lengths.next();
+      }
+      stream.skip(lengthToSkip);
+    }
+  }
+
+  private static class TimestampTreeReader extends TreeReader{
+    private RunLengthIntegerReader data;
+    private RunLengthIntegerReader nanos;
+
+    TimestampTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)), true);
+      nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.SECONDARY)), false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      data.seek(index[columnId]);
+      nanos.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Timestamp result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new Timestamp(0);
+        } else {
+          result = (Timestamp) previous;
+        }
+        long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+            WriterImpl.MILLIS_PER_SECOND;
+        int newNanos = parseNanos(nanos.next());
+        // fix the rounding when we divided by 1000.
+        if (millis >= 0) {
+          millis += newNanos / 1000000;
+        } else {
+          millis -= newNanos / 1000000;
+        }
+        result.setTime(millis);
+        result.setNanos(newNanos);
+      }
+      return result;
+    }
+
+    private static int parseNanos(long serialized) {
+      int zeros = 7 & (int) serialized;
+      int result = (int) serialized >>> 3;
+      if (zeros != 0) {
+        for(int i =0; i <= zeros; ++i) {
+          result *= 10;
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      data.skip(items);
+      nanos.skip(items);
+    }
+  }
+
+  private static class DateTreeReader extends TreeReader{
+    private RunLengthIntegerReader reader = null;
+
+    DateTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Date result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new Date(0);
+        } else {
+          result = (Date) previous;
+        }
+        result.setTime(DateWritable.daysToMillis((int) reader.next()));
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class DecimalTreeReader extends TreeReader{
+    private InStream valueStream;
+    private RunLengthIntegerReader scaleStream;
+
+    DecimalTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+    ) throws IOException {
+      super.startStripe(streams, encodings);
+      valueStream = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA));
+      scaleStream = new RunLengthIntegerReader(streams.get(
+          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      valueStream.seek(index[columnId]);
+      scaleStream.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      if (valuePresent) {
+        return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+            (int) scaleStream.next());
+      }
+      return null;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for(int i=0; i < items; i++) {
+        SerializationUtils.readBigInteger(valueStream);
+      }
+      scaleStream.skip(items);
+    }
+  }
+
+  /**
+   * A tree reader that will read string columns. At the start of the
+   * stripe, it creates an internal reader based on whether a direct or
+   * dictionary encoding was used.
+   */
+  private static class StringTreeReader extends TreeReader {
+    private TreeReader reader;
+
+    StringTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId + " of " + path);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      // For each stripe, checks the encoding and initializes the appropriate
+      // reader
+      switch (encodings.get(columnId).getKind()) {
+        case DIRECT:
+          reader = new StringDirectTreeReader(path, columnId);
+          break;
+        case DICTIONARY:
+          reader = new StringDictionaryTreeReader(path, columnId);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported encoding " +
+              encodings.get(columnId).getKind());
+      }
+      reader.startStripe(streams, encodings);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      return reader.next(previous);
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skipRows(items);
+    }
+  }
+
+  /**
+   * A reader for string columns that are direct encoded in the current
+   * stripe.
+   */
+  private static class StringDirectTreeReader extends TreeReader {
+    private InStream stream;
+    private RunLengthIntegerReader lengths;
+
+    StringDirectTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      // PASS
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      stream = streams.get(name);
+      lengths = new RunLengthIntegerReader(streams.get(new
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+          false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      stream.seek(index[columnId]);
+      lengths.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Text result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new Text();
+        } else {
+          result = (Text) previous;
+        }
+        int len = (int) lengths.next();
+        int offset = 0;
+        byte[] bytes = new byte[len];
+        while (len > 0) {
+          int bytesRead = stream.read(bytes, offset, len);
+          if (bytesRead < 0) {
+            throw new EOFException("Can't finish reading bytes from " +
+                stream);
+          }
+          len -= bytesRead;
+          offset += bytesRead;
+        }
+        result.set(bytes);
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
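+      // sum the byte lengths of the skipped values so the data stream can
+      // be advanced with a single skip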
+      long lengthToSkip = 0;
+      for(int i=0; i < items; ++i) {
+        lengthToSkip += lengths.next();
+      }
+      stream.skip(lengthToSkip);
+    }
+  }
+
+  /**
+   * A reader for string columns that are dictionary encoded in the current
+   * stripe.
+   */
+  private static class StringDictionaryTreeReader extends TreeReader {
+    private DynamicByteArray dictionaryBuffer;
+    private int[] dictionaryOffsets;
+    private RunLengthIntegerReader reader;
+
+    StringDictionaryTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      // PASS
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+
+      // read the dictionary blob
+      int dictionarySize = encodings.get(columnId).getDictionarySize();
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DICTIONARY_DATA);
+      InStream in = streams.get(name);
+      if (in.available() > 0) {
+        dictionaryBuffer = new DynamicByteArray(64, in.available());
+        dictionaryBuffer.readAll(in);
+      } else {
+        dictionaryBuffer = null;
+      }
+      in.close();
+
+      // read the lengths
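+      // and convert them to offsets: e.g. lengths {3, 1, 4} become offsets
+      // {0, 3, 4, 8}, so entry i spans [offsets[i], offsets[i+1]) in the
+      // dictionary buffer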
+      name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
+      in = streams.get(name);
+      RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+      int offset = 0;
+      if (dictionaryOffsets == null ||
+          dictionaryOffsets.length < dictionarySize + 1) {
+        dictionaryOffsets = new int[dictionarySize + 1];
+      }
+      for(int i=0; i < dictionarySize; ++i) {
+        dictionaryOffsets[i] = offset;
+        offset += (int) lenReader.next();
+      }
+      dictionaryOffsets[dictionarySize] = offset;
+      in.close();
+
+      // set up the row reader
+      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), false);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Text result = null;
+      if (valuePresent) {
+        int entry = (int) reader.next();
+        if (previous == null) {
+          result = new Text();
+        } else {
+          result = (Text) previous;
+        }
+        int offset = dictionaryOffsets[entry];
+        int length;
+        // if it isn't the last entry, subtract the adjacent offsets;
+        // otherwise use the buffer length.
+        if (entry < dictionaryOffsets.length - 1) {
+          length = dictionaryOffsets[entry + 1] - offset;
+        } else {
+          length = dictionaryBuffer.size() - offset;
+        }
+        // If the column contains only empty strings, the dictionary size
+        // is zero and the buffer is null; in that case just return the
+        // result, which defaults to empty.
+        if (dictionaryBuffer != null) {
+          dictionaryBuffer.setText(result, offset, length);
+        } else {
+          result.clear();
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
+  private static class StructTreeReader extends TreeReader {
+    private final TreeReader[] fields;
+    private final String[] fieldNames;
+
+    StructTreeReader(Path path, int columnId,
+                     List<OrcProto.Type> types,
+                     boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      int fieldCount = type.getFieldNamesCount();
+      this.fields = new TreeReader[fieldCount];
+      this.fieldNames = new String[fieldCount];
+      for(int i=0; i < fieldCount; ++i) {
+        int subtype = type.getSubtypes(i);
+        if (included == null || included[subtype]) {
+          this.fields[i] = createTreeReader(path, subtype, types, included);
+        }
+        this.fieldNames[i] = type.getFieldNames(i);
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      for(TreeReader kid: fields) {
+        if (kid != null) {
+          kid.seek(index);
+        }
+      }
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      OrcStruct result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new OrcStruct(fields.length);
+        } else {
+          result = (OrcStruct) previous;
+
+          // If the input format was initialized with a file that has a
+          // different number of fields, update the struct to the correct
+          // field count.
+          if (result.getNumFields() != fields.length) {
+            result.setNumFields(fields.length);
+          }
+        }
+        for(int i=0; i < fields.length; ++i) {
+          if (fields[i] != null) {
+            result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
+          }
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      for(TreeReader field: fields) {
+        if (field != null) {
+          field.startStripe(streams, encodings);
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for(TreeReader field: fields) {
+        if (field != null) {
+          field.skipRows(items);
+        }
+      }
+    }
+  }
+
+  private static class UnionTreeReader extends TreeReader {
+    private final TreeReader[] fields;
+    private RunLengthByteReader tags;
+
+    UnionTreeReader(Path path, int columnId,
+                    List<OrcProto.Type> types,
+                    boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      int fieldCount = type.getSubtypesCount();
+      this.fields = new TreeReader[fieldCount];
+      for(int i=0; i < fieldCount; ++i) {
+        int subtype = type.getSubtypes(i);
+        if (included == null || included[subtype]) {
+          this.fields[i] = createTreeReader(path, subtype, types, included);
+        }
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      tags.seek(index[columnId]);
+      for(TreeReader kid: fields) {
+        if (kid != null) {
+          kid.seek(index);
+        }
+      }
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      OrcUnion result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new OrcUnion();
+        } else {
+          result = (OrcUnion) previous;
+        }
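+        // only reuse the previous value when the tag is unchanged, since a
+        // different tag implies a different value type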
+        byte tag = tags.next();
+        Object previousVal = result.getObject();
+        result.set(tag, fields[tag].next(tag == result.getTag() ?
+            previousVal : null));
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                     ) throws IOException {
+      super.startStripe(streams, encodings);
+      tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)));
+      for(TreeReader field: fields) {
+        if (field != null) {
+          field.startStripe(streams, encodings);
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
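+      // each child stream only holds values for rows bearing its tag, so
+      // count the tags to learn how many values to skip in each child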
+      long[] counts = new long[fields.length];
+      for(int i=0; i < items; ++i) {
+        counts[tags.next()] += 1;
+      }
+      for(int i=0; i < counts.length; ++i) {
+        if (fields[i] != null) {
+          fields[i].skipRows(counts[i]);
+        }
+      }
+    }
+  }
+
+  private static class ListTreeReader extends TreeReader {
+    private final TreeReader elementReader;
+    private RunLengthIntegerReader lengths;
+
+    ListTreeReader(Path path, int columnId,
+                   List<OrcProto.Type> types,
+                   boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      elementReader = createTreeReader(path, type.getSubtypes(0), types,
+          included);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      elementReader.seek(index);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      List<Object> result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new ArrayList<Object>();
+        } else {
+          result = (ArrayList<Object>) previous;
+        }
+        int prevLength = result.size();
+        int length = (int) lengths.next();
+        // extend the list to the new length
+        for(int i=prevLength; i < length; ++i) {
+          result.add(null);
+        }
+        // read the new elements into the array
+        for(int i=0; i < length; ++i) {
+          result.set(i, elementReader.next(i < prevLength ?
+              result.get(i) : null));
+        }
+        // remove any extra elements
+        for(int i=prevLength - 1; i >= length; --i) {
+          result.remove(i);
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.LENGTH)), false);
+      if (elementReader != null) {
+        elementReader.startStripe(streams, encodings);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for(long i=0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      elementReader.skipRows(childSkip);
+    }
+  }
+
+  private static class MapTreeReader extends TreeReader {
+    private final TreeReader keyReader;
+    private final TreeReader valueReader;
+    private RunLengthIntegerReader lengths;
+
+    MapTreeReader(Path path,
+                  int columnId,
+                  List<OrcProto.Type> types,
+                  boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      int keyColumn = type.getSubtypes(0);
+      int valueColumn = type.getSubtypes(1);
+      if (included == null || included[keyColumn]) {
+        keyReader = createTreeReader(path, keyColumn, types, included);
+      } else {
+        keyReader = null;
+      }
+      if (included == null || included[valueColumn]) {
+        valueReader = createTreeReader(path, valueColumn, types, included);
+      } else {
+        valueReader = null;
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      if (keyReader != null) {
+        keyReader.seek(index);
+      }
+      if (valueReader != null) {
+        valueReader.seek(index);
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Map<Object, Object> result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new HashMap<Object, Object>();
+        } else {
+          result = (HashMap<Object, Object>) previous;
+        }
+        // for now just clear and create new objects
+        result.clear();
+        int length = (int) lengths.next();
+        // read the new elements into the array
+        for(int i=0; i < length; ++i) {
+          result.put(keyReader.next(null), valueReader.next(null));
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.LENGTH)), false);
+      if (keyReader != null) {
+        keyReader.startStripe(streams, encodings);
+      }
+      if (valueReader != null) {
+        valueReader.startStripe(streams, encodings);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for(long i=0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      if (keyReader != null) {
+        keyReader.skipRows(childSkip);
+      }
+      if (valueReader != null) {
+        valueReader.skipRows(childSkip);
+      }
+    }
+  }
+
+  private static TreeReader createTreeReader(Path path,
+                                             int columnId,
+                                             List<OrcProto.Type> types,
+                                             boolean[] included
+                                            ) throws IOException {
+    OrcProto.Type type = types.get(columnId);
+    switch (type.getKind()) {
+      case BOOLEAN:
+        return new BooleanTreeReader(path, columnId);
+      case BYTE:
+        return new ByteTreeReader(path, columnId);
+      case DOUBLE:
+        return new DoubleTreeReader(path, columnId);
+      case FLOAT:
+        return new FloatTreeReader(path, columnId);
+      case SHORT:
+        return new ShortTreeReader(path, columnId);
+      case INT:
+        return new IntTreeReader(path, columnId);
+      case LONG:
+        return new LongTreeReader(path, columnId);
+      case STRING:
+        return new StringTreeReader(path, columnId);
+      case BINARY:
+        return new BinaryTreeReader(path, columnId);
+      case TIMESTAMP:
+        return new TimestampTreeReader(path, columnId);
+      case DATE:
+        return new DateTreeReader(path, columnId);
+      case DECIMAL:
+        return new DecimalTreeReader(path, columnId);
+      case STRUCT:
+        return new StructTreeReader(path, columnId, types, included);
+      case LIST:
+        return new ListTreeReader(path, columnId, types, included);
+      case MAP:
+        return new MapTreeReader(path, columnId, types, included);
+      case UNION:
+        return new UnionTreeReader(path, columnId, types, included);
+      default:
+        throw new IllegalArgumentException("Unsupported type " +
+          type.getKind());
+    }
+  }
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+                                         ) throws IOException {
+    long offset = stripe.getOffset() + stripe.getIndexLength() +
+        stripe.getDataLength();
+    int tailLength = (int) stripe.getFooterLength();
+
+    // read the footer
+    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+    file.seek(offset);
+    file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+    return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
+      codec, bufferSize));
+  }
+
+  private void readStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    long offset = stripe.getOffset();
+    streams.clear();
+
+    // if we aren't projecting columns, just read the whole stripe
+    if (included == null) {
+      byte[] buffer = new byte[(int) stripe.getDataLength()];
+      file.seek(offset + stripe.getIndexLength());
+      file.readFully(buffer, 0, buffer.length);
+      int sectionOffset = 0;
+      for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+        if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+          int sectionLength = (int) section.getLength();
+          ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+              sectionLength);
+          StreamName name = new StreamName(section.getColumn(),
+              section.getKind());
+          streams.put(name,
+              InStream.create(name.toString(), sectionBuffer, codec,
+                  bufferSize));
+          sectionOffset += sectionLength;
+        }
+      }
+    } else {
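+      // walk the data streams in file order, coalescing each run of
+      // adjacent included streams into a single read and seeking past
+      // the excluded ones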
+      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+      // the index of the current section
+      int currentSection = 0;
+      while (currentSection < streamList.size() &&
+          StreamName.getArea(streamList.get(currentSection).getKind()) !=
+              StreamName.Area.DATA) {
+        currentSection += 1;
+      }
+      // byte position of the current section relative to the stripe start
+      long sectionOffset = stripe.getIndexLength();
+      while (currentSection < streamList.size()) {
+        int bytes = 0;
+
+        // find the first section that shouldn't be read
+        int excluded = currentSection;
+        while (excluded < streamList.size() &&
+               included[streamList.get(excluded).getColumn()]) {
+          bytes += streamList.get(excluded).getLength();
+          excluded += 1;
+        }
+
+        // actually read the bytes as a big chunk
+        if (bytes != 0) {
+          byte[] buffer = new byte[bytes];
+          file.seek(offset + sectionOffset);
+          file.readFully(buffer, 0, bytes);
+          sectionOffset += bytes;
+
+          // create the streams for the sections we just read
+          bytes = 0;
+          while (currentSection < excluded) {
+            OrcProto.Stream section = streamList.get(currentSection);
+            StreamName name =
+              new StreamName(section.getColumn(), section.getKind());
+            this.streams.put(name,
+                InStream.create(name.toString(),
+                    ByteBuffer.wrap(buffer, bytes,
+                        (int) section.getLength()), codec, bufferSize));
+            currentSection += 1;
+            bytes += section.getLength();
+          }
+        }
+
+        // skip forward until we get back to a section that we need
+        while (currentSection < streamList.size() &&
+               !included[streamList.get(currentSection).getColumn()]) {
+          sectionOffset += streamList.get(currentSection).getLength();
+          currentSection += 1;
+        }
+      }
+    }
+    reader.startStripe(streams, stripeFooter.getColumnsList());
+    rowInStripe = 0;
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowBaseInStripe = 0;
+    for(int i=0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    for(int i=0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    if (rowInStripe >= rowCountInStripe) {
+      currentStripe += 1;
+      readStripe();
+    }
+    rowInStripe += 1;
+    return reader.next(previous);
+  }
+
+  @Override
+  public void close() throws IOException {
+    file.close();
+  }
+
+  @Override
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  /**
+   * Return the fraction of rows that have been read from the selected
+   * section of the file.
+   * @return fraction between 0.0 and 1.0 of rows consumed
+   */
+  @Override
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    rowNumber -= firstRow;
+    for(int i=0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  private void readRowIndex() throws IOException {
+    long offset = stripes.get(currentStripe).getOffset();
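+    // the stream list mirrors the file layout, so accumulate each stream's
+    // length to know where the next stream begins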
+    for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
+      if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
+        int col = stream.getColumn();
+        if ((included == null || included[col]) && indexes[col] == null) {
+          byte[] buffer = new byte[(int) stream.getLength()];
+          file.seek(offset);
+          file.readFully(buffer);
+          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+              ByteBuffer.wrap(buffer), codec, bufferSize));
+        }
+      }
+      offset += stream.getLength();
+    }
+  }
+
+  private void seekToRowEntry(int rowEntry) throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for(int i=0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    reader.seek(index);
+  }
+
+  @Override
+  public void seekToRow(long rowNumber) throws IOException {
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex();
+    rowInStripe = rowNumber - rowBaseInStripe - firstRow;
+    if (rowIndexStride != 0) {
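+      // seek to the nearest preceding row-group entry, then skip forward
+      // over the remaining rows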
+      long entry = rowInStripe / rowIndexStride;
+      seekToRowEntry((int) entry);
+      reader.skipRows(rowInStripe - entry * rowIndexStride);
+    } else {
+      reader.skipRows(rowInStripe);
+    }
+  }
+}
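
For orientation, here is a minimal sketch of how a caller might drive this
record reader; the OrcFile.createReader factory and the Reader.rows(null)
call are assumptions about the surrounding API, since only hasNext, next,
seekToRow, and close appear in this hunk. Note the reuse convention shown
above: next(previous) may recycle the object passed in rather than
allocating a new row.

    Reader orcReader = OrcFile.createReader(fs, path);  // assumed factory
    RecordReader rows = orcReader.rows(null);           // null = all columns
    Object row = null;
    while (rows.hasNext()) {
      row = rows.next(row);   // may return the recycled 'row' object
      // ... inspect the returned OrcStruct ...
    }
    rows.close();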

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Mon Aug 12 15:03:30 2013
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * A reader that reads a sequence of integers.
  * */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
   private final InStream input;
   private final boolean signed;
   private final long[] literals =
@@ -71,11 +71,13 @@ class RunLengthIntegerReader {
     }
   }
 
-  boolean hasNext() throws IOException {
+  @Override
+  public boolean hasNext() throws IOException {
     return used != numLiterals || input.available() > 0;
   }
 
-  long next() throws IOException {
+  @Override
+  public long next() throws IOException {
     long result;
     if (used == numLiterals) {
       readValues();
@@ -88,7 +90,8 @@ class RunLengthIntegerReader {
     return result;
   }
 
-  void seek(PositionProvider index) throws IOException {
+  @Override
+  public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
     if (consumed != 0) {
@@ -104,7 +107,8 @@ class RunLengthIntegerReader {
     }
   }
 
-  void skip(long numValues) throws IOException {
+  @Override
+  public void skip(long numValues) throws IOException {
     while (numValues > 0) {
       if (used == numLiterals) {
         readValues();