Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 05:37:01 UTC

[1/3] hive git commit: HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)

Repository: hive
Updated Branches:
  refs/heads/master 255b2bdd9 -> 7f65e36d9


http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 8fa388b..99744cd 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -157,6 +157,19 @@ public class BytesColumnVector extends ColumnVector {
   }
 
   /**
+   * Set a field by copying the data into a local buffer.
+   * Use this method only when it is not practical to set the data by
+   * reference with setRef(); setting data by reference tends to run a lot
+   * faster than copying data in.
+   *
+   * @param elementNum index within column vector to set
+   * @param sourceBuf container of source data
+   */
+  public void setVal(int elementNum, byte[] sourceBuf) {
+    setVal(elementNum, sourceBuf, 0, sourceBuf.length);
+  }
+
+  /**
    * Set a field to the concatenation of two string values. Result data is copied
    * into the internal buffer.
    *

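For illustration only (this sketch is not part of the commit), the new overload copies an entire byte[] without an explicit offset and length, while the existing setRef() remains the zero-copy fast path:

    // Sketch: copying vs. referencing values in a BytesColumnVector.
    BytesColumnVector col = new BytesColumnVector();
    col.initBuffer();                          // allocate the internal copy buffer
    byte[] reused = "hello".getBytes();        // caller may overwrite this buffer later
    col.setVal(0, reused);                     // safe: copies every byte into the vector
    byte[] stable = "world".getBytes();        // buffer that outlives the batch
    col.setRef(1, stable, 0, stable.length);   // zero-copy reference, usually faster
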
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index f18b911..fcb1ae9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -83,22 +83,31 @@ public abstract class ColumnVector {
   }
 
   /**
-     * Resets the column to default state
-     *  - fills the isNull array with false
-     *  - sets noNulls to true
-     *  - sets isRepeating to false
-     */
-    public void reset() {
-      if (!noNulls) {
-        Arrays.fill(isNull, false);
-      }
-      noNulls = true;
-      isRepeating = false;
-      preFlattenNoNulls = true;
-      preFlattenIsRepeating = false;
+   * Resets the column to default state
+   *  - fills the isNull array with false
+   *  - sets noNulls to true
+   *  - sets isRepeating to false
+   */
+  public void reset() {
+    if (!noNulls) {
+      Arrays.fill(isNull, false);
     }
+    noNulls = true;
+    isRepeating = false;
+    preFlattenNoNulls = true;
+    preFlattenIsRepeating = false;
+  }
+
+  /**
+   * Sets the isRepeating flag. Recurses over structs and unions so that the
+   * flags are set correctly.
+   * @param isRepeating
+   */
+  public void setRepeating(boolean isRepeating) {
+    this.isRepeating = isRepeating;
+  }
 
-    abstract public void flatten(boolean selectedInUse, int[] sel, int size);
+  abstract public void flatten(boolean selectedInUse, int[] sel, int size);
 
     // Simplify vector by brute-force flattening noNulls if isRepeating
     // This can be used to reduce combinatorial explosion of code paths in VectorExpressions

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
index f7c8b05..cf07bca 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
@@ -121,4 +121,12 @@ public class StructColumnVector extends ColumnVector {
       fields[i].unFlatten();
     }
   }
+
+  @Override
+  public void setRepeating(boolean isRepeating) {
+    super.setRepeating(isRepeating);
+    for(int i=0; i < fields.length; ++i) {
+      fields[i].setRepeating(isRepeating);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
index 2b3b013..298d588 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
@@ -131,4 +131,12 @@ public class UnionColumnVector extends ColumnVector {
       fields[i].unFlatten();
     }
   }
+
+  @Override
+  public void setRepeating(boolean isRepeating) {
+    super.setRepeating(isRepeating);
+    for(int i=0; i < fields.length; ++i) {
+      fields[i].setRepeating(isRepeating);
+    }
+  }
 }

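As a usage sketch (not part of the commit), setting the repeating flag on a struct column now propagates to every child vector, so row 0 of each field stands for the whole batch:

    // Sketch: one call flags the struct and all of its children as repeating.
    LongColumnVector id = new LongColumnVector();
    BytesColumnVector name = new BytesColumnVector();
    name.initBuffer();
    StructColumnVector person =
        new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, id, name);
    id.vector[0] = 42;
    name.setVal(0, "everyone".getBytes());
    person.setRepeating(true);   // also sets isRepeating on id and name
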
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 7c18da6..e85491b 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -183,4 +183,14 @@ public class VectorizedRowBatch implements Writable {
       }
     }
   }
+
+  /**
+   * Set the maximum number of rows in the batch.
+   * Data is not preserved.
+   */
+  public void ensureSize(int rows) {
+    for(int i=0; i < cols.length; ++i) {
+      cols[i].ensureSize(rows, false);
+    }
+  }
 }

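For illustration (a sketch outside the commit), ensureSize lets a caller grow every column of an existing batch before reusing it; data already in the columns is dropped:

    // Sketch: resizing all columns of a batch to hold 4096 rows.
    VectorizedRowBatch batch = new VectorizedRowBatch(2);   // two columns, DEFAULT_SIZE rows
    batch.cols[0] = new LongColumnVector();
    batch.cols[1] = new DoubleColumnVector();
    batch.ensureSize(4096);   // each column calls ensureSize(4096, false); data not preserved
    batch.size = 0;           // refill from the first row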

[3/3] hive git commit: HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)

Posted by om...@apache.org.
HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f65e36d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f65e36d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f65e36d

Branch: refs/heads/master
Commit: 7f65e36d9aa6ce9af8bf799fe07e0e0e7d749a0e
Parents: 255b2bd
Author: Owen O'Malley <om...@apache.org>
Authored: Wed Nov 11 15:24:03 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue Nov 17 20:35:39 2015 -0800

----------------------------------------------------------------------
 .../apache/hive/common/util/BloomFilter.java    |   18 +-
 .../org/apache/hive/common/util/Murmur3.java    |  107 +-
 .../apache/hive/common/util/TestMurmur3.java    |   45 +-
 .../hive/ql/io/orc/ColumnStatisticsImpl.java    |   79 +-
 .../hadoop/hive/ql/io/orc/MemoryManager.java    |    6 +-
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |    4 +
 .../hive/ql/io/orc/StringRedBlackTree.java      |    5 +
 .../hadoop/hive/ql/io/orc/TypeDescription.java  |   74 +
 .../apache/hadoop/hive/ql/io/orc/Writer.java    |    7 +
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |  852 +++++-
 .../hive/ql/io/orc/TestColumnStatistics.java    |   16 +-
 .../hive/ql/io/orc/TestMemoryManager.java       |    2 +-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java      |    5 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   12 +-
 .../hive/ql/io/orc/TestVectorOrcFile.java       | 2744 ++++++++++++++++++
 .../hive/ql/exec/vector/BytesColumnVector.java  |   13 +
 .../hive/ql/exec/vector/ColumnVector.java       |   37 +-
 .../hive/ql/exec/vector/StructColumnVector.java |    8 +
 .../hive/ql/exec/vector/UnionColumnVector.java  |    8 +
 .../hive/ql/exec/vector/VectorizedRowBatch.java |   10 +
 20 files changed, 3917 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java
index d894241..bb0b8f2 100644
--- a/common/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/common/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -89,20 +89,21 @@ public class BloomFilter {
 
   public void add(byte[] val) {
     if (val == null) {
-      addBytes(val, -1);
+      addBytes(val, -1, -1);
     } else {
-      addBytes(val, val.length);
+      addBytes(val, 0, val.length);
     }
   }
 
-  public void addBytes(byte[] val, int length) {
+  public void addBytes(byte[] val, int offset, int length) {
     // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
     // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively
     // implement a Bloom filter without any loss in the asymptotic false positive probability'
 
     // Lets split up 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
     // in the above paper
-    long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length);
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
     addHash(hash64);
   }
 
@@ -139,13 +140,14 @@ public class BloomFilter {
 
   public boolean test(byte[] val) {
     if (val == null) {
-      return testBytes(val, -1);
+      return testBytes(val, -1, -1);
     }
-    return testBytes(val, val.length);
+    return testBytes(val, 0, val.length);
   }
 
-  public boolean testBytes(byte[] val, int length) {
-    long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length);
+  public boolean testBytes(byte[] val, int offset, int length) {
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
     return testHash(hash64);
   }
 

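The Kirsch and Mitzenmacher trick cited in the comment can be sketched as follows. This is only an illustration of the idea, not the committed addHash code; names such as numBits, numHashFunctions, and bits are placeholders:

    // Sketch: deriving k probe positions from one 64-bit Murmur3 hash.
    static void addHashSketch(long hash64, java.util.BitSet bits,
                              int numBits, int numHashFunctions) {
      int hash1 = (int) hash64;             // low 32 bits
      int hash2 = (int) (hash64 >>> 32);    // high 32 bits
      for (int i = 1; i <= numHashFunctions; i++) {
        int combined = hash1 + i * hash2;   // i-th derived hash function
        if (combined < 0) {
          combined = ~combined;             // keep the bit index non-negative
        }
        bits.set(combined % numBits);       // set the bit for this probe
      }
    }
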
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Murmur3.java b/common/src/java/org/apache/hive/common/util/Murmur3.java
index 087407a..88c3514 100644
--- a/common/src/java/org/apache/hive/common/util/Murmur3.java
+++ b/common/src/java/org/apache/hive/common/util/Murmur3.java
@@ -128,11 +128,11 @@ public class Murmur3 {
    * @return - hashcode
    */
   public static long hash64(byte[] data) {
-    return hash64(data, data.length, DEFAULT_SEED);
+    return hash64(data, 0, data.length, DEFAULT_SEED);
   }
 
-  public static long hash64(byte[] data, int length) {
-    return hash64(data, length, DEFAULT_SEED);
+  public static long hash64(byte[] data, int offset, int length) {
+    return hash64(data, offset, length, DEFAULT_SEED);
   }
 
   /**
@@ -143,21 +143,21 @@ public class Murmur3 {
    * @param seed   - seed. (default is 0)
    * @return - hashcode
    */
-  public static long hash64(byte[] data, int length, int seed) {
+  public static long hash64(byte[] data, int offset, int length, int seed) {
     long hash = seed;
     final int nblocks = length >> 3;
 
     // body
     for (int i = 0; i < nblocks; i++) {
       final int i8 = i << 3;
-      long k = ((long) data[i8] & 0xff)
-          | (((long) data[i8 + 1] & 0xff) << 8)
-          | (((long) data[i8 + 2] & 0xff) << 16)
-          | (((long) data[i8 + 3] & 0xff) << 24)
-          | (((long) data[i8 + 4] & 0xff) << 32)
-          | (((long) data[i8 + 5] & 0xff) << 40)
-          | (((long) data[i8 + 6] & 0xff) << 48)
-          | (((long) data[i8 + 7] & 0xff) << 56);
+      long k = ((long) data[offset + i8] & 0xff)
+          | (((long) data[offset + i8 + 1] & 0xff) << 8)
+          | (((long) data[offset + i8 + 2] & 0xff) << 16)
+          | (((long) data[offset + i8 + 3] & 0xff) << 24)
+          | (((long) data[offset + i8 + 4] & 0xff) << 32)
+          | (((long) data[offset + i8 + 5] & 0xff) << 40)
+          | (((long) data[offset + i8 + 6] & 0xff) << 48)
+          | (((long) data[offset + i8 + 7] & 0xff) << 56);
 
       // mix functions
       k *= C1;
@@ -172,19 +172,19 @@ public class Murmur3 {
     int tailStart = nblocks << 3;
     switch (length - tailStart) {
       case 7:
-        k1 ^= ((long) data[tailStart + 6] & 0xff) << 48;
+        k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
       case 6:
-        k1 ^= ((long) data[tailStart + 5] & 0xff) << 40;
+        k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
       case 5:
-        k1 ^= ((long) data[tailStart + 4] & 0xff) << 32;
+        k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
       case 4:
-        k1 ^= ((long) data[tailStart + 3] & 0xff) << 24;
+        k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
       case 3:
-        k1 ^= ((long) data[tailStart + 2] & 0xff) << 16;
+        k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
       case 2:
-        k1 ^= ((long) data[tailStart + 1] & 0xff) << 8;
+        k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
       case 1:
-        k1 ^= ((long) data[tailStart] & 0xff);
+        k1 ^= ((long) data[offset + tailStart] & 0xff);
         k1 *= C1;
         k1 = Long.rotateLeft(k1, R1);
         k1 *= C2;
@@ -205,18 +205,19 @@ public class Murmur3 {
    * @return - hashcode (2 longs)
    */
   public static long[] hash128(byte[] data) {
-    return hash128(data, data.length, DEFAULT_SEED);
+    return hash128(data, 0, data.length, DEFAULT_SEED);
   }
 
   /**
    * Murmur3 128-bit variant.
    *
    * @param data   - input byte array
+   * @param offset - offset of the first element in the array
    * @param length - length of array
    * @param seed   - seed. (default is 0)
    * @return - hashcode (2 longs)
    */
-  public static long[] hash128(byte[] data, int length, int seed) {
+  public static long[] hash128(byte[] data, int offset, int length, int seed) {
     long h1 = seed;
     long h2 = seed;
     final int nblocks = length >> 4;
@@ -224,23 +225,23 @@ public class Murmur3 {
     // body
     for (int i = 0; i < nblocks; i++) {
       final int i16 = i << 4;
-      long k1 = ((long) data[i16] & 0xff)
-          | (((long) data[i16 + 1] & 0xff) << 8)
-          | (((long) data[i16 + 2] & 0xff) << 16)
-          | (((long) data[i16 + 3] & 0xff) << 24)
-          | (((long) data[i16 + 4] & 0xff) << 32)
-          | (((long) data[i16 + 5] & 0xff) << 40)
-          | (((long) data[i16 + 6] & 0xff) << 48)
-          | (((long) data[i16 + 7] & 0xff) << 56);
-
-      long k2 = ((long) data[i16 + 8] & 0xff)
-          | (((long) data[i16 + 9] & 0xff) << 8)
-          | (((long) data[i16 + 10] & 0xff) << 16)
-          | (((long) data[i16 + 11] & 0xff) << 24)
-          | (((long) data[i16 + 12] & 0xff) << 32)
-          | (((long) data[i16 + 13] & 0xff) << 40)
-          | (((long) data[i16 + 14] & 0xff) << 48)
-          | (((long) data[i16 + 15] & 0xff) << 56);
+      long k1 = ((long) data[offset + i16] & 0xff)
+          | (((long) data[offset + i16 + 1] & 0xff) << 8)
+          | (((long) data[offset + i16 + 2] & 0xff) << 16)
+          | (((long) data[offset + i16 + 3] & 0xff) << 24)
+          | (((long) data[offset + i16 + 4] & 0xff) << 32)
+          | (((long) data[offset + i16 + 5] & 0xff) << 40)
+          | (((long) data[offset + i16 + 6] & 0xff) << 48)
+          | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+      long k2 = ((long) data[offset + i16 + 8] & 0xff)
+          | (((long) data[offset + i16 + 9] & 0xff) << 8)
+          | (((long) data[offset + i16 + 10] & 0xff) << 16)
+          | (((long) data[offset + i16 + 11] & 0xff) << 24)
+          | (((long) data[offset + i16 + 12] & 0xff) << 32)
+          | (((long) data[offset + i16 + 13] & 0xff) << 40)
+          | (((long) data[offset + i16 + 14] & 0xff) << 48)
+          | (((long) data[offset + i16 + 15] & 0xff) << 56);
 
       // mix functions for k1
       k1 *= C1;
@@ -267,40 +268,40 @@ public class Murmur3 {
     int tailStart = nblocks << 4;
     switch (length - tailStart) {
       case 15:
-        k2 ^= (long) (data[tailStart + 14] & 0xff) << 48;
+        k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
       case 14:
-        k2 ^= (long) (data[tailStart + 13] & 0xff) << 40;
+        k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
       case 13:
-        k2 ^= (long) (data[tailStart + 12] & 0xff) << 32;
+        k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
       case 12:
-        k2 ^= (long) (data[tailStart + 11] & 0xff) << 24;
+        k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
       case 11:
-        k2 ^= (long) (data[tailStart + 10] & 0xff) << 16;
+        k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
       case 10:
-        k2 ^= (long) (data[tailStart + 9] & 0xff) << 8;
+        k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
       case 9:
-        k2 ^= (long) (data[tailStart + 8] & 0xff);
+        k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
         k2 *= C2;
         k2 = Long.rotateLeft(k2, R3);
         k2 *= C1;
         h2 ^= k2;
 
       case 8:
-        k1 ^= (long) (data[tailStart + 7] & 0xff) << 56;
+        k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
       case 7:
-        k1 ^= (long) (data[tailStart + 6] & 0xff) << 48;
+        k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
       case 6:
-        k1 ^= (long) (data[tailStart + 5] & 0xff) << 40;
+        k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
       case 5:
-        k1 ^= (long) (data[tailStart + 4] & 0xff) << 32;
+        k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
       case 4:
-        k1 ^= (long) (data[tailStart + 3] & 0xff) << 24;
+        k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
       case 3:
-        k1 ^= (long) (data[tailStart + 2] & 0xff) << 16;
+        k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
       case 2:
-        k1 ^= (long) (data[tailStart + 1] & 0xff) << 8;
+        k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
       case 1:
-        k1 ^= (long) (data[tailStart] & 0xff);
+        k1 ^= (long) (data[offset + tailStart] & 0xff);
         k1 *= C1;
         k1 = Long.rotateLeft(k1, R1);
         k1 *= C2;

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestMurmur3.java b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
index e506f71..5facc7c 100644
--- a/common/src/test/org/apache/hive/common/util/TestMurmur3.java
+++ b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.Arrays;
 import java.util.Random;
 
 /**
@@ -102,7 +103,7 @@ public class TestMurmur3 {
     buf.flip();
     long gl1 = buf.getLong();
     long gl2 = buf.getLong(8);
-    long[] hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed);
+    long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
     long m1 = hc[0];
     long m2 = hc[1];
     assertEquals(gl1, m1);
@@ -114,11 +115,39 @@ public class TestMurmur3 {
     buf.flip();
     gl1 = buf.getLong();
     gl2 = buf.getLong(8);
-    hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed);
+    byte[] keyBytes = key.getBytes();
+    hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
     m1 = hc[0];
     m2 = hc[1];
     assertEquals(gl1, m1);
     assertEquals(gl2, m2);
+
+    byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
+    Arrays.fill(offsetKeyBytes, (byte) -1);
+    System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
+    hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
+    assertEquals(gl1, hc[0]);
+    assertEquals(gl2, hc[1]);
+  }
+
+  @Test
+  public void testHashCodeM3_64() {
+    byte[] origin = ("It was the best of times, it was the worst of times," +
+        " it was the age of wisdom, it was the age of foolishness," +
+        " it was the epoch of belief, it was the epoch of incredulity," +
+        " it was the season of Light, it was the season of Darkness," +
+        " it was the spring of hope, it was the winter of despair," +
+        " we had everything before us, we had nothing before us," +
+        " we were all going direct to Heaven," +
+        " we were all going direct the other way.").getBytes();
+    long hash = Murmur3.hash64(origin, 0, origin.length);
+    assertEquals(305830725663368540L, hash);
+
+    byte[] originOffset = new byte[origin.length + 150];
+    Arrays.fill(originOffset, (byte) 123);
+    System.arraycopy(origin, 0, originOffset, 150, origin.length);
+    hash = Murmur3.hash64(originOffset, 150, origin.length);
+    assertEquals(305830725663368540L, hash);
   }
 
   @Test
@@ -135,11 +164,17 @@ public class TestMurmur3 {
       buf.flip();
       long gl1 = buf.getLong();
       long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, data.length, seed);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
       long m1 = hc[0];
       long m2 = hc[1];
       assertEquals(gl1, m1);
       assertEquals(gl2, m2);
+
+      byte[] offsetData = new byte[data.length + 50];
+      System.arraycopy(data, 0, offsetData, 50, data.length);
+      hc = Murmur3.hash128(offsetData, 50, data.length, seed);
+      assertEquals(gl1, hc[0]);
+      assertEquals(gl2, hc[1]);
     }
   }
 
@@ -157,7 +192,7 @@ public class TestMurmur3 {
       buf.flip();
       long gl1 = buf.getLong();
       long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, data.length, seed);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
       long m1 = hc[0];
       long m2 = hc[1];
       assertEquals(gl1, m1);
@@ -179,7 +214,7 @@ public class TestMurmur3 {
       buf.flip();
       long gl1 = buf.getLong();
       long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, data.length, seed);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
       long m1 = hc[0];
       long m2 = hc[1];
       assertEquals(gl1, m1);

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
index f39d3e2..bcca9de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 
 class ColumnStatisticsImpl implements ColumnStatistics {
 
@@ -47,9 +48,9 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
-    void updateBoolean(boolean value) {
+    void updateBoolean(boolean value, int repetitions) {
       if (value) {
-        trueCount += 1;
+        trueCount += repetitions;
       }
     }
 
@@ -132,7 +133,7 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
-    void updateInteger(long value) {
+    void updateInteger(long value, int repetitions) {
       if (!hasMinimum) {
         hasMinimum = true;
         minimum = value;
@@ -144,7 +145,7 @@ class ColumnStatisticsImpl implements ColumnStatistics {
       }
       if (!overflow) {
         boolean wasPositive = sum >= 0;
-        sum += value;
+        sum += value * repetitions;
         if ((value >= 0) == wasPositive) {
           overflow = (sum >= 0) != wasPositive;
         }
@@ -398,6 +399,23 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
+    void updateString(byte[] bytes, int offset, int length, int repetitions) {
+      if (minimum == null) {
+        maximum = minimum = new Text();
+        maximum.set(bytes, offset, length);
+      } else if (WritableComparator.compareBytes(minimum.getBytes(), 0,
+          minimum.getLength(), bytes, offset, length) > 0) {
+        minimum = new Text();
+        minimum.set(bytes, offset, length);
+      } else if (WritableComparator.compareBytes(maximum.getBytes(), 0,
+          maximum.getLength(), bytes, offset, length) < 0) {
+        maximum = new Text();
+        maximum.set(bytes, offset, length);
+      }
+      sum += length * repetitions;
+    }
+
+    @Override
     void merge(ColumnStatisticsImpl other) {
       if (other instanceof StringStatisticsImpl) {
         StringStatisticsImpl str = (StringStatisticsImpl) other;
@@ -498,6 +516,11 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
+    void updateBinary(byte[] bytes, int offset, int length, int repetitions) {
+      sum += length * repetitions;
+    }
+
+    @Override
     void merge(ColumnStatisticsImpl other) {
       if (other instanceof BinaryColumnStatistics) {
         BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other;
@@ -700,6 +723,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
+    void updateDate(int value) {
+      if (minimum == null) {
+        minimum = value;
+        maximum = value;
+      } else if (minimum > value) {
+        minimum = value;
+      } else if (maximum < value) {
+        maximum = value;
+      }
+    }
+
+    @Override
     void merge(ColumnStatisticsImpl other) {
       if (other instanceof DateStatisticsImpl) {
         DateStatisticsImpl dateStats = (DateStatisticsImpl) other;
@@ -809,6 +844,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     }
 
     @Override
+    void updateTimestamp(long value) {
+      if (minimum == null) {
+        minimum = value;
+        maximum = value;
+      } else if (minimum > value) {
+        minimum = value;
+      } else if (maximum < value) {
+        maximum = value;
+      }
+    }
+
+    @Override
     void merge(ColumnStatisticsImpl other) {
       if (other instanceof TimestampStatisticsImpl) {
         TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other;
@@ -889,15 +936,19 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     count += 1;
   }
 
+  void increment(int count) {
+    this.count += count;
+  }
+
   void setNull() {
     hasNull = true;
   }
 
-  void updateBoolean(boolean value) {
+  void updateBoolean(boolean value, int repetitions) {
     throw new UnsupportedOperationException("Can't update boolean");
   }
 
-  void updateInteger(long value) {
+  void updateInteger(long value, int repetitions) {
     throw new UnsupportedOperationException("Can't update integer");
   }
 
@@ -909,10 +960,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     throw new UnsupportedOperationException("Can't update string");
   }
 
+  void updateString(byte[] bytes, int offset, int length, int repetitions) {
+    throw new UnsupportedOperationException("Can't update string");
+  }
+
   void updateBinary(BytesWritable value) {
     throw new UnsupportedOperationException("Can't update binary");
   }
 
+  void updateBinary(byte[] bytes, int offset, int length, int repetitions) {
+    throw new UnsupportedOperationException("Can't update binary");
+  }
+
   void updateDecimal(HiveDecimal value) {
     throw new UnsupportedOperationException("Can't update decimal");
   }
@@ -921,10 +980,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     throw new UnsupportedOperationException("Can't update date");
   }
 
+  void updateDate(int value) {
+    throw new UnsupportedOperationException("Can't update date");
+  }
+
   void updateTimestamp(Timestamp value) {
     throw new UnsupportedOperationException("Can't update timestamp");
   }
 
+  void updateTimestamp(long value) {
+    throw new UnsupportedOperationException("Can't update timestamp");
+  }
+
   boolean isStatsExists() {
     return (count > 0 || hasNull == true);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
index 4d5f735..bb35b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
@@ -172,10 +172,12 @@ class MemoryManager {
 
   /**
    * Give the memory manager an opportunity for doing a memory check.
+   * @param rows number of rows added
    * @throws IOException
    */
-  void addedRow() throws IOException {
-    if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+  void addedRow(int rows) throws IOException {
+    rowsAddedSinceCheck += rows;
+    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
       notifyWriters();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 04b9eaf..84d627a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -1169,6 +1169,10 @@ public class RecordReaderImpl implements RecordReader {
     return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
   }
 
+  MetadataReader getMetadataReader() {
+    return metadata;
+  }
+
   private int findStripe(long rowNumber) {
     for (int i = 0; i < stripes.size(); i++) {
       StripeInformation stripe = stripes.get(i);

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
index 6094175..e0c52e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
@@ -55,6 +55,11 @@ class StringRedBlackTree extends RedBlackTree {
     return addNewKey();
   }
 
+  public int add(byte[] bytes, int offset, int length) {
+    newKey.set(bytes, offset, length);
+    return addNewKey();
+  }
+
   @Override
   protected int compareValue(int position) {
     int start = keyOffsets.get(position);

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
index 3481bb3..b365408 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -264,6 +275,69 @@ public class TypeDescription {
     return maxId;
   }
 
+  private ColumnVector createColumn() {
+    switch (category) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case TIMESTAMP:
+      case DATE:
+        return new LongColumnVector();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnVector();
+      case DECIMAL:
+        return new DecimalColumnVector(precision, scale);
+      case STRING:
+      case BINARY:
+      case CHAR:
+      case VARCHAR:
+        return new BytesColumnVector();
+      case STRUCT: {
+        ColumnVector[] fieldVector = new ColumnVector[children.size()];
+        for(int i=0; i < fieldVector.length; ++i) {
+          fieldVector[i] = children.get(i).createColumn();
+        }
+        return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+                fieldVector);
+      }
+      case UNION: {
+        ColumnVector[] fieldVector = new ColumnVector[children.size()];
+        for(int i=0; i < fieldVector.length; ++i) {
+          fieldVector[i] = children.get(i).createColumn();
+        }
+        return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+            fieldVector);
+      }
+      case LIST:
+        return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+            children.get(0).createColumn());
+      case MAP:
+        return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+            children.get(0).createColumn(), children.get(1).createColumn());
+      default:
+        throw new IllegalArgumentException("Unknown type " + category);
+    }
+  }
+
+  public VectorizedRowBatch createRowBatch() {
+    VectorizedRowBatch result;
+    if (category == Category.STRUCT) {
+      result = new VectorizedRowBatch(children.size(),
+          VectorizedRowBatch.DEFAULT_SIZE);
+      for(int i=0; i < result.cols.length; ++i) {
+        result.cols[i] = children.get(i).createColumn();
+      }
+    } else {
+      result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
+      result.cols[0] = createColumn();
+    }
+    result.reset();
+    return result;
+  }
+
   /**
    * Get the kind of this type.
    * @return get the category for this type.

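As a usage sketch (assuming TypeDescription's builder-style factory methods such as createStruct, addField, createLong, and createString), the new createRowBatch returns a batch whose column vectors match the schema:

    // Sketch: build a schema, then get a matching, reset VectorizedRowBatch.
    TypeDescription schema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createString());
    VectorizedRowBatch batch = schema.createRowBatch();
    // batch.cols[0] is a LongColumnVector and batch.cols[1] a BytesColumnVector,
    // each sized for VectorizedRowBatch.DEFAULT_SIZE rows.
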
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 8991f2d..1873ed1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.io.IOException;
@@ -52,6 +53,12 @@ public interface Writer {
   void addRow(Object row) throws IOException;
 
   /**
+   * Add a row batch to the ORC file.
+   * @param batch the rows to add
+   */
+  void addRowBatch(VectorizedRowBatch batch) throws IOException;
+
+  /**
    * Flush all of the buffers and close the file. No methods on this writer
    * should be called afterwards.
    * @throws IOException

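A rough end-to-end sketch of the new vectorized write path (not taken from the commit; the Writer is assumed to have been created for a struct<id:bigint,name:string> schema matching the TypeDescription above):

    // Sketch: fill batches column-wise and hand each full batch to addRowBatch.
    static void writeRows(Writer writer, TypeDescription schema) throws IOException {
      VectorizedRowBatch batch = schema.createRowBatch();
      LongColumnVector id = (LongColumnVector) batch.cols[0];
      BytesColumnVector name = (BytesColumnVector) batch.cols[1];
      name.initBuffer();
      for (int r = 0; r < 10000; ++r) {
        int row = batch.size++;
        id.vector[row] = r;
        name.setVal(row, ("row-" + r).getBytes());
        if (batch.size == batch.getMaxSize()) {
          writer.addRowBatch(batch);       // write out a full batch
          batch.reset();
        }
      }
      if (batch.size != 0) {
        writer.addRowBatch(batch);         // flush the final partial batch
      }
      writer.close();
    }
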
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 5a82d20..c3916d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -26,6 +26,7 @@ import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
@@ -582,7 +593,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private abstract static class TreeWriter {
     protected final int id;
     protected final ObjectInspector inspector;
-    private final BitFieldWriter isPresent;
+    protected final BitFieldWriter isPresent;
     private final boolean isCompressed;
     protected final ColumnStatisticsImpl indexStatistics;
     protected final ColumnStatisticsImpl stripeColStatistics;
@@ -708,6 +719,73 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
     }
 
+    /**
+     * Handle the top level object write.
+     *
+     * This default method is used for all types except structs, which are the
+     * typical case. VectorizedRowBatch assumes the top level object is a
+     * struct, so we use the first column for all other types.
+     * @param batch the batch to write from
+     * @param offset the row to start on
+     * @param length the number of rows to write
+     * @throws IOException
+     */
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      writeBatch(batch.cols[0], offset, length);
+    }
+
+    /**
+     * Write the values from the given vector from offset for length elements.
+     * @param vector the vector to write from
+     * @param offset the first value from the vector to write
+     * @param length the number of values from the vector to write
+     * @throws IOException
+     */
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      if (vector.noNulls) {
+        indexStatistics.increment(length);
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(1);
+          }
+        }
+      } else {
+        if (vector.isRepeating) {
+          boolean isNull = vector.isNull[0];
+          if (isPresent != null) {
+            for (int i = 0; i < length; ++i) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          if (isNull) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          } else {
+            indexStatistics.increment(length);
+          }
+        } else {
+          // count the number of non-null values
+          int nonNullCount = 0;
+          for(int i = 0; i < length; ++i) {
+            boolean isNull = vector.isNull[i + offset];
+            if (!isNull) {
+              nonNullCount += 1;
+            }
+            if (isPresent != null) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          indexStatistics.increment(nonNullCount);
+          if (nonNullCount != length) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          }
+        }
+      }
+    }
+
     private void removeIsPresentPositions() {
       for(int i=0; i < rowIndex.getEntryCount(); ++i) {
         RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
@@ -876,12 +954,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.write(obj);
       if (obj != null) {
         boolean val = ((BooleanObjectInspector) inspector).get(obj);
-        indexStatistics.updateBoolean(val);
+        indexStatistics.updateBoolean(val, 1);
         writer.write(val ? 1 : 0);
       }
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = vec.vector[0] == 0 ? 0 : 1;
+          indexStatistics.updateBoolean(value != 0, length);
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = vec.vector[i + offset] == 0 ? 0 : 1;
+            writer.write(value);
+            indexStatistics.updateBoolean(value != 0, 1);
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -915,7 +1017,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.write(obj);
       if (obj != null) {
         byte val = ((ByteObjectInspector) inspector).get(obj);
-        indexStatistics.updateInteger(val);
+        indexStatistics.updateInteger(val, 1);
         if (createBloomFilter) {
           bloomFilter.addLong(val);
         }
@@ -924,6 +1026,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte value = (byte) vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte value = (byte) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -994,7 +1126,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         } else {
           val = shortInspector.get(obj);
         }
-        indexStatistics.updateInteger(val);
+        indexStatistics.updateInteger(val, 1);
         if (createBloomFilter) {
           // integers are converted to longs in column statistics and during SARG evaluation
           bloomFilter.addLong(val);
@@ -1004,6 +1136,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1049,6 +1211,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          float value = (float) vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeFloat(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            float value = (float) vec.vector[i + offset];
+            utils.writeFloat(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1093,6 +1286,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          double value = vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeDouble(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            double value = vec.vector[i + offset];
+            utils.writeDouble(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1107,16 +1330,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private static class StringTreeWriter extends TreeWriter {
+  private static abstract class StringBaseTreeWriter extends TreeWriter {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final OutStream stringOutput;
     private final IntegerWriter lengthOutput;
     private final IntegerWriter rowOutput;
-    private final StringRedBlackTree dictionary =
+    protected final StringRedBlackTree dictionary =
         new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
-    private final DynamicIntArray rows = new DynamicIntArray();
-    private final PositionedOutputStream directStreamOutput;
-    private final IntegerWriter directLengthOutput;
+    protected final DynamicIntArray rows = new DynamicIntArray();
+    protected final PositionedOutputStream directStreamOutput;
+    protected final IntegerWriter directLengthOutput;
     private final List<OrcProto.RowIndexEntry> savedRowIndex =
         new ArrayList<OrcProto.RowIndexEntry>();
     private final boolean buildIndex;
@@ -1124,12 +1347,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // If the number of keys in a dictionary is greater than this fraction of
     //the total number of non-null rows, turn off dictionary encoding
     private final double dictionaryKeySizeThreshold;
-    private boolean useDictionaryEncoding = true;
+    protected boolean useDictionaryEncoding = true;
     private boolean isDirectV2 = true;
     private boolean doneDictionaryCheck;
     private final boolean strideDictionaryCheck;
 
-    StringTreeWriter(int columnId,
+    StringBaseTreeWriter(int columnId,
                      ObjectInspector inspector,
                      TypeDescription schema,
                      StreamFactory writer,
@@ -1171,7 +1394,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.write(obj);
       if (obj != null) {
         Text val = getTextValue(obj);
-        if (useDictionaryEncoding || !strideDictionaryCheck) {
+        if (useDictionaryEncoding) {
           rows.add(dictionary.add(val));
         } else {
           // write data and length
@@ -1180,7 +1403,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         }
         indexStatistics.updateString(val);
         if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), val.getLength());
+          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
         }
       }
     }
@@ -1364,10 +1587,69 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
+  private static class StringTreeWriter extends StringBaseTreeWriter {
+    StringTreeWriter(int columnId,
+                   ObjectInspector inspector,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, inspector, schema, writer, nullable);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  vec.length[0]);
+              directLengthOutput.write(vec.length[0]);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+              directLengthOutput.write(vec.length[offset + i]);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Under the covers, char is written to ORC the same way as string.
    */
-  private static class CharTreeWriter extends StringTreeWriter {
+  private static class CharTreeWriter extends StringBaseTreeWriter {
+    private final int itemLength;
+    private final byte[] padding;
 
     CharTreeWriter(int columnId,
         ObjectInspector inspector,
@@ -1375,6 +1657,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         StreamFactory writer,
         boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
+      itemLength = schema.getMaxLength();
+      padding = new byte[itemLength];
     }
 
     /**
@@ -1385,12 +1669,79 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       return (((HiveCharObjectInspector) inspector)
           .getPrimitiveWritableObject(obj)).getTextValue();
     }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte[] ptr;
+          int ptrOffset;
+          if (vec.length[0] >= itemLength) {
+            ptr = vec.vector[0];
+            ptrOffset = vec.start[0];
+          } else {
+            ptr = padding;
+            ptrOffset = 0;
+            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+                vec.length[0]);
+            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+          }
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(ptr, ptrOffset, itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte[] ptr;
+            int ptrOffset;
+            if (vec.length[offset + i] >= itemLength) {
+              ptr = vec.vector[offset + i];
+              ptrOffset = vec.start[offset + i];
+            } else {
+              // it is the wrong length, so copy it
+              ptr = padding;
+              ptrOffset = 0;
+              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+                  ptr, 0, vec.length[offset + i]);
+              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+            }
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+            } else {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+            }
+          }
+        }
+      }
+    }
   }
 
   /**
    * Under the covers, varchar is written to ORC the same way as string.
    */
-  private static class VarcharTreeWriter extends StringTreeWriter {
+  private static class VarcharTreeWriter extends StringBaseTreeWriter {
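+    // The varchar(n) bound comes from the schema; writeBatch below truncates
+    // longer values to maxLength bytes and writes shorter values unchanged.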
+    private final int maxLength;
 
     VarcharTreeWriter(int columnId,
         ObjectInspector inspector,
@@ -1398,6 +1749,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         StreamFactory writer,
         boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
+      maxLength = schema.getMaxLength();
     }
 
     /**
@@ -1408,6 +1760,55 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       return (((HiveVarcharObjectInspector) inspector)
           .getPrimitiveWritableObject(obj)).getTextValue();
     }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int itemLength = Math.min(vec.length[0], maxLength);
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int itemLength = Math.min(vec.length[offset + i], maxLength);
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+            }
+          }
+        }
+      }
+    }
   }
 
   private static class BinaryTreeWriter extends TreeWriter {
@@ -1449,12 +1850,48 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         length.write(val.getLength());
         indexStatistics.updateBinary(val);
         if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), val.getLength());
+          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
         }
       }
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          for(int i=0; i < length; ++i) {
+            stream.write(vec.vector[0], vec.start[0], vec.length[0]);
+            this.length.write(vec.length[0]);
+          }
+          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            stream.write(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+            this.length.write(vec.length[offset + i]);
+            indexStatistics.updateBinary(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1472,6 +1909,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   static final int MILLIS_PER_SECOND = 1000;
+  static final int NANOS_PER_SECOND = 1000000000;
+  static final int MILLIS_PER_NANO = 1000000; // nanoseconds in one millisecond
   static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
   private static class TimestampTreeWriter extends TreeWriter {
@@ -1524,6 +1963,47 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          long valueMillis = value / MILLIS_PER_NANO;
+          indexStatistics.updateTimestamp(valueMillis);
+          if (createBloomFilter) {
+            bloomFilter.addLong(valueMillis);
+          }
+          final long secs = value / NANOS_PER_SECOND - base_timestamp;
+          int valueNanos = (int) (value % NANOS_PER_SECOND);
+          if (valueNanos < 0) {
+            // normalize negative nanos the same way as the non-repeating path
+            valueNanos += NANOS_PER_SECOND;
+          }
+          final long nano = formatNanos(valueNanos);
+          for(int i=0; i < length; ++i) {
+            seconds.write(secs);
+            nanos.write(nano);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            long valueMillis = value / MILLIS_PER_NANO;
+            long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
+            int valueNanos = (int) (value % NANOS_PER_SECOND);
+            if (valueNanos < 0) {
+              valueNanos += NANOS_PER_SECOND;
+            }
+            seconds.write(valueSecs);
+            nanos.write(formatNanos(valueNanos));
+            indexStatistics.updateTimestamp(valueMillis);
+            if (createBloomFilter) {
+              bloomFilter.addLong(valueMillis);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1588,6 +2068,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = (int) vec.vector[0];
+          indexStatistics.updateDate(value);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = (int) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateDate(value);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1660,6 +2170,40 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DecimalColumnVector vec = (DecimalColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          HiveDecimal value = vec.vector[0].getHiveDecimal();
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            bloomFilter.addString(value.toString());
+          }
+          for(int i=0; i < length; ++i) {
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+            indexStatistics.updateDecimal(value);
+            if (createBloomFilter) {
+              bloomFilter.addString(value.toString());
+            }
+          }
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1685,13 +2229,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
       List<TypeDescription> children = schema.getChildren();
-      StructObjectInspector structObjectInspector =
-        (StructObjectInspector) inspector;
-      fields = structObjectInspector.getAllStructFieldRefs();
+      if (inspector != null) {
+        StructObjectInspector structObjectInspector =
+            (StructObjectInspector) inspector;
+        fields = structObjectInspector.getAllStructFieldRefs();
+      } else {
+        fields = null;
+      }
       childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
-        ObjectInspector childOI = i < fields.size() ?
-            fields.get(i).getFieldObjectInspector() : null;
+        ObjectInspector childOI;
+        if (fields != null && i < fields.size()) {
+          childOI = fields.get(i).getFieldObjectInspector();
+        } else {
+          childOI = null;
+        }
         childrenWriters[i] = createTreeWriter(
           childOI, children.get(i), writer,
           true);
@@ -1713,6 +2265,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      // update the statistics for the root column
+      indexStatistics.increment(length);
+      // I'm assuming that the root column isn't nullable so that I don't need
+      // to update isPresent.
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+      }
+    }
+
+    private static void writeFields(StructColumnVector vector,
+                                    TreeWriter[] childrenWriters,
+                                    int offset, int length) throws IOException {
+      for(int field=0; field < childrenWriters.length; ++field) {
+        childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+      }
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      StructColumnVector vec = (StructColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          writeFields(vec, childrenWriters, offset, length);
+        }
+      } else if (vector.noNulls) {
+        writeFields(vec, childrenWriters, offset, length);
+      } else {
+        // write the records in runs
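+        // e.g. with offset 0, length 4 and isNull {false, false, true, false},
+        // the children see two calls: writeFields(vec, .., 0, 2) and
+        // writeFields(vec, .., 3, 1); null rows contribute nothing.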
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            if (!started) {
+              started = true;
+              currentRun = i;
+            }
+          } else if (started) {
+            started = false;
+            writeFields(vec, childrenWriters, offset + currentRun,
+                i - currentRun);
+          }
+        }
+        if (started) {
+          writeFields(vec, childrenWriters, offset + currentRun,
+              length - currentRun);
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1734,8 +2340,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                    boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
-      ObjectInspector childOI =
-        ((ListObjectInspector) inspector).getListElementObjectInspector();
+      ObjectInspector childOI = null;
+      if (inspector != null) {
+        childOI =
+            ((ListObjectInspector) inspector).getListElementObjectInspector();
+      }
       childrenWriters = new TreeWriter[1];
       childrenWriters[0] =
         createTreeWriter(childOI, schema.getChildren().get(0), writer, true);
@@ -1771,6 +2380,52 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      ListColumnVector vec = (ListColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
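+        // contiguous child ranges are coalesced: e.g. with no nulls,
+        // offsets {0, 3, 10} and lengths {3, 4, 2}, the LENGTH stream gets
+        // 3, 4, 2 but the child gets only two batches, rows 0..6 and 10..11.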
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.child, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.child, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1799,15 +2454,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                   boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
-      MapObjectInspector insp = (MapObjectInspector) inspector;
       childrenWriters = new TreeWriter[2];
       List<TypeDescription> children = schema.getChildren();
+      ObjectInspector keyInsp = null;
+      ObjectInspector valueInsp = null;
+      if (inspector != null) {
+        MapObjectInspector insp = (MapObjectInspector) inspector;
+        keyInsp = insp.getMapKeyObjectInspector();
+        valueInsp = insp.getMapValueObjectInspector();
+      }
       childrenWriters[0] =
-        createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0),
-                         writer, true);
+        createTreeWriter(keyInsp, children.get(0), writer, true);
       childrenWriters[1] =
-        createTreeWriter(insp.getMapValueObjectInspector(), children.get(1),
-                         writer, true);
+        createTreeWriter(valueInsp, children.get(1), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
@@ -1843,6 +2502,57 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      MapColumnVector vec = (MapColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+            childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.keys, currentOffset,
+                  currentLength);
+              childrenWriters[1].writeBatch(vec.values, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.keys, currentOffset,
+              currentLength);
+          childrenWriters[1].writeBatch(vec.values, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -1869,13 +2579,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                   StreamFactory writer,
                   boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
-      UnionObjectInspector insp = (UnionObjectInspector) inspector;
-      List<ObjectInspector> choices = insp.getObjectInspectors();
+      List<ObjectInspector> choices = null;
+      if (inspector != null) {
+        UnionObjectInspector insp = (UnionObjectInspector) inspector;
+        choices = insp.getObjectInspectors();
+      }
       List<TypeDescription> children = schema.getChildren();
       childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
-        childrenWriters[i] = createTreeWriter(choices.get(i),
-                                              children.get(i), writer, true);
+        childrenWriters[i] =
+            createTreeWriter(choices != null ? choices.get(i) : null,
+                             children.get(i), writer, true);
       }
       tags =
         new RunLengthByteWriter(writer.createStream(columnId,
@@ -1898,6 +2612,54 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      UnionColumnVector vec = (UnionColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte tag = (byte) vec.tags[0];
+          for(int i=0; i < length; ++i) {
+            tags.write(tag);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(tag);
+          }
+          childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+        }
+      } else {
+        // write the records in runs of the same tag
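+        // e.g. with offset 0, no nulls and tags {0, 0, 1, 1, 0}, the children
+        // see three batches: fields[0] rows 0..1, fields[1] rows 2..3, and
+        // fields[0] row 4.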
+        byte prevTag = 0;
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            byte tag = (byte) vec.tags[offset + i];
+            tags.write(tag);
+            if (!started) {
+              started = true;
+              currentRun = i;
+              prevTag = tag;
+            } else if (tag != prevTag) {
+              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                  offset + currentRun, i - currentRun);
+              currentRun = i;
+              prevTag = tag;
+            }
+          } else if (started) {
+            started = false;
+            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                offset + currentRun, i - currentRun);
+          }
+        }
+        if (started) {
+          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+              offset + currentRun, length - currentRun);
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -2365,7 +3127,31 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         createRowIndexEntry();
       }
     }
-    memoryManager.addedRow();
+    memoryManager.addedRow(1);
+  }
+
+  @Override
+  public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+    if (buildIndex) {
+      // Batch the writes up to the rowIndexStride so that we can get the
+      // right size indexes.
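+      // e.g. with rowIndexStride 10000, rowsInIndex 9000 and a 2500 row
+      // batch, the first chunk covers 1000 rows and closes out the current
+      // index entry; the second chunk covers the remaining 1500 rows.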
+      int posn = 0;
+      while (posn < batch.size) {
+        int chunkSize = Math.min(batch.size - posn,
+            rowIndexStride - rowsInIndex);
+        treeWriter.writeRootBatch(batch, posn, chunkSize);
+        posn += chunkSize;
+        rowsInIndex += chunkSize;
+        rowsInStripe += chunkSize;
+        if (rowsInIndex >= rowIndexStride) {
+          createRowIndexEntry();
+        }
+      }
+    } else {
+      rowsInStripe += batch.size;
+      treeWriter.writeRootBatch(batch, 0, batch.size);
+    }
+    memoryManager.addedRow(batch.size);
   }
 
   @Override

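The new write path above is driven entirely by Writer.addRowBatch: callers
describe the file with a TypeDescription, fill a VectorizedRowBatch created
from that schema, and hand whole batches to the writer. A minimal sketch of
that flow, mirroring the usage in TestVectorOrcFile below (the path, field
names and values are illustrative only; imports are the same as in that test):

    TypeDescription schema = TypeDescription.createStruct()
        .addField("int1", TypeDescription.createInt())
        .addField("string1", TypeDescription.createString());
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(new Configuration()).setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector ints = (LongColumnVector) batch.cols[0];
    BytesColumnVector strings = (BytesColumnVector) batch.cols[1];
    batch.size = 3;
    for (int row = 0; row < batch.size; ++row) {
      ints.vector[row] = row;
      strings.setVal(row, ("row " + row).getBytes());
    }
    writer.addRowBatch(batch);
    writer.close();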
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index f6111e8..a51177e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -52,17 +52,16 @@ public class TestColumnStatistics {
 
     ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
     ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
-    stats1.updateInteger(10);
-    stats1.updateInteger(10);
-    stats2.updateInteger(1);
-    stats2.updateInteger(1000);
+    stats1.updateInteger(10, 2);
+    stats2.updateInteger(1, 1);
+    stats2.updateInteger(1000, 1);
     stats1.merge(stats2);
     IntegerColumnStatistics typed = (IntegerColumnStatistics) stats1;
     assertEquals(1, typed.getMinimum());
     assertEquals(1000, typed.getMaximum());
     stats1.reset();
-    stats1.updateInteger(-10);
-    stats1.updateInteger(10000);
+    stats1.updateInteger(-10, 1);
+    stats1.updateInteger(10000, 1);
     stats1.merge(stats2);
     assertEquals(-10, typed.getMinimum());
     assertEquals(10000, typed.getMaximum());
@@ -101,11 +100,14 @@ public class TestColumnStatistics {
     stats1.updateString(new Text("david"));
     stats1.updateString(new Text("charles"));
     stats2.updateString(new Text("anne"));
-    stats2.updateString(new Text("erin"));
+    byte[] erin = new byte[]{0, 1, 2, 3, 4, 5, 101, 114, 105, 110};
+    stats2.updateString(erin, 6, 4, 5);
+    assertEquals(24, ((StringColumnStatistics)stats2).getSum());
     stats1.merge(stats2);
     StringColumnStatistics typed = (StringColumnStatistics) stats1;
     assertEquals("anne", typed.getMinimum());
     assertEquals("erin", typed.getMaximum());
+    assertEquals(39, typed.getSum());
     stats1.reset();
     stats1.updateString(new Text("aaa"));
     stats1.updateString(new Text("zzz"));

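As with the writer changes above, the statistics updates now carry a count so
that a repeating vector value can be recorded with a single call:
updateInteger(10, 2) stands in for two updateInteger(10) calls, and
updateString(erin, 6, 4, 5) counts the four bytes at offset 6 ("erin") five
times, contributing 4 * 5 = 20 to the string sum, which together with "anne"
gives the asserted sum of 24 for stats2.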
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
index fb6be16..19aaff3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
@@ -122,7 +122,7 @@ public class TestMemoryManager {
     }
     // add enough rows to get the memory manager to check the limits
     for(int i=0; i < 10000; ++i) {
-      mgr.addedRow();
+      mgr.addedRow(1);
     }
     for(int call=0; call < calls.length; ++call) {
       verify(calls[call], times(2))

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index e78f7aa..146f5b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1818,8 +1818,9 @@ public class TestOrcFile {
     }
 
     @Override
-    void addedRow() throws IOException {
-      if (++rows % 100 == 0) {
+    void addedRow(int count) throws IOException {
+      rows += count;
+      if (rows % 100 == 0) {
         callback.checkMemory(rate);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 797bbfb..15ee24c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -815,8 +815,10 @@ public class TestOrcRawRecordMerger {
     MemoryManager mgr = new MemoryManager(conf){
       int rowsAddedSinceCheck = 0;
 
-      synchronized void addedRow() throws IOException {
-        if (++rowsAddedSinceCheck >= 2) {
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
           notifyWriters();
           rowsAddedSinceCheck = 0;
         }
@@ -912,8 +914,10 @@ public class TestOrcRawRecordMerger {
     MemoryManager mgr = new MemoryManager(conf){
       int rowsAddedSinceCheck = 0;
 
-      synchronized void addedRow() throws IOException {
-        if (++rowsAddedSinceCheck >= 2) {
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
           notifyWriters();
           rowsAddedSinceCheck = 0;
         }


[2/3] hive git commit: HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
new file mode 100644
index 0000000..134f78c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
@@ -0,0 +1,2744 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.HiveTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Tests for the vectorized reader and writer for ORC files.
+ */
+public class TestVectorOrcFile {
+
+  public static class InnerStruct {
+    int int1;
+    Text string1 = new Text();
+    InnerStruct(int int1, String string1) {
+      this.int1 = int1;
+      this.string1.set(string1);
+    }
+
+    public String toString() {
+      return "{" + int1 + ", " + string1 + "}";
+    }
+  }
+
+  public static class MiddleStruct {
+    List<InnerStruct> list = new ArrayList<InnerStruct>();
+
+    MiddleStruct(InnerStruct... items) {
+      list.clear();
+      list.addAll(Arrays.asList(items));
+    }
+  }
+
+  public static class BigRow {
+    Boolean boolean1;
+    Byte byte1;
+    Short short1;
+    Integer int1;
+    Long long1;
+    Float float1;
+    Double double1;
+    BytesWritable bytes1;
+    Text string1;
+    MiddleStruct middle;
+    List<InnerStruct> list = new ArrayList<InnerStruct>();
+    Map<Text, InnerStruct> map = new HashMap<Text, InnerStruct>();
+
+    BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float f1,
+           Double d1,
+           BytesWritable b3, String s2, MiddleStruct m1,
+           List<InnerStruct> l2, Map<String, InnerStruct> m2) {
+      this.boolean1 = b1;
+      this.byte1 = b2;
+      this.short1 = s1;
+      this.int1 = i1;
+      this.long1 = l1;
+      this.float1 = f1;
+      this.double1 = d1;
+      this.bytes1 = b3;
+      if (s2 == null) {
+        this.string1 = null;
+      } else {
+        this.string1 = new Text(s2);
+      }
+      this.middle = m1;
+      this.list = l2;
+      if (m2 != null) {
+        this.map = new HashMap<Text, InnerStruct>();
+        for (Map.Entry<String, InnerStruct> item : m2.entrySet()) {
+          this.map.put(new Text(item.getKey()), item.getValue());
+        }
+      } else {
+        this.map = null;
+      }
+    }
+  }
+
+  private static InnerStruct inner(int i, String s) {
+    return new InnerStruct(i, s);
+  }
+
+  private static Map<String, InnerStruct> map(InnerStruct... items)  {
+    Map<String, InnerStruct> result = new HashMap<String, InnerStruct>();
+    for(InnerStruct i: items) {
+      result.put(i.string1.toString(), i);
+    }
+    return result;
+  }
+
+  private static List<InnerStruct> list(InnerStruct... items) {
+    List<InnerStruct> result = new ArrayList<InnerStruct>();
+    result.addAll(Arrays.asList(items));
+    return result;
+  }
+
+  private static BytesWritable bytes(int... items) {
+    BytesWritable result = new BytesWritable();
+    result.setSize(items.length);
+    for(int i=0; i < items.length; ++i) {
+      result.getBytes()[i] = (byte) items[i];
+    }
+    return result;
+  }
+
+  private static byte[] bytesArray(int... items) {
+    byte[] result = new byte[items.length];
+    for(int i=0; i < items.length; ++i) {
+      result[i] = (byte) items[i];
+    }
+    return result;
+  }
+
+  private static ByteBuffer byteBuf(int... items) {
+    ByteBuffer result = ByteBuffer.allocate(items.length);
+    for(int item: items) {
+      result.put((byte) item);
+    }
+    result.flip();
+    return result;
+  }
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem () throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestVectorOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testReadFormat_0_11() throws Exception {
+    Path oldFilePath =
+        new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc"));
+    Reader reader = OrcFile.createReader(oldFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    int stripeCount = 0;
+    int rowCount = 0;
+    long currentOffset = -1;
+    for(StripeInformation stripe : reader.getStripes()) {
+      stripeCount += 1;
+      rowCount += stripe.getNumberOfRows();
+      if (currentOffset < 0) {
+        currentOffset = stripe.getOffset() + stripe.getIndexLength()
+            + stripe.getDataLength() + stripe.getFooterLength();
+      } else {
+        assertEquals(currentOffset, stripe.getOffset());
+        currentOffset += stripe.getIndexLength() + stripe.getDataLength()
+            + stripe.getFooterLength();
+      }
+    }
+    assertEquals(reader.getNumberOfRows(), rowCount);
+    assertEquals(2, stripeCount);
+
+    // check the stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(7500, stats[1].getNumberOfValues());
+    assertEquals(3750, ((BooleanColumnStatistics) stats[1]).getFalseCount());
+    assertEquals(3750, ((BooleanColumnStatistics) stats[1]).getTrueCount());
+    assertEquals("count: 7500 hasNull: true true: 3750", stats[1].toString());
+
+    assertEquals(2048, ((IntegerColumnStatistics) stats[3]).getMaximum());
+    assertEquals(1024, ((IntegerColumnStatistics) stats[3]).getMinimum());
+    assertEquals(true, ((IntegerColumnStatistics) stats[3]).isSumDefined());
+    assertEquals(11520000, ((IntegerColumnStatistics) stats[3]).getSum());
+    assertEquals("count: 7500 hasNull: true min: 1024 max: 2048 sum: 11520000",
+        stats[3].toString());
+
+    assertEquals(Long.MAX_VALUE,
+        ((IntegerColumnStatistics) stats[5]).getMaximum());
+    assertEquals(Long.MAX_VALUE,
+        ((IntegerColumnStatistics) stats[5]).getMinimum());
+    assertEquals(false, ((IntegerColumnStatistics) stats[5]).isSumDefined());
+    assertEquals(
+        "count: 7500 hasNull: true min: 9223372036854775807 max: 9223372036854775807",
+        stats[5].toString());
+
+    assertEquals(-15.0, ((DoubleColumnStatistics) stats[7]).getMinimum());
+    assertEquals(-5.0, ((DoubleColumnStatistics) stats[7]).getMaximum());
+    assertEquals(-75000.0, ((DoubleColumnStatistics) stats[7]).getSum(),
+        0.00001);
+    assertEquals("count: 7500 hasNull: true min: -15.0 max: -5.0 sum: -75000.0",
+        stats[7].toString());
+
+    assertEquals("count: 7500 hasNull: true min: bye max: hi sum: 0", stats[9].toString());
+
+    // check the inspectors
+    StructObjectInspector readerInspector = (StructObjectInspector) reader
+        .getObjectInspector();
+    assertEquals(ObjectInspector.Category.STRUCT, readerInspector.getCategory());
+    assertEquals("struct<boolean1:boolean,byte1:tinyint,short1:smallint,"
+        + "int1:int,long1:bigint,float1:float,double1:double,bytes1:"
+        + "binary,string1:string,middle:struct<list:array<struct<int1:int,"
+        + "string1:string>>>,list:array<struct<int1:int,string1:string>>,"
+        + "map:map<string,struct<int1:int,string1:string>>,ts:timestamp,"
+        + "decimal1:decimal(38,18)>", readerInspector.getTypeName());
+    List<? extends StructField> fields = readerInspector
+        .getAllStructFieldRefs();
+    BooleanObjectInspector bo = (BooleanObjectInspector) readerInspector
+        .getStructFieldRef("boolean1").getFieldObjectInspector();
+    ByteObjectInspector by = (ByteObjectInspector) readerInspector
+        .getStructFieldRef("byte1").getFieldObjectInspector();
+    ShortObjectInspector sh = (ShortObjectInspector) readerInspector
+        .getStructFieldRef("short1").getFieldObjectInspector();
+    IntObjectInspector in = (IntObjectInspector) readerInspector
+        .getStructFieldRef("int1").getFieldObjectInspector();
+    LongObjectInspector lo = (LongObjectInspector) readerInspector
+        .getStructFieldRef("long1").getFieldObjectInspector();
+    FloatObjectInspector fl = (FloatObjectInspector) readerInspector
+        .getStructFieldRef("float1").getFieldObjectInspector();
+    DoubleObjectInspector dbl = (DoubleObjectInspector) readerInspector
+        .getStructFieldRef("double1").getFieldObjectInspector();
+    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector
+        .getStructFieldRef("bytes1").getFieldObjectInspector();
+    StringObjectInspector st = (StringObjectInspector) readerInspector
+        .getStructFieldRef("string1").getFieldObjectInspector();
+    StructObjectInspector mid = (StructObjectInspector) readerInspector
+        .getStructFieldRef("middle").getFieldObjectInspector();
+    List<? extends StructField> midFields = mid.getAllStructFieldRefs();
+    ListObjectInspector midli = (ListObjectInspector) midFields.get(0)
+        .getFieldObjectInspector();
+    StructObjectInspector inner = (StructObjectInspector) midli
+        .getListElementObjectInspector();
+    List<? extends StructField> inFields = inner.getAllStructFieldRefs();
+    ListObjectInspector li = (ListObjectInspector) readerInspector
+        .getStructFieldRef("list").getFieldObjectInspector();
+    MapObjectInspector ma = (MapObjectInspector) readerInspector
+        .getStructFieldRef("map").getFieldObjectInspector();
+    TimestampObjectInspector tso = (TimestampObjectInspector) readerInspector
+        .getStructFieldRef("ts").getFieldObjectInspector();
+    HiveDecimalObjectInspector dco = (HiveDecimalObjectInspector) readerInspector
+        .getStructFieldRef("decimal1").getFieldObjectInspector();
+    StringObjectInspector mk = (StringObjectInspector) ma
+        .getMapKeyObjectInspector();
+    RecordReader rows = reader.rows();
+    Object row = rows.next(null);
+    assertNotNull(row);
+    // check the contents of the first row
+    assertEquals(false,
+        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals(1,
+        by.get(readerInspector.getStructFieldData(row, fields.get(1))));
+    assertEquals(1024,
+        sh.get(readerInspector.getStructFieldData(row, fields.get(2))));
+    assertEquals(65536,
+        in.get(readerInspector.getStructFieldData(row, fields.get(3))));
+    assertEquals(Long.MAX_VALUE,
+        lo.get(readerInspector.getStructFieldData(row, fields.get(4))));
+    assertEquals(1.0,
+        fl.get(readerInspector.getStructFieldData(row, fields.get(5))), 0.00001);
+    assertEquals(-15.0,
+        dbl.get(readerInspector.getStructFieldData(row, fields.get(6))),
+        0.00001);
+    assertEquals(bytes(0, 1, 2, 3, 4),
+        bi.getPrimitiveWritableObject(readerInspector.getStructFieldData(row,
+            fields.get(7))));
+    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector
+        .getStructFieldData(row, fields.get(8))));
+    List<?> midRow = midli.getList(mid.getStructFieldData(
+        readerInspector.getStructFieldData(row, fields.get(9)),
+        midFields.get(0)));
+    assertNotNull(midRow);
+    assertEquals(2, midRow.size());
+    assertEquals(1,
+        in.get(inner.getStructFieldData(midRow.get(0), inFields.get(0))));
+    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        midRow.get(0), inFields.get(1))));
+    assertEquals(2,
+        in.get(inner.getStructFieldData(midRow.get(1), inFields.get(0))));
+    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        midRow.get(1), inFields.get(1))));
+    List<?> list = li.getList(readerInspector.getStructFieldData(row,
+        fields.get(10)));
+    assertEquals(2, list.size());
+    assertEquals(3,
+        in.get(inner.getStructFieldData(list.get(0), inFields.get(0))));
+    assertEquals("good", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        list.get(0), inFields.get(1))));
+    assertEquals(4,
+        in.get(inner.getStructFieldData(list.get(1), inFields.get(0))));
+    assertEquals("bad", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        list.get(1), inFields.get(1))));
+    Map<?, ?> map = ma.getMap(readerInspector.getStructFieldData(row,
+        fields.get(11)));
+    assertEquals(0, map.size());
+    assertEquals(Timestamp.valueOf("2000-03-12 15:00:00"),
+        tso.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
+            fields.get(12))));
+    assertEquals(HiveDecimal.create("12345678.6547456"),
+        dco.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
+            fields.get(13))));
+
+    // check the contents of second row
+    assertEquals(true, rows.hasNext());
+    rows.seekToRow(7499);
+    row = rows.next(null);
+    assertEquals(true,
+        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals(100,
+        by.get(readerInspector.getStructFieldData(row, fields.get(1))));
+    assertEquals(2048,
+        sh.get(readerInspector.getStructFieldData(row, fields.get(2))));
+    assertEquals(65536,
+        in.get(readerInspector.getStructFieldData(row, fields.get(3))));
+    assertEquals(Long.MAX_VALUE,
+        lo.get(readerInspector.getStructFieldData(row, fields.get(4))));
+    assertEquals(2.0,
+        fl.get(readerInspector.getStructFieldData(row, fields.get(5))), 0.00001);
+    assertEquals(-5.0,
+        dbl.get(readerInspector.getStructFieldData(row, fields.get(6))),
+        0.00001);
+    assertEquals(bytes(), bi.getPrimitiveWritableObject(readerInspector
+        .getStructFieldData(row, fields.get(7))));
+    assertEquals("bye", st.getPrimitiveJavaObject(readerInspector
+        .getStructFieldData(row, fields.get(8))));
+    midRow = midli.getList(mid.getStructFieldData(
+        readerInspector.getStructFieldData(row, fields.get(9)),
+        midFields.get(0)));
+    assertNotNull(midRow);
+    assertEquals(2, midRow.size());
+    assertEquals(1,
+        in.get(inner.getStructFieldData(midRow.get(0), inFields.get(0))));
+    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        midRow.get(0), inFields.get(1))));
+    assertEquals(2,
+        in.get(inner.getStructFieldData(midRow.get(1), inFields.get(0))));
+    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        midRow.get(1), inFields.get(1))));
+    list = li.getList(readerInspector.getStructFieldData(row, fields.get(10)));
+    assertEquals(3, list.size());
+    assertEquals(100000000,
+        in.get(inner.getStructFieldData(list.get(0), inFields.get(0))));
+    assertEquals("cat", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        list.get(0), inFields.get(1))));
+    assertEquals(-100000,
+        in.get(inner.getStructFieldData(list.get(1), inFields.get(0))));
+    assertEquals("in", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        list.get(1), inFields.get(1))));
+    assertEquals(1234,
+        in.get(inner.getStructFieldData(list.get(2), inFields.get(0))));
+    assertEquals("hat", st.getPrimitiveJavaObject(inner.getStructFieldData(
+        list.get(2), inFields.get(1))));
+    map = ma.getMap(readerInspector.getStructFieldData(row, fields.get(11)));
+    assertEquals(2, map.size());
+    boolean[] found = new boolean[2];
+    for(Object key : map.keySet()) {
+      String str = mk.getPrimitiveJavaObject(key);
+      if (str.equals("chani")) {
+        assertEquals(false, found[0]);
+        assertEquals(5,
+            in.get(inner.getStructFieldData(map.get(key), inFields.get(0))));
+        assertEquals(str, st.getPrimitiveJavaObject(inner.getStructFieldData(
+            map.get(key), inFields.get(1))));
+        found[0] = true;
+      } else if (str.equals("mauddib")) {
+        assertEquals(false, found[1]);
+        assertEquals(1,
+            in.get(inner.getStructFieldData(map.get(key), inFields.get(0))));
+        assertEquals(str, st.getPrimitiveJavaObject(inner.getStructFieldData(
+            map.get(key), inFields.get(1))));
+        found[1] = true;
+      } else {
+        throw new IllegalArgumentException("Unknown key " + str);
+      }
+    }
+    assertEquals(true, found[0]);
+    assertEquals(true, found[1]);
+    assertEquals(Timestamp.valueOf("2000-03-12 15:00:01"),
+        tso.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
+            fields.get(12))));
+    assertEquals(HiveDecimal.create("12345678.6547457"),
+        dco.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
+            fields.get(13))));
+
+    // handle the close up
+    assertEquals(false, rows.hasNext());
+    rows.close();
+  }
+
+  @Test
+  public void testTimestamp() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestVectorOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Timestamp.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    TypeDescription schema = TypeDescription.createTimestamp();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .bufferSize(10000).version(Version.V_0_11));
+    List<Timestamp> tslist = Lists.newArrayList();
+    tslist.add(Timestamp.valueOf("2037-01-01 00:00:00.000999"));
+    tslist.add(Timestamp.valueOf("2003-01-01 00:00:00.000000222"));
+    tslist.add(Timestamp.valueOf("1999-01-01 00:00:00.999999999"));
+    tslist.add(Timestamp.valueOf("1995-01-01 00:00:00.688888888"));
+    tslist.add(Timestamp.valueOf("2002-01-01 00:00:00.1"));
+    tslist.add(Timestamp.valueOf("2010-03-02 00:00:00.000009001"));
+    tslist.add(Timestamp.valueOf("2005-01-01 00:00:00.000002229"));
+    tslist.add(Timestamp.valueOf("2006-01-01 00:00:00.900203003"));
+    tslist.add(Timestamp.valueOf("2003-01-01 00:00:00.800000007"));
+    tslist.add(Timestamp.valueOf("1996-08-02 00:00:00.723100809"));
+    tslist.add(Timestamp.valueOf("1998-11-02 00:00:00.857340643"));
+    tslist.add(Timestamp.valueOf("2008-10-02 00:00:00"));
+
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    LongColumnVector vec = new LongColumnVector(1024);
+    batch.cols[0] = vec;
+    batch.reset();
+    batch.size = tslist.size();
+    for (int i=0; i < tslist.size(); ++i) {
+      Timestamp ts = tslist.get(i);
+      vec.vector[i] = TimestampUtils.getTimeNanoSec(ts);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(tslist.get(idx++).getNanos(), ((TimestampWritable) row).getNanos());
+    }
+    assertEquals(tslist.size(), rows.getRowNumber());
+    assertEquals(0, writer.getSchema().getMaximumId());
+    boolean[] expected = new boolean[] {false};
+    boolean[] included = OrcUtils.includeColumns("", writer.getSchema());
+    assertEquals(true, Arrays.equals(expected, included));
+  }
+
+  @Test
+  public void testStringAndBinaryStatistics() throws Exception {
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("bytes1", TypeDescription.createBinary())
+        .addField("string1", TypeDescription.createString());
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(100000)
+                                         .bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 4;
+    BytesColumnVector field1 = (BytesColumnVector) batch.cols[0];
+    BytesColumnVector field2 = (BytesColumnVector) batch.cols[1];
+    field1.setVal(0, bytesArray(0, 1, 2, 3, 4));
+    field1.setVal(1, bytesArray(0, 1, 2, 3));
+    field1.setVal(2, bytesArray(0, 1, 2, 3, 4, 5));
+    field1.noNulls = false;
+    field1.isNull[3] = true;
+    field2.setVal(0, "foo".getBytes());
+    field2.setVal(1, "bar".getBytes());
+    field2.noNulls = false;
+    field2.isNull[2] = true;
+    field2.setVal(3, "hi".getBytes());
+    writer.addRowBatch(batch);
+    writer.close();
+    schema = writer.getSchema();
+    assertEquals(2, schema.getMaximumId());
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    boolean[] expected = new boolean[] {false, false, true};
+    boolean[] included = OrcUtils.includeColumns("string1", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    expected = new boolean[] {false, false, false};
+    included = OrcUtils.includeColumns("", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    expected = new boolean[] {false, false, false};
+    included = OrcUtils.includeColumns(null, schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    // check the stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(4, stats[0].getNumberOfValues());
+    assertEquals("count: 4 hasNull: false", stats[0].toString());
+
+    assertEquals(3, stats[1].getNumberOfValues());
+    assertEquals(15, ((BinaryColumnStatistics) stats[1]).getSum());
+    assertEquals("count: 3 hasNull: true sum: 15", stats[1].toString());
+
+    assertEquals(3, stats[2].getNumberOfValues());
+    assertEquals("bar", ((StringColumnStatistics) stats[2]).getMinimum());
+    assertEquals("hi", ((StringColumnStatistics) stats[2]).getMaximum());
+    assertEquals(8, ((StringColumnStatistics) stats[2]).getSum());
+    assertEquals("count: 3 hasNull: true min: bar max: hi sum: 8",
+        stats[2].toString());
+
+    // check the inspectors
+    StructObjectInspector readerInspector =
+        (StructObjectInspector) reader.getObjectInspector();
+    assertEquals(ObjectInspector.Category.STRUCT,
+        readerInspector.getCategory());
+    assertEquals("struct<bytes1:binary,string1:string>",
+        readerInspector.getTypeName());
+    List<? extends StructField> fields =
+        readerInspector.getAllStructFieldRefs();
+    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector.
+        getStructFieldRef("bytes1").getFieldObjectInspector();
+    StringObjectInspector st = (StringObjectInspector) readerInspector.
+        getStructFieldRef("string1").getFieldObjectInspector();
+    RecordReader rows = reader.rows();
+    Object row = rows.next(null);
+    assertNotNull(row);
+    // check the contents of the first row
+    assertEquals(bytes(0,1,2,3,4), bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals("foo", st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(1))));
+
+    // check the contents of second row
+    assertEquals(true, rows.hasNext());
+    row = rows.next(row);
+    assertEquals(bytes(0,1,2,3), bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals("bar", st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(1))));
+
+    // check the contents of third row
+    assertEquals(true, rows.hasNext());
+    row = rows.next(row);
+    assertEquals(bytes(0,1,2,3,4,5), bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(0))));
+    assertNull(st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(1))));
+
+    // check the contents of fourth row
+    assertEquals(true, rows.hasNext());
+    row = rows.next(row);
+    assertNull(bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(1))));
+
+    // handle the close up
+    assertEquals(false, rows.hasNext());
+    rows.close();
+  }
+
+  @Test
+  public void testStripeLevelStats() throws Exception {
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1000;
+    LongColumnVector field1 = (LongColumnVector) batch.cols[0];
+    BytesColumnVector field2 = (BytesColumnVector) batch.cols[1];
+    field1.isRepeating = true;
+    field2.isRepeating = true;
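+    // With isRepeating set, element 0 of each vector applies to every row in
+    // the batch, so each addRowBatch call below writes 1000 identical rows;
+    // the 11 batches end up in three stripes, as the assertions below verify.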
+    for (int b = 0; b < 11; b++) {
+      if (b >= 5) {
+        if (b >= 10) {
+          field1.vector[0] = 3;
+          field2.setVal(0, "three".getBytes());
+        } else {
+          field1.vector[0] = 2;
+          field2.setVal(0, "two".getBytes());
+        }
+      } else {
+        field1.vector[0] = 1;
+        field2.setVal(0, "one".getBytes());
+      }
+      writer.addRowBatch(batch);
+    }
+
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    schema = writer.getSchema();
+    assertEquals(2, schema.getMaximumId());
+    boolean[] expected = new boolean[] {false, true, false};
+    boolean[] included = OrcUtils.includeColumns("int1", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    List<StripeStatistics> stats = reader.getStripeStatistics();
+    int numStripes = stats.size();
+    assertEquals(3, numStripes);
+    StripeStatistics ss1 = stats.get(0);
+    StripeStatistics ss2 = stats.get(1);
+    StripeStatistics ss3 = stats.get(2);
+
+    assertEquals(5000, ss1.getColumnStatistics()[0].getNumberOfValues());
+    assertEquals(5000, ss2.getColumnStatistics()[0].getNumberOfValues());
+    assertEquals(1000, ss3.getColumnStatistics()[0].getNumberOfValues());
+
+    assertEquals(5000, (ss1.getColumnStatistics()[1]).getNumberOfValues());
+    assertEquals(5000, (ss2.getColumnStatistics()[1]).getNumberOfValues());
+    assertEquals(1000, (ss3.getColumnStatistics()[1]).getNumberOfValues());
+    assertEquals(1, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getMinimum());
+    assertEquals(2, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getMinimum());
+    assertEquals(3, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getMinimum());
+    assertEquals(1, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getMaximum());
+    assertEquals(2, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getMaximum());
+    assertEquals(3, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getMaximum());
+    assertEquals(5000, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getSum());
+    assertEquals(10000, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getSum());
+    assertEquals(3000, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getSum());
+
+    assertEquals(5000, (ss1.getColumnStatistics()[2]).getNumberOfValues());
+    assertEquals(5000, (ss2.getColumnStatistics()[2]).getNumberOfValues());
+    assertEquals(1000, (ss3.getColumnStatistics()[2]).getNumberOfValues());
+    assertEquals("one", ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getMinimum());
+    assertEquals("two", ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getMinimum());
+    assertEquals("three", ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getMinimum());
+    assertEquals("one", ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getMaximum());
+    assertEquals("two", ((StringColumnStatistics) ss2.getColumnStatistics()[2]).getMaximum());
+    assertEquals("three", ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getMaximum());
+    assertEquals(15000, ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getSum());
+    assertEquals(15000, ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getSum());
+    assertEquals(5000, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getSum());
+
+    RecordReaderImpl recordReader = (RecordReaderImpl) reader.rows();
+    OrcProto.RowIndex[] index = recordReader.readRowIndex(0, null, null).getRowGroupIndex();
+    assertEquals(3, index.length);
+    List<OrcProto.RowIndexEntry> items = index[1].getEntryList();
+    assertEquals(1, items.size());
+    assertEquals(3, items.get(0).getPositionsCount());
+    assertEquals(0, items.get(0).getPositions(0));
+    assertEquals(0, items.get(0).getPositions(1));
+    assertEquals(0, items.get(0).getPositions(2));
+    assertEquals(1,
+                 items.get(0).getStatistics().getIntStatistics().getMinimum());
+    index = recordReader.readRowIndex(1, null, null).getRowGroupIndex();
+    assertEquals(3, index.length);
+    items = index[1].getEntryList();
+    assertEquals(2,
+        items.get(0).getStatistics().getIntStatistics().getMaximum());
+  }
+
+  private static void setInner(StructColumnVector inner, int rowId,
+                               int i, String value) {
+    ((LongColumnVector) inner.fields[0]).vector[rowId] = i;
+    if (value != null) {
+      ((BytesColumnVector) inner.fields[1]).setVal(rowId, value.getBytes());
+    } else {
+      inner.fields[1].isNull[rowId] = true;
+      inner.fields[1].noNulls = false;
+    }
+  }
+
+  private static void setInnerList(ListColumnVector list, int rowId,
+                                   List<InnerStruct> value) {
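+    // A ListColumnVector stores every row's elements contiguously in child;
+    // offsets[rowId] and lengths[rowId] describe each row's slice, and
+    // childCount tracks how much of child has been filled so far.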
+    if (value != null) {
+      if (list.childCount + value.size() > list.child.isNull.length) {
+        list.child.ensureSize(list.childCount * 2, true);
+      }
+      list.lengths[rowId] = value.size();
+      list.offsets[rowId] = list.childCount;
+      for (int i = 0; i < list.lengths[rowId]; ++i) {
+        InnerStruct inner = value.get(i);
+        setInner((StructColumnVector) list.child, i + list.childCount,
+            inner.int1, inner.string1.toString());
+      }
+      list.childCount += value.size();
+    } else {
+      list.isNull[rowId] = true;
+      list.noNulls = false;
+    }
+  }
+
+  private static void setInnerMap(MapColumnVector map, int rowId,
+                                  Map<String, InnerStruct> value) {
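+    // A MapColumnVector keeps parallel keys and values child vectors; as with
+    // lists, offsets[rowId] and lengths[rowId] locate each row's entries.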
+    if (value != null) {
+      if (map.childCount >= map.keys.isNull.length) {
+        map.keys.ensureSize(map.childCount * 2, true);
+        map.values.ensureSize(map.childCount * 2, true);
+      }
+      map.lengths[rowId] = value.size();
+      int offset = map.childCount;
+      map.offsets[rowId] = offset;
+
+      for (Map.Entry<String, InnerStruct> entry : value.entrySet()) {
+        ((BytesColumnVector) map.keys).setVal(offset, entry.getKey().getBytes());
+        InnerStruct inner = entry.getValue();
+        setInner((StructColumnVector) map.values, offset, inner.int1,
+            inner.string1.toString());
+        offset += 1;
+      }
+      map.childCount = offset;
+    } else {
+      map.isNull[rowId] = true;
+      map.noNulls = false;
+    }
+  }
+
+  private static void setMiddleStruct(StructColumnVector middle, int rowId,
+                                      MiddleStruct value) {
+    if (value != null) {
+      setInnerList((ListColumnVector) middle.fields[0], rowId, value.list);
+    } else {
+      middle.isNull[rowId] = true;
+      middle.noNulls = false;
+    }
+  }
+
+  private static void setBigRow(VectorizedRowBatch batch, int rowId,
+                                Boolean b1, Byte b2, Short s1,
+                                Integer i1, Long l1, Float f1,
+                                Double d1, BytesWritable b3, String s2,
+                                MiddleStruct m1, List<InnerStruct> l2,
+                                Map<String, InnerStruct> m2) {
+    ((LongColumnVector) batch.cols[0]).vector[rowId] = b1 ? 1 : 0;
+    ((LongColumnVector) batch.cols[1]).vector[rowId] = b2;
+    ((LongColumnVector) batch.cols[2]).vector[rowId] = s1;
+    ((LongColumnVector) batch.cols[3]).vector[rowId] = i1;
+    ((LongColumnVector) batch.cols[4]).vector[rowId] = l1;
+    ((DoubleColumnVector) batch.cols[5]).vector[rowId] = f1;
+    ((DoubleColumnVector) batch.cols[6]).vector[rowId] = d1;
+    if (b3 != null) {
+      ((BytesColumnVector) batch.cols[7]).setVal(rowId, b3.getBytes(), 0,
+          b3.getLength());
+    } else {
+      batch.cols[7].isNull[rowId] = true;
+      batch.cols[7].noNulls = false;
+    }
+    if (s2 != null) {
+      ((BytesColumnVector) batch.cols[8]).setVal(rowId, s2.getBytes());
+    } else {
+      batch.cols[8].isNull[rowId] = true;
+      batch.cols[8].noNulls = false;
+    }
+    setMiddleStruct((StructColumnVector) batch.cols[9], rowId, m1);
+    setInnerList((ListColumnVector) batch.cols[10], rowId, l2);
+    setInnerMap((MapColumnVector) batch.cols[11], rowId, m2);
+  }
+
+  private static TypeDescription createInnerSchema() {
+    return TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+  }
+
+  private static TypeDescription createBigRowSchema() {
+    return TypeDescription.createStruct()
+        .addField("boolean1", TypeDescription.createBoolean())
+        .addField("byte1", TypeDescription.createByte())
+        .addField("short1", TypeDescription.createShort())
+        .addField("int1", TypeDescription.createInt())
+        .addField("long1", TypeDescription.createLong())
+        .addField("float1", TypeDescription.createFloat())
+        .addField("double1", TypeDescription.createDouble())
+        .addField("bytes1", TypeDescription.createBinary())
+        .addField("string1", TypeDescription.createString())
+        .addField("middle", TypeDescription.createStruct()
+            .addField("list", TypeDescription.createList(createInnerSchema())))
+        .addField("list", TypeDescription.createList(createInnerSchema()))
+        .addField("map", TypeDescription.createMap(
+            TypeDescription.createString(),
+            createInnerSchema()));
+  }
+
+  static void assertArrayEquals(boolean[] expected, boolean[] actual) {
+    assertEquals(expected.length, actual.length);
+    boolean diff = false;
+    for(int i=0; i < expected.length; ++i) {
+      if (expected[i] != actual[i]) {
+        System.out.println("Difference at " + i + " expected: " + expected[i] +
+          " actual: " + actual[i]);
+        diff = true;
+      }
+    }
+    assertEquals(false, diff);
+  }
+
+  @Test
+  public void test1() throws Exception {
+    TypeDescription schema = createBigRowSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 2;
+    setBigRow(batch, 0, false, (byte) 1, (short) 1024, 65536,
+        Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0, 1, 2, 3, 4), "hi",
+        new MiddleStruct(inner(1, "bye"), inner(2, "sigh")),
+        list(inner(3, "good"), inner(4, "bad")),
+        map());
+    setBigRow(batch, 1, true, (byte) 100, (short) 2048, 65536,
+        Long.MAX_VALUE, (float) 2.0, -5.0, bytes(), "bye",
+        new MiddleStruct(inner(1, "bye"), inner(2, "sigh")),
+        list(inner(100000000, "cat"), inner(-100000, "in"), inner(1234, "hat")),
+        map(inner(5, "chani"), inner(1, "mauddib")));
+    writer.addRowBatch(batch);
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    schema = writer.getSchema();
+    assertEquals(23, schema.getMaximumId());
+    boolean[] expected = new boolean[] {false, false, false, false, false,
+        false, false, false, false, false,
+        false, false, false, false, false,
+        false, false, false, false, false,
+        false, false, false, false};
+    boolean[] included = OrcUtils.includeColumns("", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    expected = new boolean[] {false, true, false, false, false,
+        false, false, false, false, true,
+        true, true, true, true, true,
+        false, false, false, false, true,
+        true, true, true, true};
+    included = OrcUtils.includeColumns("boolean1,string1,middle,map", schema);
+
+    assertArrayEquals(expected, included);
+
+    expected = new boolean[] {false, true, true, true, true,
+        true, true, true, true, true,
+        true, true, true, true, true,
+        true, true, true, true, true,
+        true, true, true, true};
+    included = OrcUtils.includeColumns(
+        "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map",
+        schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    // check the stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(2, stats[1].getNumberOfValues());
+    assertEquals(1, ((BooleanColumnStatistics) stats[1]).getFalseCount());
+    assertEquals(1, ((BooleanColumnStatistics) stats[1]).getTrueCount());
+    assertEquals("count: 2 hasNull: false true: 1", stats[1].toString());
+
+    assertEquals(2048, ((IntegerColumnStatistics) stats[3]).getMaximum());
+    assertEquals(1024, ((IntegerColumnStatistics) stats[3]).getMinimum());
+    assertEquals(true, ((IntegerColumnStatistics) stats[3]).isSumDefined());
+    assertEquals(3072, ((IntegerColumnStatistics) stats[3]).getSum());
+    assertEquals("count: 2 hasNull: false min: 1024 max: 2048 sum: 3072",
+        stats[3].toString());
+
+    StripeStatistics ss = reader.getStripeStatistics().get(0);
+    assertEquals(2, ss.getColumnStatistics()[0].getNumberOfValues());
+    assertEquals(1, ((BooleanColumnStatistics) ss.getColumnStatistics()[1]).getTrueCount());
+    assertEquals(1024, ((IntegerColumnStatistics) ss.getColumnStatistics()[3]).getMinimum());
+    assertEquals(2048, ((IntegerColumnStatistics) ss.getColumnStatistics()[3]).getMaximum());
+    assertEquals(3072, ((IntegerColumnStatistics) ss.getColumnStatistics()[3]).getSum());
+    assertEquals(-15.0, ((DoubleColumnStatistics) stats[7]).getMinimum());
+    assertEquals(-5.0, ((DoubleColumnStatistics) stats[7]).getMaximum());
+    assertEquals(-20.0, ((DoubleColumnStatistics) stats[7]).getSum(), 0.00001);
+    assertEquals("count: 2 hasNull: false min: -15.0 max: -5.0 sum: -20.0",
+        stats[7].toString());
+
+    assertEquals("count: 2 hasNull: false min: bye max: hi sum: 5", stats[9].toString());
+
+    // check the inspectors
+    StructObjectInspector readerInspector =
+        (StructObjectInspector) reader.getObjectInspector();
+    assertEquals(ObjectInspector.Category.STRUCT,
+        readerInspector.getCategory());
+    assertEquals("struct<boolean1:boolean,byte1:tinyint,short1:smallint,"
+        + "int1:int,long1:bigint,float1:float,double1:double,bytes1:"
+        + "binary,string1:string,middle:struct<list:array<struct<int1:int,"
+        + "string1:string>>>,list:array<struct<int1:int,string1:string>>,"
+        + "map:map<string,struct<int1:int,string1:string>>>",
+        readerInspector.getTypeName());
+    List<? extends StructField> fields =
+        readerInspector.getAllStructFieldRefs();
+    BooleanObjectInspector bo = (BooleanObjectInspector) readerInspector.
+        getStructFieldRef("boolean1").getFieldObjectInspector();
+    ByteObjectInspector by = (ByteObjectInspector) readerInspector.
+        getStructFieldRef("byte1").getFieldObjectInspector();
+    ShortObjectInspector sh = (ShortObjectInspector) readerInspector.
+        getStructFieldRef("short1").getFieldObjectInspector();
+    IntObjectInspector in = (IntObjectInspector) readerInspector.
+        getStructFieldRef("int1").getFieldObjectInspector();
+    LongObjectInspector lo = (LongObjectInspector) readerInspector.
+        getStructFieldRef("long1").getFieldObjectInspector();
+    FloatObjectInspector fl = (FloatObjectInspector) readerInspector.
+        getStructFieldRef("float1").getFieldObjectInspector();
+    DoubleObjectInspector dbl = (DoubleObjectInspector) readerInspector.
+        getStructFieldRef("double1").getFieldObjectInspector();
+    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector.
+        getStructFieldRef("bytes1").getFieldObjectInspector();
+    StringObjectInspector st = (StringObjectInspector) readerInspector.
+        getStructFieldRef("string1").getFieldObjectInspector();
+    StructObjectInspector mid = (StructObjectInspector) readerInspector.
+        getStructFieldRef("middle").getFieldObjectInspector();
+    List<? extends StructField> midFields =
+        mid.getAllStructFieldRefs();
+    ListObjectInspector midli =
+        (ListObjectInspector) midFields.get(0).getFieldObjectInspector();
+    StructObjectInspector inner = (StructObjectInspector)
+        midli.getListElementObjectInspector();
+    List<? extends StructField> inFields = inner.getAllStructFieldRefs();
+    ListObjectInspector li = (ListObjectInspector) readerInspector.
+        getStructFieldRef("list").getFieldObjectInspector();
+    MapObjectInspector ma = (MapObjectInspector) readerInspector.
+        getStructFieldRef("map").getFieldObjectInspector();
+    StringObjectInspector mk = (StringObjectInspector)
+        ma.getMapKeyObjectInspector();
+    RecordReader rows = reader.rows();
+    Object row = rows.next(null);
+    assertNotNull(row);
+    // check the contents of the first row
+    assertEquals(false,
+        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals(1, by.get(readerInspector.getStructFieldData(row,
+        fields.get(1))));
+    assertEquals(1024, sh.get(readerInspector.getStructFieldData(row,
+        fields.get(2))));
+    assertEquals(65536, in.get(readerInspector.getStructFieldData(row,
+        fields.get(3))));
+    assertEquals(Long.MAX_VALUE, lo.get(readerInspector.
+        getStructFieldData(row, fields.get(4))));
+    assertEquals(1.0, fl.get(readerInspector.getStructFieldData(row,
+        fields.get(5))), 0.00001);
+    assertEquals(-15.0, dbl.get(readerInspector.getStructFieldData(row,
+        fields.get(6))), 0.00001);
+    assertEquals(bytes(0,1,2,3,4), bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(7))));
+    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(8))));
+    List<?> midRow = midli.getList(mid.getStructFieldData(readerInspector.
+        getStructFieldData(row, fields.get(9)), midFields.get(0)));
+    assertNotNull(midRow);
+    assertEquals(2, midRow.size());
+    assertEquals(1, in.get(inner.getStructFieldData(midRow.get(0),
+        inFields.get(0))));
+    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (midRow.get(0), inFields.get(1))));
+    assertEquals(2, in.get(inner.getStructFieldData(midRow.get(1),
+        inFields.get(0))));
+    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (midRow.get(1), inFields.get(1))));
+    List<?> list = li.getList(readerInspector.getStructFieldData(row,
+        fields.get(10)));
+    assertEquals(2, list.size());
+    assertEquals(3, in.get(inner.getStructFieldData(list.get(0),
+        inFields.get(0))));
+    assertEquals("good", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (list.get(0), inFields.get(1))));
+    assertEquals(4, in.get(inner.getStructFieldData(list.get(1),
+        inFields.get(0))));
+    assertEquals("bad", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (list.get(1), inFields.get(1))));
+    Map<?,?> map = ma.getMap(readerInspector.getStructFieldData(row,
+        fields.get(11)));
+    assertEquals(0, map.size());
+
+    // check the contents of second row
+    assertEquals(true, rows.hasNext());
+    row = rows.next(row);
+    assertEquals(true,
+        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
+    assertEquals(100, by.get(readerInspector.getStructFieldData(row,
+        fields.get(1))));
+    assertEquals(2048, sh.get(readerInspector.getStructFieldData(row,
+        fields.get(2))));
+    assertEquals(65536, in.get(readerInspector.getStructFieldData(row,
+        fields.get(3))));
+    assertEquals(Long.MAX_VALUE, lo.get(readerInspector.
+        getStructFieldData(row, fields.get(4))));
+    assertEquals(2.0, fl.get(readerInspector.getStructFieldData(row,
+        fields.get(5))), 0.00001);
+    assertEquals(-5.0, dbl.get(readerInspector.getStructFieldData(row,
+        fields.get(6))), 0.00001);
+    assertEquals(bytes(), bi.getPrimitiveWritableObject(
+        readerInspector.getStructFieldData(row, fields.get(7))));
+    assertEquals("bye", st.getPrimitiveJavaObject(readerInspector.
+        getStructFieldData(row, fields.get(8))));
+    midRow = midli.getList(mid.getStructFieldData(readerInspector.
+        getStructFieldData(row, fields.get(9)), midFields.get(0)));
+    assertNotNull(midRow);
+    assertEquals(2, midRow.size());
+    assertEquals(1, in.get(inner.getStructFieldData(midRow.get(0),
+        inFields.get(0))));
+    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (midRow.get(0), inFields.get(1))));
+    assertEquals(2, in.get(inner.getStructFieldData(midRow.get(1),
+        inFields.get(0))));
+    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (midRow.get(1), inFields.get(1))));
+    list = li.getList(readerInspector.getStructFieldData(row,
+        fields.get(10)));
+    assertEquals(3, list.size());
+    assertEquals(100000000, in.get(inner.getStructFieldData(list.get(0),
+        inFields.get(0))));
+    assertEquals("cat", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (list.get(0), inFields.get(1))));
+    assertEquals(-100000, in.get(inner.getStructFieldData(list.get(1),
+        inFields.get(0))));
+    assertEquals("in", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (list.get(1), inFields.get(1))));
+    assertEquals(1234, in.get(inner.getStructFieldData(list.get(2),
+        inFields.get(0))));
+    assertEquals("hat", st.getPrimitiveJavaObject(inner.getStructFieldData
+        (list.get(2), inFields.get(1))));
+    map = ma.getMap(readerInspector.getStructFieldData(row,
+        fields.get(11)));
+    assertEquals(2, map.size());
+    boolean[] found = new boolean[2];
+    for(Object key: map.keySet()) {
+      String str = mk.getPrimitiveJavaObject(key);
+      if (str.equals("chani")) {
+        assertEquals(false, found[0]);
+        assertEquals(5, in.get(inner.getStructFieldData(map.get(key),
+            inFields.get(0))));
+        assertEquals(str, st.getPrimitiveJavaObject(
+            inner.getStructFieldData(map.get(key), inFields.get(1))));
+        found[0] = true;
+      } else if (str.equals("mauddib")) {
+        assertEquals(false, found[1]);
+        assertEquals(1, in.get(inner.getStructFieldData(map.get(key),
+            inFields.get(0))));
+        assertEquals(str, st.getPrimitiveJavaObject(
+            inner.getStructFieldData(map.get(key), inFields.get(1))));
+        found[1] = true;
+      } else {
+        throw new IllegalArgumentException("Unknown key " + str);
+      }
+    }
+    assertEquals(true, found[0]);
+    assertEquals(true, found[1]);
+
+    // handle the close up
+    assertEquals(false, rows.hasNext());
+    rows.close();
+  }
+
+  @Test
+  public void testColumnProjection() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(1000)
+                                         .compress(CompressionKind.NONE)
+                                         .bufferSize(100)
+                                         .rowIndexStride(1000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random r1 = new Random(1);
+    Random r2 = new Random(2);
+    int x;
+    int minInt=0, maxInt=0;
+    String y;
+    String minStr = null, maxStr = null;
+    batch.size = 1000;
+    boolean first = true;
+    for(int b=0; b < 21; ++b) {
+      for(int r=0; r < 1000; ++r) {
+        x = r1.nextInt();
+        y = Long.toHexString(r2.nextLong());
+        if (first || x < minInt) {
+          minInt = x;
+        }
+        if (first || x > maxInt) {
+          maxInt = x;
+        }
+        if (first || y.compareTo(minStr) < 0) {
+          minStr = y;
+        }
+        if (first || y.compareTo(maxStr) > 0) {
+          maxStr = y;
+        }
+        first = false;
+        ((LongColumnVector) batch.cols[0]).vector[r] = x;
+        ((BytesColumnVector) batch.cols[1]).setVal(r, y.getBytes());
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    // check out the statistics
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(3, stats.length);
+    for(ColumnStatistics s: stats) {
+      assertEquals(21000, s.getNumberOfValues());
+      if (s instanceof IntegerColumnStatistics) {
+        assertEquals(minInt, ((IntegerColumnStatistics) s).getMinimum());
+        assertEquals(maxInt, ((IntegerColumnStatistics) s).getMaximum());
+      } else if (s instanceof StringColumnStatistics) {
+        assertEquals(maxStr, ((StringColumnStatistics) s).getMaximum());
+        assertEquals(minStr, ((StringColumnStatistics) s).getMinimum());
+      }
+    }
+
+    // check out the types
+    List<OrcProto.Type> types = reader.getTypes();
+    assertEquals(3, types.size());
+    assertEquals(OrcProto.Type.Kind.STRUCT, types.get(0).getKind());
+    assertEquals(2, types.get(0).getSubtypesCount());
+    assertEquals(1, types.get(0).getSubtypes(0));
+    assertEquals(2, types.get(0).getSubtypes(1));
+    assertEquals(OrcProto.Type.Kind.INT, types.get(1).getKind());
+    assertEquals(0, types.get(1).getSubtypesCount());
+    assertEquals(OrcProto.Type.Kind.STRING, types.get(2).getKind());
+    assertEquals(0, types.get(2).getSubtypesCount());
+
+    // read the contents and make sure they match
+    RecordReader rows1 = reader.rows(new boolean[]{true, true, false});
+    RecordReader rows2 = reader.rows(new boolean[]{true, false, true});
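+    // The include arrays are indexed by type id (0 is the root struct), so
+    // rows1 projects only int1 (id 1) and rows2 only string1 (id 2).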
+    r1 = new Random(1);
+    r2 = new Random(2);
+    OrcStruct row1 = null;
+    OrcStruct row2 = null;
+    for(int i = 0; i < 21000; ++i) {
+      assertEquals(true, rows1.hasNext());
+      assertEquals(true, rows2.hasNext());
+      row1 = (OrcStruct) rows1.next(row1);
+      row2 = (OrcStruct) rows2.next(row2);
+      assertEquals(r1.nextInt(), ((IntWritable) row1.getFieldValue(0)).get());
+      assertEquals(Long.toHexString(r2.nextLong()),
+          row2.getFieldValue(1).toString());
+    }
+    assertEquals(false, rows1.hasNext());
+    assertEquals(false, rows2.hasNext());
+    rows1.close();
+    rows2.close();
+  }
+
+  @Test
+  public void testEmptyFile() throws Exception {
+    TypeDescription schema = createBigRowSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(1000)
+                                         .compress(CompressionKind.NONE)
+                                         .bufferSize(100));
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(false, reader.rows().hasNext());
+    assertEquals(CompressionKind.NONE, reader.getCompression());
+    assertEquals(0, reader.getNumberOfRows());
+    assertEquals(0, reader.getCompressionSize());
+    assertEquals(false, reader.getMetadataKeys().iterator().hasNext());
+    assertEquals(3, reader.getContentLength());
+    assertEquals(false, reader.getStripes().iterator().hasNext());
+  }
+
+  @Test
+  public void metaData() throws Exception {
+    TypeDescription schema = createBigRowSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(1000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(100));
+    writer.addUserMetadata("my.meta", byteBuf(1, 2, 3, 4, 5, 6, 7, -1, -2, 127,
+                                              -128));
+    writer.addUserMetadata("clobber", byteBuf(1, 2, 3));
+    writer.addUserMetadata("clobber", byteBuf(4, 3, 2, 1));
+    ByteBuffer bigBuf = ByteBuffer.allocate(40000);
+    Random random = new Random(0);
+    random.nextBytes(bigBuf.array());
+    writer.addUserMetadata("big", bigBuf);
+    bigBuf.position(0);
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1;
+    setBigRow(batch, 0, true, (byte) 127, (short) 1024, 42,
+        42L * 1024 * 1024 * 1024, (float) 3.1415, -2.713, null,
+        null, null, null, null);
+    writer.addRowBatch(batch);
+    writer.addUserMetadata("clobber", byteBuf(5,7,11,13,17,19));
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(byteBuf(5,7,11,13,17,19), reader.getMetadataValue("clobber"));
+    assertEquals(byteBuf(1,2,3,4,5,6,7,-1,-2,127,-128),
+        reader.getMetadataValue("my.meta"));
+    assertEquals(bigBuf, reader.getMetadataValue("big"));
+    try {
+      reader.getMetadataValue("unknown");
+      assertTrue(false);
+    } catch (IllegalArgumentException iae) {
+      // PASS
+    }
+    int i = 0;
+    for(String key: reader.getMetadataKeys()) {
+      if ("my.meta".equals(key) ||
+          "clobber".equals(key) ||
+          "big".equals(key)) {
+        i += 1;
+      } else {
+        throw new IllegalArgumentException("unknown key " + key);
+      }
+    }
+    assertEquals(3, i);
+    int numStripes = reader.getStripeStatistics().size();
+    assertEquals(1, numStripes);
+  }
+
+  /**
+   * Write an ORC file with a range of timestamps and dates, then read it
+   * back and verify that the values round-trip correctly.
+   */
+  public void createOrcDateFile(Path file, int minYear, int maxYear
+                                ) throws IOException {
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("time", TypeDescription.createTimestamp())
+        .addField("date", TypeDescription.createDate());
+    Writer writer = OrcFile.createWriter(file,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .blockPadding(false));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1000;
+    for (int year = minYear; year < maxYear; ++year) {
+      for (int ms = 1000; ms < 2000; ++ms) {
+        ((LongColumnVector) batch.cols[0]).vector[ms - 1000] =
+            TimestampUtils.getTimeNanoSec(Timestamp.valueOf(year +
+                "-05-05 12:34:56." + ms));
+        ((LongColumnVector) batch.cols[1]).vector[ms - 1000] =
+            new DateWritable(new Date(year - 1900, 11, 25)).getDays();
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(file,
+        OrcFile.readerOptions(conf));
+    RecordReader rows = reader.rows();
+    OrcStruct row = null;
+    for (int year = minYear; year < maxYear; ++year) {
+      for(int ms = 1000; ms < 2000; ++ms) {
+        row = (OrcStruct) rows.next(row);
+        assertEquals(new TimestampWritable
+                (Timestamp.valueOf(year + "-05-05 12:34:56." + ms)),
+            row.getFieldValue(0));
+        assertEquals(new DateWritable(new Date(year - 1900, 11, 25)),
+            row.getFieldValue(1));
+      }
+    }
+  }
+
+  @Test
+  public void testDate1900() throws Exception {
+    createOrcDateFile(testFilePath, 1900, 1970);
+  }
+
+  @Test
+  public void testDate2038() throws Exception {
+    createOrcDateFile(testFilePath, 2038, 2250);
+  }
+
+  private static void setUnion(VectorizedRowBatch batch, int rowId,
+                               Timestamp ts, Integer tag, Integer i, String s,
+                               HiveDecimalWritable dec) {
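+    // In a UnionColumnVector, tags[rowId] selects which entry of fields[]
+    // holds the row's value; only the tagged child needs to be populated.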
+    UnionColumnVector union = (UnionColumnVector) batch.cols[1];
+    if (ts != null) {
+      ((LongColumnVector) batch.cols[0]).vector[rowId] =
+          TimestampUtils.getTimeNanoSec(ts);
+    } else {
+      batch.cols[0].isNull[rowId] = true;
+      batch.cols[0].noNulls = false;
+    }
+    if (tag != null) {
+      union.tags[rowId] = tag;
+      if (tag == 0) {
+        if (i != null) {
+          ((LongColumnVector) union.fields[tag]).vector[rowId] = i;
+        } else {
+          union.fields[tag].isNull[rowId] = true;
+          union.fields[tag].noNulls = false;
+        }
+      } else if (tag == 1) {
+        if (s != null) {
+          ((BytesColumnVector) union.fields[tag]).setVal(rowId, s.getBytes());
+        } else {
+          union.fields[tag].isNull[rowId] = true;
+          union.fields[tag].noNulls = false;
+        }
+      } else {
+        throw new IllegalArgumentException("Bad tag " + tag);
+      }
+    } else {
+      batch.cols[1].isNull[rowId] = true;
+      batch.cols[1].noNulls = false;
+    }
+    if (dec != null) {
+      ((DecimalColumnVector) batch.cols[2]).vector[rowId] = dec;
+    } else {
+      batch.cols[2].isNull[rowId] = true;
+      batch.cols[2].noNulls = false;
+    }
+  }
+
+  /**
+   * We test union, timestamp, and decimal separately since we need to make the
+   * object inspector manually. (The Hive reflection-based object inspectors
+   * don't handle them properly.)
+   */
+  @Test
+  public void testUnionAndTimestamp() throws Exception {
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("time", TypeDescription.createTimestamp())
+        .addField("union", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createInt())
+            .addUnionChild(TypeDescription.createString()))
+        .addField("decimal", TypeDescription.createDecimal()
+            .withPrecision(38)
+            .withScale(18));
+    HiveDecimal maxValue = HiveDecimal.create("10000000000000000000");
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(1000)
+                                         .compress(CompressionKind.NONE)
+                                         .bufferSize(100)
+                                         .blockPadding(false));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 6;
+    setUnion(batch, 0, Timestamp.valueOf("2000-03-12 15:00:00"), 0, 42, null,
+             new HiveDecimalWritable("12345678.6547456"));
+    setUnion(batch, 1, Timestamp.valueOf("2000-03-20 12:00:00.123456789"),
+        1, null, "hello", new HiveDecimalWritable("-5643.234"));
+
+    setUnion(batch, 2, null, null, null, null, null);
+    setUnion(batch, 3, null, 0, null, null, null);
+    setUnion(batch, 4, null, 1, null, null, null);
+
+    setUnion(batch, 5, Timestamp.valueOf("1970-01-01 00:00:00"), 0, 200000,
+        null, new HiveDecimalWritable("10000000000000000000"));
+    writer.addRowBatch(batch);
+
+    batch.reset();
+    Random rand = new Random(42);
+    for(int i=1970; i < 2038; ++i) {
+      Timestamp ts = Timestamp.valueOf(i + "-05-05 12:34:56." + i);
+      HiveDecimal dec =
+          HiveDecimal.create(new BigInteger(64, rand), rand.nextInt(18));
+      if ((i & 1) == 0) {
+        setUnion(batch, batch.size++, ts, 0, i*i, null,
+            new HiveDecimalWritable(dec));
+      } else {
+        setUnion(batch, batch.size++, ts, 1, null, Integer.toString(i*i),
+            new HiveDecimalWritable(dec));
+      }
+      if (maxValue.compareTo(dec) < 0) {
+        maxValue = dec;
+      }
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+
+    // let's add a lot of constant rows to test the rle
+    batch.size = 1000;
+    for(int c=0; c < batch.cols.length; ++c) {
+      batch.cols[c].setRepeating(true);
+    }
+    setUnion(batch, 0, null, 0, 1732050807, null, null);
+    for(int i=0; i < 5; ++i) {
+      writer.addRowBatch(batch);
+    }
+
+    batch.reset();
+    batch.size = 3;
+    setUnion(batch, 0, null, 0, 0, null, null);
+    setUnion(batch, 1, null, 0, 10, null, null);
+    setUnion(batch, 2, null, 0, 138, null, null);
+    writer.addRowBatch(batch);
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    schema = writer.getSchema();
+    assertEquals(5, schema.getMaximumId());
+    boolean[] expected = new boolean[] {false, false, false, false, false, false};
+    boolean[] included = OrcUtils.includeColumns("", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    expected = new boolean[] {false, true, false, false, false, true};
+    included = OrcUtils.includeColumns("time,decimal", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    expected = new boolean[] {false, false, true, true, true, false};
+    included = OrcUtils.includeColumns("union", schema);
+    assertEquals(true, Arrays.equals(expected, included));
+
+    assertEquals(false, reader.getMetadataKeys().iterator().hasNext());
+    assertEquals(5077, reader.getNumberOfRows());
+    DecimalColumnStatistics stats =
+        (DecimalColumnStatistics) reader.getStatistics()[5];
+    assertEquals(71, stats.getNumberOfValues());
+    assertEquals(HiveDecimal.create("-5643.234"), stats.getMinimum());
+    assertEquals(maxValue, stats.getMaximum());
+    // TODO: fix this
+//    assertEquals(null,stats.getSum());
+    int stripeCount = 0;
+    int rowCount = 0;
+    long currentOffset = -1;
+    for(StripeInformation stripe: reader.getStripes()) {
+      stripeCount += 1;
+      rowCount += stripe.getNumberOfRows();
+      if (currentOffset < 0) {
+        currentOffset = stripe.getOffset() + stripe.getLength();
+      } else {
+        assertEquals(currentOffset, stripe.getOffset());
+        currentOffset += stripe.getLength();
+      }
+    }
+    assertEquals(reader.getNumberOfRows(), rowCount);
+    assertEquals(2, stripeCount);
+    assertEquals(reader.getContentLength(), currentOffset);
+    RecordReader rows = reader.rows();
+    assertEquals(0, rows.getRowNumber());
+    assertEquals(0.0, rows.getProgress(), 0.000001);
+    assertEquals(true, rows.hasNext());
+    OrcStruct row = (OrcStruct) rows.next(null);
+    assertEquals(1, rows.getRowNumber());
+    ObjectInspector inspector = reader.getObjectInspector();
+    assertEquals("struct<time:timestamp,union:uniontype<int,string>,decimal:decimal(38,18)>",
+        inspector.getTypeName());
+    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-12 15:00:00")),
+        row.getFieldValue(0));
+    OrcUnion union = (OrcUnion) row.getFieldValue(1);
+    assertEquals(0, union.getTag());
+    assertEquals(new IntWritable(42), union.getObject());
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("12345678.6547456")),
+        row.getFieldValue(2));
+    row = (OrcStruct) rows.next(row);
+    assertEquals(2, rows.getRowNumber());
+    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-20 12:00:00.123456789")),
+        row.getFieldValue(0));
+    assertEquals(1, union.getTag());
+    assertEquals(new Text("hello"), union.getObject());
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")),
+        row.getFieldValue(2));
+    row = (OrcStruct) rows.next(row);
+    assertEquals(null, row.getFieldValue(0));
+    assertEquals(null, row.getFieldValue(1));
+    assertEquals(null, row.getFieldValue(2));
+    row = (OrcStruct) rows.next(row);
+    assertEquals(null, row.getFieldValue(0));
+    union = (OrcUnion) row.getFieldValue(1);
+    assertEquals(0, union.getTag());
+    assertEquals(null, union.getObject());
+    assertEquals(null, row.getFieldValue(2));
+    row = (OrcStruct) rows.next(row);
+    assertEquals(null, row.getFieldValue(0));
+    assertEquals(1, union.getTag());
+    assertEquals(null, union.getObject());
+    assertEquals(null, row.getFieldValue(2));
+    row = (OrcStruct) rows.next(row);
+    assertEquals(new TimestampWritable(Timestamp.valueOf("1970-01-01 00:00:00")),
+        row.getFieldValue(0));
+    assertEquals(new IntWritable(200000), union.getObject());
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("10000000000000000000")),
+                 row.getFieldValue(2));
+    rand = new Random(42);
+    for(int i=1970; i < 2038; ++i) {
+      row = (OrcStruct) rows.next(row);
+      assertEquals(new TimestampWritable(Timestamp.valueOf(i + "-05-05 12:34:56." + i)),
+          row.getFieldValue(0));
+      if ((i & 1) == 0) {
+        assertEquals(0, union.getTag());
+        assertEquals(new IntWritable(i*i), union.getObject());
+      } else {
+        assertEquals(1, union.getTag());
+        assertEquals(new Text(Integer.toString(i * i)), union.getObject());
+      }
+      assertEquals(new HiveDecimalWritable(HiveDecimal.create(new BigInteger(64, rand),
+                                   rand.nextInt(18))), row.getFieldValue(2));
+    }
+    for(int i=0; i < 5000; ++i) {
+      row = (OrcStruct) rows.next(row);
+      assertEquals(new IntWritable(1732050807), union.getObject());
+    }
+    row = (OrcStruct) rows.next(row);
+    assertEquals(new IntWritable(0), union.getObject());
+    row = (OrcStruct) rows.next(row);
+    assertEquals(new IntWritable(10), union.getObject());
+    row = (OrcStruct) rows.next(row);
+    assertEquals(new IntWritable(138), union.getObject());
+    assertEquals(false, rows.hasNext());
+    assertEquals(1.0, rows.getProgress(), 0.00001);
+    assertEquals(reader.getNumberOfRows(), rows.getRowNumber());
+    rows.seekToRow(1);
+    row = (OrcStruct) rows.next(row);
+    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-20 12:00:00.123456789")),
+        row.getFieldValue(0));
+    assertEquals(1, union.getTag());
+    assertEquals(new Text("hello"), union.getObject());
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")), row.getFieldValue(2));
+    rows.close();
+  }
+
+  /**
+   * Read and write a randomly generated snappy file.
+   * @throws Exception
+   */
+  @Test
+  public void testSnappy() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(1000)
+                                         .compress(CompressionKind.SNAPPY)
+                                         .bufferSize(100));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(12);
+    batch.size = 1000;
+    for(int b=0; b < 10; ++b) {
+      for (int r=0; r < 1000; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((BytesColumnVector) batch.cols[1]).setVal(r,
+            Integer.toHexString(rand.nextInt()).getBytes());
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    rand = new Random(12);
+    OrcStruct row = null;
+    for(int i=0; i < 10000; ++i) {
+      assertEquals(true, rows.hasNext());
+      row = (OrcStruct) rows.next(row);
+      assertEquals(rand.nextInt(), ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(Integer.toHexString(rand.nextInt()),
+          row.getFieldValue(1).toString());
+    }
+    assertEquals(false, rows.hasNext());
+    rows.close();
+  }
+
+  /**
+   * Read and write a randomly generated snappy file with the row index
+   * disabled (rowIndexStride of 0).
+   * @throws Exception
+   */
+  @Test
+  public void testWithoutIndex() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(5000)
+                                         .compress(CompressionKind.SNAPPY)
+                                         .bufferSize(1000)
+                                         .rowIndexStride(0));
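+    // A row index stride of 0 disables the row index entirely, so the stripe
+    // written below should report an index length of 0.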
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(24);
+    batch.size = 5;
+    for(int c=0; c < batch.cols.length; ++c) {
+      batch.cols[c].setRepeating(true);
+    }
+    for(int i=0; i < 10000; ++i) {
+      ((LongColumnVector) batch.cols[0]).vector[0] = rand.nextInt();
+      ((BytesColumnVector) batch.cols[1])
+          .setVal(0, Integer.toBinaryString(rand.nextInt()).getBytes());
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(50000, reader.getNumberOfRows());
+    assertEquals(0, reader.getRowIndexStride());
+    StripeInformation stripe = reader.getStripes().iterator().next();
+    assertEquals(true, stripe.getDataLength() != 0);
+    assertEquals(0, stripe.getIndexLength());
+    RecordReader rows = reader.rows();
+    rand = new Random(24);
+    OrcStruct row = null;
+    for(int i=0; i < 10000; ++i) {
+      int intVal = rand.nextInt();
+      String strVal = Integer.toBinaryString(rand.nextInt());
+      for(int j=0; j < 5; ++j) {
+        assertEquals(true, rows.hasNext());
+        row = (OrcStruct) rows.next(row);
+        assertEquals(intVal, ((IntWritable) row.getFieldValue(0)).get());
+        assertEquals(strVal, row.getFieldValue(1).toString());
+      }
+    }
+    assertEquals(false, rows.hasNext());
+    rows.close();
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    TypeDescription schema = createBigRowSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(200000)
+                                         .bufferSize(65536)
+                                         .rowIndexStride(1000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(42);
+    final int COUNT=32768;
+    long[] intValues= new long[COUNT];
+    double[] doubleValues = new double[COUNT];
+    String[] stringValues = new String[COUNT];
+    BytesWritable[] byteValues = new BytesWritable[COUNT];
+    String[] words = new String[128];
+    for(int i=0; i < words.length; ++i) {
+      words[i] = Integer.toHexString(rand.nextInt());
+    }
+    for(int i=0; i < COUNT/2; ++i) {
+      intValues[2*i] = rand.nextLong();
+      intValues[2*i+1] = intValues[2*i];
+      stringValues[2*i] = words[rand.nextInt(words.length)];
+      stringValues[2*i+1] = stringValues[2*i];
+    }
+    for(int i=0; i < COUNT; ++i) {
+      doubleValues[i] = rand.nextDouble();
+      byte[] buf = new byte[20];
+      rand.nextBytes(buf);
+      byteValues[i] = new BytesWritable(buf);
+    }
+    for(int i=0; i < COUNT; ++i) {
+      appendRandomRow(batch, intValues, doubleValues, stringValues,
+          byteValues, words, i);
+      if (batch.size == 1024) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(COUNT, reader.getNumberOfRows());
+    RecordReader rows = reader.rows();
+    // get the row index
+    MetadataReader meta = ((RecordReaderImpl) rows).getMetadataReader();
+    RecordReaderImpl.Index index =
+        meta.readRowIndex(reader.getStripes().get(0), null, null, null, null,
+            null);
+    // check the primitive columns to make sure they have the right number of
+    // items in the first row group
+    for(int c=1; c < 9; ++c) {
+      OrcProto.RowIndex colIndex = index.getRowGroupIndex()[c];
+      assertEquals(1000,
+          colIndex.getEntry(0).getStatistics().getNumberOfValues());
+    }
+    OrcStruct row = null;
+    for(int i=COUNT-1; i >= 0; --i) {
+      rows.seekToRow(i);
+      row = (OrcStruct) rows.next(row);
+      BigRow expected = createRandomRow(intValues, doubleValues,
+          stringValues, byteValues, words, i);
+      assertEquals(expected.boolean1.booleanValue(),
+          ((BooleanWritable) row.getFieldValue(0)).get());
+      assertEquals(expected.byte1.byteValue(),
+          ((ByteWritable) row.getFieldValue(1)).get());
+      assertEquals(expected.short1.shortValue(),
+          ((ShortWritable) row.getFieldValue(2)).get());
+      assertEquals(expected.int1.intValue(),
+          ((IntWritable) row.getFieldValue(3)).get());
+      assertEquals(expected.long1.longValue(),
+          ((LongWritable) row.getFieldValue(4)).get());
+      assertEquals(expected.float1,
+          ((FloatWritable) row.getFieldValue(5)).get(), 0.0001);
+      assertEquals(expected.double1,
+          ((DoubleWritable) row.getFieldValue(6)).get(), 0.0001);
+      assertEquals(expected.bytes1, row.getFieldValue(7));
+      assertEquals(expected.string1, row.getFieldValue(8));
+      List<InnerStruct> expectedList = expected.middle.list;
+      List<OrcStruct> actualList =
+          (List<OrcStruct>) ((OrcStruct) row.getFieldValue(9)).getFieldValue(0);
+      compareList(expectedList, actualList, "middle list " + i);
+      compareList(expected.list, (List<OrcStruct>) row.getFieldValue(10),
+          "list " + i);
+    }
+    rows.close();
+    Iterator<StripeInformation> stripeIterator =
+      reader.getStripes().iterator();
+    long offsetOfStripe2 = 0;
+    long offsetOfStripe4 = 0;
+    long lastRowOfStripe2 = 0;
+    for(int i = 0; i < 5; ++i) {
+      StripeInformation stripe = stripeIterator.next();
+      if (i < 2) {
+        lastRowOfStripe2 += stripe.getNumberOfRows();
+      } else if (i == 2) {
+        offsetOfStripe2 = stripe.getOffset();
+        lastRowOfStripe2 += stripe.getNumberOfRows() - 1;
+      } else if (i == 4) {
+        offsetOfStripe4 = stripe.getOffset();
+      }
+    }
+    boolean[] columns = new boolean[reader.getStatistics().length];
+    columns[5] = true; // long column
+    columns[9] = true; // text column
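+    // The include array is indexed by flattened type id (0 is the root
+    // struct), so id 5 is long1 and id 9 is string1 in the big-row schema.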
+    rows = reader.rowsOptions(new Reader.Options()
+        .range(offsetOfStripe2, offsetOfStripe4 - offsetOfStripe2)
+        .include(columns));
+    rows.seekToRow(lastRowOfStripe2);
+    for(int i = 0; i < 2; ++i) {
+      row = (OrcStruct) rows.next(row);
+      BigRow expected = createRandomRow(intValues, doubleValues,
+                                        stringValues, byteValues, words,
+                                        (int) (lastRowOfStripe2 + i));
+
+      assertEquals(expected.long1.longValue(),
+          ((LongWritable) row.getFieldValue(4)).get());
+      assertEquals(expected.string1, row.getFieldValue(8));
+    }
+    rows.close();
+  }
+
+  private void compareInner(InnerStruct expect,
+                            OrcStruct actual,
+                            String context) throws Exception {
+    if (expect == null || actual == null) {
+      assertEquals(context, null, expect);
+      assertEquals(context, null, actual);
+    } else {
+      assertEquals(context, expect.int1,
+          ((IntWritable) actual.getFieldValue(0)).get());
+      assertEquals(context, expect.string1, actual.getFieldValue(1));
+    }
+  }
+
+  private void compareList(List<InnerStruct> expect,
+                           List<OrcStruct> actual,
+                           String context) throws Exception {
+    assertEquals(context, expect.size(), actual.size());
+    for(int j=0; j < expect.size(); ++j) {
+      compareInner(expect.get(j), actual.get(j), context + " at " + j);
+    }
+  }
+
+  private void appendRandomRow(VectorizedRowBatch batch,
+                               long[] intValues, double[] doubleValues,
+                               String[] stringValues,
+                               BytesWritable[] byteValues,
+                               String[] words, int i) {
+    InnerStruct inner = new InnerStruct((int) intValues[i], stringValues[i]);
+    InnerStruct inner2 = new InnerStruct((int) (intValues[i] >> 32),
+        words[i % words.length] + "-x");
+    setBigRow(batch, batch.size++, (intValues[i] & 1) == 0, (byte) intValues[i],
+        (short) intValues[i], (int) intValues[i], intValues[i],
+        (float) doubleValues[i], doubleValues[i], byteValues[i], stringValues[i],
+        new MiddleStruct(inner, inner2), list(), map(inner, inner2));
+  }
+
+  private BigRow createRandomRow(long[] intValues, double[] doubleValues,
+                                 String[] stringValues,
+                                 BytesWritable[] byteValues,
+                                 String[] words, int i) {
+    InnerStruct inner = new InnerStruct((int) intValues[i], stringValues[i]);
+    InnerStruct inner2 = new InnerStruct((int) (intValues[i] >> 32),
+        words[i % words.length] + "-x");
+    return new BigRow((intValues[i] & 1) == 0, (byte) intValues[i],
+        (short) intValues[i], (int) intValues[i], intValues[i],
+        (float) doubleValues[i], doubleValues[i], byteValues[i], stringValues[i],
+        new MiddleStruct(inner, inner2), list(), map(inner, inner2));
+  }
+
+  private static class MyMemoryManager extends MemoryManager {
+    final long totalSpace;
+    double rate;
+    Path path = null;
+    long lastAllocation = 0;
+    int rows = 0;
+    Callback callback;
+
+    MyMemoryManager(Configuration conf, long totalSpace, double rate) {
+      super(conf);
+      this.totalSpace = totalSpace;
+      this.rate = rate;
+    }
+
+    @Override
+    void addWriter(Path path, long requestedAllocation,
+                   Callback callback) {
+      this.path = path;
+      this.lastAllocation = requestedAllocation;
+      this.callback = callback;
+    }
+
+    @Override
+    synchronized void removeWriter(Path path) {
+      this.path = null;
+      this.lastAllocation = 0;
+    }
+
+    @Override
+    long getTotalMemoryPool() {
+      return totalSpace;
+    }
+
+    @Override
+    double getAllocationScale() {
+      return rate;
+    }
+
+    @Override
+    void addedRow(int count) throws IOException {
+      rows += count;
+      if (rows % 100 == 0) {
+        callback.checkMemory(rate);
+      }
+    }
+  }
+
+  @Test
+  public void testMemoryManagementV11() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .compress(CompressionKind.NONE)
+            .stripeSize(50000)
+            .bufferSize(100)
+            .rowIndexStride(0)
+            .memory(memory)
+            .version(Version.V_0_11));
+    assertEquals(testFilePath, memory.path);
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1;
+    for(int i=0; i < 2500; ++i) {
+      ((LongColumnVector) batch.cols[0]).vector[0] = i * 300;
+      ((BytesColumnVector) batch.cols[1]).setVal(0,
+          Integer.toHexString(10*i).getBytes());
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    assertEquals(null, memory.path);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    int i = 0;
+    for(StripeInformation stripe: reader.getStripes()) {
+      i += 1;
+      assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
+          stripe.getDataLength() < 5000);
+    }
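+    // The mock memory manager triggers a checkMemory() call every 100 rows, so
+    // the 2500 rows written above should be flushed into 25 small stripes.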
+    assertEquals(25, i);
+    assertEquals(2500, reader.getNumberOfRows());
+  }
+
+  @Test
+  public void testMemoryManagementV12() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .compress(CompressionKind.NONE)
+                                         .stripeSize(50000)
+                                         .bufferSize(100)
+                                         .rowIndexStride(0)
+                                         .memory(memory)
+                                         .version(Version.V_0_12));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    assertEquals(testFilePath, memory.path);
+    batch.size = 1;
+    for(int i=0; i < 2500; ++i) {
+      ((LongColumnVector) batch.cols[0]).vector[0] = i * 300;
+      ((BytesColumnVector) batch.cols[1]).setVal(0,
+          Integer.toHexString(10*i).getBytes());
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    assertEquals(null, memory.path);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    int i = 0;
+    for(StripeInformation stripe: reader.getStripes()) {
+      i += 1;
+      assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
+          stripe.getDataLength() < 5000);
+    }
+    // With HIVE-7832, the dictionaries are disabled after the first stripe is
+    // written because there are too many distinct values, so only 4 stripes are
+    // produced here compared to the 25 stripes in the 0.11 test above.
+    assertEquals(4, i);
+    assertEquals(2500, reader.getNumberOfRows());
+  }
+
+  @Test
+  public void testPredicatePushdown() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(400000L)
+            .compress(CompressionKind.NONE)
+            .bufferSize(500)
+            .rowIndexStride(1000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.ensureSize(3500);
+    batch.size = 3500;
+    for(int i=0; i < 3500; ++i) {
+      ((LongColumnVector) batch.cols[0]).vector[i] = i * 300;
+      ((BytesColumnVector) batch.cols[1]).setVal(i,
+          Integer.toHexString(10*i).getBytes());
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(3500, reader.getNumberOfRows());
+
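+    // Build a SARG equivalent to 300000 <= int1 < 600000. Since int1 = 300 * i,
+    // only rows 1000 through 1999 qualify, so with a row index stride of 1000
+    // the reader should position itself at row 1000 and return exactly that range.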
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startAnd()
+          .startNot()
+             .lessThan("int1", PredicateLeaf.Type.LONG, 300000L)
+          .end()
+          .lessThan("int1", PredicateLeaf.Type.LONG, 600000L)
+        .end()
+        .build();
+    RecordReader rows = reader.rowsOptions(new Reader.Options()
+        .range(0L, Long.MAX_VALUE)
+        .include(new boolean[]{true, true, true})
+        .searchArgument(sarg, new String[]{null, "int1", "string1"}));
+    assertEquals(1000L, rows.getRowNumber());
+    OrcStruct row = null;
+    for(int i=1000; i < 2000; ++i) {
+      assertTrue(rows.hasNext());
+      row = (OrcStruct) rows.next(row);
+      assertEquals(300 * i, ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(Integer.toHexString(10*i), row.getFieldValue(1).toString());
+    }
+    assertTrue(!rows.hasNext());
+    assertEquals(3500, rows.getRowNumber());

<TRUNCATED>