Posted to commits@hive.apache.org by om...@apache.org on 2016/04/26 17:17:57 UTC

[1/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

Repository: hive
Updated Branches:
  refs/heads/master bf44f3ee3 -> 0ac424f0a


http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
index adb52f0..a52b3ef 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
@@ -151,12 +151,11 @@ public class TestVectorizedORCReader {
         OrcFile.readerOptions(conf));
     RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows();
     RecordReaderImpl rr = (RecordReaderImpl) reader.rows();
-    VectorizedRowBatch batch = null;
+    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
     OrcStruct row = null;
 
     // Check Vectorized ORC reader against ORC row reader
-    while (vrr.hasNext()) {
-      batch = vrr.nextBatch(batch);
+    while (vrr.nextBatch(batch)) {
       for (int i = 0; i < batch.size; i++) {
         row = (OrcStruct) rr.next(row);
         for (int j = 0; j < batch.cols.length; j++) {
@@ -239,6 +238,6 @@ public class TestVectorizedORCReader {
       Assert.assertEquals(false, batch.cols[8].noNulls);
       Assert.assertEquals(false, batch.cols[9].noNulls);
     }
-    Assert.assertEquals(false, rr.hasNext());
+    Assert.assertEquals(false, rr.nextBatch(batch));
   }
 }
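
The change above illustrates the new RecordReader contract: the caller allocates one VectorizedRowBatch from the file schema and nextBatch() refills it in place, returning false at end of file instead of requiring a separate hasNext() call. A minimal sketch of the idiom (imports and the OrcFile.createReader setup are assumed, as in the test above):

    static void readAll(org.apache.hadoop.hive.ql.io.orc.Reader reader)
        throws java.io.IOException {
      // Allocate the batch once; the reader reuses its arrays on every call.
      VectorizedRowBatch batch = reader.getSchema().createRowBatch();
      RecordReader rows = reader.rows();
      while (rows.nextBatch(batch)) {
        for (int r = 0; r < batch.size; ++r) {
          // process row r of the batch
        }
      }
      rows.close();
    }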

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 99744cd..a6d932c 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -338,6 +338,17 @@ public class BytesColumnVector extends ColumnVector {
     initBuffer(0);
   }
 
+  public String toString(int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      return new String(vector[row], start[row], length[row]);
+    } else {
+      return null;
+    }
+  }
+
   @Override
   public void stringifyValue(StringBuilder buffer, int row) {
     if (isRepeating) {
@@ -354,8 +365,8 @@ public class BytesColumnVector extends ColumnVector {
 
   @Override
   public void ensureSize(int size, boolean preserveData) {
+    super.ensureSize(size, preserveData);
     if (size > vector.length) {
-      super.ensureSize(size, preserveData);
       int[] oldStart = start;
       start = new int[size];
       int[] oldLength = length;
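
The new toString(int) accessor centralizes the slice-and-decode that callers previously did against vector/start/length directly. A usage sketch (batch population is assumed, and column 0 is assumed to hold strings):

    BytesColumnVector col = (BytesColumnVector) batch.cols[0];
    for (int r = 0; r < batch.size; ++r) {
      // Honors isRepeating (reads row 0) and yields null for null entries.
      String value = col.toString(r);
      System.out.println(value);
    }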

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 1523ff6..0c52210 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -140,8 +140,8 @@ public class DecimalColumnVector extends ColumnVector {
 
   @Override
   public void ensureSize(int size, boolean preserveData) {
+    super.ensureSize(size, preserveData);
     if (size > vector.length) {
-      super.ensureSize(size, preserveData);
       HiveDecimalWritable[] oldArray = vector;
       vector = new HiveDecimalWritable[size];
       if (preserveData) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 41dc3e1..bd421f4 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -162,8 +162,8 @@ public class DoubleColumnVector extends ColumnVector {
 
   @Override
   public void ensureSize(int size, boolean preserveData) {
+    super.ensureSize(size, preserveData);
     if (size > vector.length) {
-      super.ensureSize(size, preserveData);
       double[] oldArray = vector;
       vector = new double[size];
       if (preserveData) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index 0afe5db..80d4731 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -208,8 +208,8 @@ public class LongColumnVector extends ColumnVector {
 
   @Override
   public void ensureSize(int size, boolean preserveData) {
+    super.ensureSize(size, preserveData);
     if (size > vector.length) {
-      super.ensureSize(size, preserveData);
       long[] oldArray = vector;
       vector = new long[size];
       if (preserveData) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
index d8451f0..1aeff83 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
@@ -111,8 +111,8 @@ public abstract class MultiValuedColumnVector extends ColumnVector {
 
   @Override
   public void ensureSize(int size, boolean preserveData) {
+    super.ensureSize(size, preserveData);
     if (size > offsets.length) {
-      super.ensureSize(size, preserveData);
       long[] oldOffsets = offsets;
       offsets = new long[size];
       long oldLengths[] = lengths;
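
The ensureSize() reordering here is the same one applied to BytesColumnVector, DecimalColumnVector, DoubleColumnVector, and LongColumnVector above: super.ensureSize() must run unconditionally so the base class can grow its own state (notably the isNull array) even when the subclass's value arrays are already big enough. A simplified sketch of the corrected override shape for a hypothetical long-payload subclass (the real implementations also special-case isRepeating when copying):

    @Override
    public void ensureSize(int size, boolean preserveData) {
      super.ensureSize(size, preserveData); // always let ColumnVector grow isNull
      if (size > vector.length) {           // then grow the type-specific array
        long[] oldArray = vector;
        vector = new long[size];
        if (preserveData) {
          System.arraycopy(oldArray, 0, vector, 0, oldArray.length);
        }
      }
    }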

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
index c0dd5ed..d971339 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
@@ -392,4 +392,4 @@ public class TimestampColumnVector extends ColumnVector {
       buffer.append("null");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
index 298d588..0c61243 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.util.Arrays;
-
 /**
  * The representation of a vectorized column of struct objects.
  *


[5/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

Posted by om...@apache.org.
HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley,
reviewed by Matt McCline and Prasanth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ac424f0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ac424f0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ac424f0

Branch: refs/heads/master
Commit: 0ac424f0a17b341efe299da167791112e4a953e9
Parents: bf44f3e
Author: Owen O'Malley <om...@apache.org>
Authored: Tue Apr 26 08:15:00 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue Apr 26 08:17:24 2016 -0700

----------------------------------------------------------------------
 itests/qtest/pom.xml                            |    2 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   46 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |   62 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |    6 +-
 .../TestIncrementalObjectSizeEstimator.java     |   31 +-
 orc/src/java/org/apache/orc/DataReader.java     |   24 +-
 .../java/org/apache/orc/DataReaderFactory.java  |    9 -
 .../org/apache/orc/MetadataReaderFactory.java   |   12 -
 orc/src/java/org/apache/orc/OrcUtils.java       |   78 +
 orc/src/java/org/apache/orc/Reader.java         |   29 +-
 orc/src/java/org/apache/orc/RecordReader.java   |    8 +-
 .../java/org/apache/orc/TypeDescription.java    |   62 +-
 .../org/apache/orc/impl/BitFieldReader.java     |    5 +-
 .../apache/orc/impl/DataReaderProperties.java   |   41 +-
 .../orc/impl/DefaultMetadataReaderFactory.java  |   14 -
 orc/src/java/org/apache/orc/impl/InStream.java  |    4 +-
 .../java/org/apache/orc/impl/IntegerReader.java |   26 +-
 .../org/apache/orc/impl/MetadataReader.java     |   33 -
 .../org/apache/orc/impl/MetadataReaderImpl.java |  120 --
 .../orc/impl/MetadataReaderProperties.java      |   96 -
 .../apache/orc/impl/RunLengthByteReader.java    |   36 +-
 .../apache/orc/impl/RunLengthIntegerReader.java |   31 +-
 .../orc/impl/RunLengthIntegerReaderV2.java      |   33 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |   47 +-
 .../orc/impl/TestDataReaderProperties.java      |   12 +-
 .../orc/impl/TestMetadataReaderProperties.java  |   72 -
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |   13 +-
 .../ql/io/orc/DefaultDataReaderFactory.java     |   14 -
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   43 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |    3 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   65 +-
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |  231 +--
 .../hive/ql/io/orc/RecordReaderUtils.java       |  127 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  234 +--
 .../hive/ql/io/orc/TreeReaderFactory.java       |  838 +++++----
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   32 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |    2 -
 .../hive/ql/io/orc/TestRecordReaderImpl.java    |   55 +-
 .../hive/ql/io/orc/TestTypeDescription.java     |    4 +-
 .../hive/ql/io/orc/TestVectorOrcFile.java       | 1647 +++++++++---------
 .../hive/ql/io/orc/TestVectorizedORCReader.java |    7 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  |   13 +-
 .../ql/exec/vector/DecimalColumnVector.java     |    2 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |    2 +-
 .../hive/ql/exec/vector/LongColumnVector.java   |    2 +-
 .../ql/exec/vector/MultiValuedColumnVector.java |    2 +-
 .../ql/exec/vector/TimestampColumnVector.java   |    2 +-
 .../hive/ql/exec/vector/UnionColumnVector.java  |    2 -
 48 files changed, 2133 insertions(+), 2146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index b042774..a479557 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -431,7 +431,7 @@
                   templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
                   queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
                   queryFile="${qfile}"
-                  excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files},regexp_extract.q"
+                  excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files}"
                   queryFileRegex="${qfile_regex}"
                   clusterMode="${clustermode}"
                   runDisabled="${run_disabled}"

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 0651557..a689f10 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -27,7 +28,12 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
@@ -71,6 +77,35 @@ public class OrcEncodedDataConsumer
     stripes[m.getStripeIx()] = m;
   }
 
+  private static ColumnVector createColumn(OrcProto.Type type,
+                                           int batchSize) {
+    switch (type.getKind()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case DATE:
+        return new LongColumnVector(batchSize);
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnVector(batchSize);
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new BytesColumnVector(batchSize);
+      case TIMESTAMP:
+        return new TimestampColumnVector(batchSize);
+      case DECIMAL:
+        return new DecimalColumnVector(batchSize, type.getPrecision(),
+            type.getScale());
+      default:
+        throw new IllegalArgumentException("LLAP does not support " +
+            type.getKind());
+    }
+  }
+
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
@@ -112,9 +147,16 @@ public class OrcEncodedDataConsumer
         ColumnVectorBatch cvb = cvbPool.take();
         assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
         cvb.size = batchSize;
-
+        List<OrcProto.Type> types = fileMetadata.getTypes();
+        int[] columnMapping = batch.getColumnIxs();
         for (int idx = 0; idx < batch.getColumnIxs().length; idx++) {
-          cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
+          if (cvb.cols[idx] == null) {
+            // skip over the top level struct, but otherwise assume no complex
+            // types
+            cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize);
+          }
+          cvb.cols[idx].ensureSize(batchSize, false);
+          columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize);
         }
 
         // we are done reading a batch, send it to consumer for processing
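
createColumn() is a direct mapping from primitive ORC categories onto ColumnVector subclasses; DECIMAL is the one case that carries extra parameters through. An illustrative call (a sketch, not part of the patch; the protobuf builder usage is assumed):

    OrcProto.Type decimalType = OrcProto.Type.newBuilder()
        .setKind(OrcProto.Type.Kind.DECIMAL)
        .setPrecision(10)
        .setScale(2)
        .build();
    // Yields the equivalent of new DecimalColumnVector(1024, 10, 2).
    ColumnVector col = createColumn(decimalType, VectorizedRowBatch.DEFAULT_SIZE);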

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 406f8f6..83011fb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -27,6 +27,8 @@ import java.util.List;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.OrcIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +60,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 import org.apache.orc.OrcConf;
@@ -144,8 +145,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   // Read state.
   private int stripeIxFrom;
   private OrcFileMetadata fileMetadata;
+  private Path path;
   private Reader orcReader;
-  private MetadataReader metadataReader;
+  private DataReader metadataReader;
   private EncodedReader stripeReader;
   private Object fileKey;
   private FileSystem fs;
@@ -555,13 +557,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    * Closes the stripe readers (on error).
    */
   private void cleanupReaders() {
-    if (metadataReader != null) {
-      try {
-        metadataReader.close();
-      } catch (IOException ex) {
-        // Ignore.
-      }
-    }
     if (stripeReader != null) {
       try {
         stripeReader.close();
@@ -576,7 +571,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    */
   private void ensureOrcReader() throws IOException {
     if (orcReader != null) return;
-    Path path = split.getPath();
+    path = split.getPath();
     if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
       path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
     }
@@ -661,7 +656,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     if (metadataReader != null) return;
     long startTime = counters.startTimeCounter();
-    metadataReader = orcReader.metadata();
+    boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+    metadataReader = RecordReaderUtils.createDefaultDataReader(
+        DataReaderProperties.builder()
+        .withBufferSize(orcReader.getCompressionSize())
+        .withCompression(orcReader.getCompressionKind())
+        .withFileSystem(fs)
+        .withPath(path)
+        .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
+        .withZeroCopy(useZeroCopy)
+        .build());
     counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
@@ -805,13 +809,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private class DataWrapperForOrc implements DataReader, DataCache {
     private final DataReader orcDataReader;
 
-    public DataWrapperForOrc() {
-      boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
-      if (useZeroCopy && !getAllocator().isDirectAlloc()) {
-        throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
-            + "buffers; either disable zero-copy or enable direct cache allocation");
-      }
-      this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
+    private DataWrapperForOrc(DataWrapperForOrc other) {
+      orcDataReader = other.orcDataReader.clone();
+    }
+
+    public DataWrapperForOrc() throws IOException {
+      ensureMetadataReader();
+      this.orcDataReader = metadataReader.clone();
     }
 
     @Override
@@ -881,10 +885,32 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
+    public DataWrapperForOrc clone() {
+      return new DataWrapperForOrc(this);
+    }
+
+    @Override
     public void open() throws IOException {
       long startTime = counters.startTimeCounter();
       orcDataReader.open();
       counters.recordHdfsTime(startTime);
     }
+
+    @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      return orcDataReader.readRowIndex(stripe, footer, included, indexes,
+          sargColumns, bloomFilterIndices);
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      return orcDataReader.readStripeFooter(stripe);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 82187bd..6874586 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -28,9 +28,9 @@ import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.impl.OrcIndex;
 
 public class OrcStripeMetadata extends LlapCacheableBuffer {
@@ -54,7 +54,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
     SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class);
   }
 
-  public OrcStripeMetadata(OrcBatchKey stripeKey, MetadataReader mr, StripeInformation stripe,
+  public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe,
       boolean[] includes, boolean[] sargColumns) throws IOException {
     this.stripeKey = stripeKey;
     OrcProto.StripeFooter footer = mr.readStripeFooter(stripe);
@@ -95,7 +95,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
     return true;
   }
 
-  public void loadMissingIndexes(MetadataReader mr, StripeInformation stripe, boolean[] includes,
+  public void loadMissingIndexes(DataReader mr, StripeInformation stripe, boolean[] includes,
       boolean[] sargColumns) throws IOException {
     // TODO: should we save footer to avoid a read here?
     rowIndex = mr.readRowIndex(stripe, null, includes, rowIndex.getRowGroupIndex(),

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
index a078f73..183fb1b 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
@@ -21,18 +21,20 @@ import static org.junit.Assert.*;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.DataReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
@@ -46,11 +48,16 @@ import com.google.protobuf.CodedOutputStream;
 public class TestIncrementalObjectSizeEstimator {
   private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalObjectSizeEstimator.class);
 
-  private static class DummyMetadataReader implements MetadataReader  {
+  private static class DummyMetadataReader implements DataReader {
     public boolean doStreamStep = false;
     public boolean isEmpty;
 
     @Override
+    public void open() throws IOException {
+
+    }
+
+    @Override
     public OrcIndex readRowIndex(StripeInformation stripe,
                               OrcProto.StripeFooter footer,
         boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
@@ -118,6 +125,26 @@ public class TestIncrementalObjectSizeEstimator {
     }
 
     @Override
+    public DiskRangeList readFileData(DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return false;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer toRelease) {
+
+    }
+
+    @Override
+    public DataReader clone() {
+      return null;
+    }
+
+    @Override
     public void close() throws IOException {
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReader.java b/orc/src/java/org/apache/orc/DataReader.java
index b70f26b..a5dbb76 100644
--- a/orc/src/java/org/apache/orc/DataReader.java
+++ b/orc/src/java/org/apache/orc/DataReader.java
@@ -18,18 +18,27 @@
 
 package org.apache.orc;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.impl.OrcIndex;
 
 /** An abstract data reader that IO formats can use to read bytes from underlying storage. */
-public interface DataReader extends Closeable {
+public interface DataReader extends AutoCloseable {
 
   /** Opens the DataReader, making it ready to use. */
   void open() throws IOException;
 
+  OrcIndex readRowIndex(StripeInformation stripe,
+                        OrcProto.StripeFooter footer,
+                        boolean[] included, OrcProto.RowIndex[] indexes,
+                        boolean[] sargColumns,
+                        OrcProto.BloomFilterIndex[] bloomFilterIndices
+                        ) throws IOException;
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
+
   /** Reads the data.
    *
    * Note that for the cases such as zero-copy read, caller must release the disk ranges
@@ -53,4 +62,15 @@ public interface DataReader extends Closeable {
    * @param toRelease The buffer to release.
    */
   void releaseBuffer(ByteBuffer toRelease);
+
+  /**
+   * Clone the entire state of the DataReader with the assumption that the
+   * clone will be closed at a different time. Thus, any file handles in the
+   * implementation need to be cloned.
+   * @return a new instance
+   */
+  DataReader clone();
+
+  @Override
+  public void close() throws IOException;
 }
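
With MetadataReader folded into DataReader, the clone() contract above is what lets LLAP share one reader definition across independently-closed consumers. A sketch under those assumptions (props and stripe are placeholders; construction mirrors the RecordReaderUtils.createDefaultDataReader call added in OrcEncodedDataReader above):

    static OrcProto.StripeFooter footerViaClone(DataReaderProperties props,
                                                StripeInformation stripe)
        throws IOException {
      try (DataReader original = RecordReaderUtils.createDefaultDataReader(props);
           DataReader copy = original.clone()) {
        copy.open();
        // Closing `copy` must not invalidate `original`: each clone owns its
        // own file handle, per the contract above.
        return copy.readStripeFooter(stripe);
      }
    }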

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReaderFactory.java b/orc/src/java/org/apache/orc/DataReaderFactory.java
deleted file mode 100644
index ec3a0e9..0000000
--- a/orc/src/java/org/apache/orc/DataReaderFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.orc;
-
-import org.apache.orc.impl.DataReaderProperties;
-
-public interface DataReaderFactory {
-
-  DataReader create(DataReaderProperties properties);
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/MetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/MetadataReaderFactory.java b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
deleted file mode 100644
index 64629da..0000000
--- a/orc/src/java/org/apache/orc/MetadataReaderFactory.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.orc;
-
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderProperties;
-
-import java.io.IOException;
-
-public interface MetadataReaderFactory {
-
-  MetadataReader create(MetadataReaderProperties properties) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index 2e93254..5845ba6 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -449,4 +449,82 @@ public class OrcUtils {
     return columnId;
   }
 
+  /**
+   * Translate the given rootColumn from the list of types to a TypeDescription.
+   * @param types all of the types
+   * @param rootColumn translate this type
+   * @return a new TypeDescription that matches the given rootColumn
+   */
+  public static
+        TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types,
+                                                int rootColumn) {
+    OrcProto.Type type = types.get(rootColumn);
+    switch (type.getKind()) {
+      case BOOLEAN:
+        return TypeDescription.createBoolean();
+      case BYTE:
+        return TypeDescription.createByte();
+      case SHORT:
+        return TypeDescription.createShort();
+      case INT:
+        return TypeDescription.createInt();
+      case LONG:
+        return TypeDescription.createLong();
+      case FLOAT:
+        return TypeDescription.createFloat();
+      case DOUBLE:
+        return TypeDescription.createDouble();
+      case STRING:
+        return TypeDescription.createString();
+      case CHAR:
+      case VARCHAR: {
+        TypeDescription result = type.getKind() == OrcProto.Type.Kind.CHAR ?
+            TypeDescription.createChar() : TypeDescription.createVarchar();
+        if (type.hasMaximumLength()) {
+          result.withMaxLength(type.getMaximumLength());
+        }
+        return result;
+      }
+      case BINARY:
+        return TypeDescription.createBinary();
+      case TIMESTAMP:
+        return TypeDescription.createTimestamp();
+      case DATE:
+        return TypeDescription.createDate();
+      case DECIMAL: {
+        TypeDescription result = TypeDescription.createDecimal();
+        if (type.hasScale()) {
+          result.withScale(type.getScale());
+        }
+        if (type.hasPrecision()) {
+          result.withPrecision(type.getPrecision());
+        }
+        return result;
+      }
+      case LIST:
+        return TypeDescription.createList(
+            convertTypeFromProtobuf(types, type.getSubtypes(0)));
+      case MAP:
+        return TypeDescription.createMap(
+            convertTypeFromProtobuf(types, type.getSubtypes(0)),
+            convertTypeFromProtobuf(types, type.getSubtypes(1)));
+      case STRUCT: {
+        TypeDescription result = TypeDescription.createStruct();
+        for(int f=0; f < type.getSubtypesCount(); ++f) {
+          result.addField(type.getFieldNames(f),
+              convertTypeFromProtobuf(types, type.getSubtypes(f)));
+        }
+        return result;
+      }
+      case UNION: {
+        TypeDescription result = TypeDescription.createUnion();
+        for(int f=0; f < type.getSubtypesCount(); ++f) {
+          result.addUnionChild(
+              convertTypeFromProtobuf(types, type.getSubtypes(f)));
+        }
+        return result;
+      }
+    }
+    throw new IllegalArgumentException("Unknown ORC type " + type.getKind());
+  }
 }
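
A usage sketch tying the new helper to the Reader changes later in this patch (reader is assumed open; column 0 is the root type, per the Reader.getTypes() javadoc):

    List<OrcProto.Type> types = reader.getTypes();   // deprecated flat form
    TypeDescription root = OrcUtils.convertTypeFromProtobuf(types, 0);
    // Should reconstruct the same schema the reader now exposes directly.
    assert root.equals(reader.getSchema());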

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/Reader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java
index be722b5..39de763 100644
--- a/orc/src/java/org/apache/orc/Reader.java
+++ b/orc/src/java/org/apache/orc/Reader.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.orc.impl.MetadataReader;
 
 /**
  * The interface for reading ORC files.
@@ -116,9 +115,15 @@ public interface Reader {
   ColumnStatistics[] getStatistics();
 
   /**
+   * Get the type of rows in this ORC file.
+   */
+  TypeDescription getSchema();
+
+  /**
    * Get the list of types contained in the file. The root type is the first
    * type in the list.
    * @return the list of flattened types
+   * @deprecated use getSchema instead
    */
   List<OrcProto.Type> getTypes();
 
@@ -144,6 +149,7 @@ public interface Reader {
     private Boolean useZeroCopy = null;
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
+    private DataReader dataReader = null;
 
     /**
      * Set the list of columns to read.
@@ -197,6 +203,11 @@ public interface Reader {
       return this;
     }
 
+    public Options dataReader(DataReader value) {
+      this.dataReader = value;
+      return this;
+    }
+
     /**
      * Set whether to skip corrupt records.
      * @param value the new skip corrupt records flag
@@ -247,6 +258,10 @@ public interface Reader {
       return skipCorruptRecords;
     }
 
+    public DataReader getDataReader() {
+      return dataReader;
+    }
+
     public Options clone() {
       Options result = new Options();
       result.include = include;
@@ -257,6 +272,7 @@ public interface Reader {
       result.columnNames = columnNames;
       result.useZeroCopy = useZeroCopy;
       result.skipCorruptRecords = skipCorruptRecords;
+      result.dataReader = dataReader == null ? null : dataReader.clone();
       return result;
     }
 
@@ -321,11 +337,6 @@ public interface Reader {
   RecordReader rowsOptions(Options options) throws IOException;
 
   /**
-   * @return Metadata reader used to read file metadata.
-   */
-  MetadataReader metadata() throws IOException;
-
-  /**
    * @return List of integers representing version of the file, in order from major to minor.
    */
   List<Integer> getVersionList();
@@ -351,12 +362,6 @@ public interface Reader {
   List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics();
 
   /**
-   * @param useZeroCopy Whether zero-copy read should be used.
-   * @return The default data reader that ORC is using to read bytes from disk.
-   */
-  DataReader createDefaultDataReader(boolean useZeroCopy);
-
-  /**
    * @return Serialized file metadata read from disk for the purposes of caching, etc.
    */
   ByteBuffer getSerializedFileFooter();
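
The new dataReader option lets callers (LLAP, in this patch) supply a preconstructed DataReader instead of having ORC open a default one, and Options.clone() deep-copies it via DataReader.clone(). A sketch (included and myDataReader are assumed placeholders):

    Reader.Options opts = new Reader.Options()
        .include(included)
        .dataReader(myDataReader);
    RecordReader rows = reader.rowsOptions(opts);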

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java
index 7229dda..09ba0f0 100644
--- a/orc/src/java/org/apache/orc/RecordReader.java
+++ b/orc/src/java/org/apache/orc/RecordReader.java
@@ -30,13 +30,11 @@ public interface RecordReader {
   * controlled by the callers. Callers need to look at
   * VectorizedRowBatch.size of the returned object to know the batch
    * size read.
-   * @param previousBatch a row batch object that can be reused by the reader
-   * @return the row batch that was read. The batch will have a non-zero row
-   *         count if the pointer isn't at the end of the file
+   * @param batch a row batch object to read into
+   * @return were more rows available to read?
    * @throws java.io.IOException
    */
-  VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch
-			       ) throws IOException;
+  boolean nextBatch(VectorizedRowBatch batch) throws IOException;
 
   /**
    * Get the row number of the row that will be returned by the following

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index bd900ac..b8e057e 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -61,7 +61,7 @@ public class TypeDescription {
     LIST("array", false),
     MAP("map", false),
     STRUCT("struct", false),
-    UNION("union", false);
+    UNION("uniontype", false);
 
     Category(String name, boolean isPrimitive) {
       this.name = name;
@@ -258,6 +258,66 @@ public class TypeDescription {
     return id;
   }
 
+  public TypeDescription clone() {
+    TypeDescription result = new TypeDescription(category);
+    result.maxLength = maxLength;
+    result.precision = precision;
+    result.scale = scale;
+    if (fieldNames != null) {
+      result.fieldNames.addAll(fieldNames);
+    }
+    if (children != null) {
+      for(TypeDescription child: children) {
+        TypeDescription clone = child.clone();
+        clone.parent = result;
+        result.children.add(clone);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != TypeDescription.class) {
+      return false;
+    }
+    if (other == this) {
+      return true;
+    }
+    TypeDescription castOther = (TypeDescription) other;
+    if (category != castOther.category ||
+        getId() != castOther.getId() ||
+        getMaximumId() != castOther.getMaximumId() ||
+        maxLength != castOther.maxLength ||
+        scale != castOther.scale ||
+        precision != castOther.precision) {
+      return false;
+    }
+    if (children != null) {
+      if (children.size() != castOther.children.size()) {
+        return false;
+      }
+      for (int i = 0; i < children.size(); ++i) {
+        if (!children.get(i).equals(castOther.children.get(i))) {
+          return false;
+        }
+      }
+    }
+    if (category == Category.STRUCT) {
+      for(int i=0; i < fieldNames.size(); ++i) {
+        if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * Get the maximum id assigned to this type or its children.
   * The first call will cause all of the ids in the tree to be assigned, so
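
The new equality is structural: category, ids, max lengths, precision/scale, children, and (for structs) field names must all match, and hashCode() reuses the column id. A small sketch:

    TypeDescription a = TypeDescription.createStruct()
        .addField("x", TypeDescription.createInt())
        .addField("y", TypeDescription.createString());
    TypeDescription b = a.clone();
    // clone() yields an equal but detached tree that is safe to mutate.
    assert a.equals(b) && a.hashCode() == b.hashCode();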

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
index 8d9d3cb..dda7355 100644
--- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -137,7 +137,7 @@ public class BitFieldReader {
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
-      if (!previous.isNull[i]) {
+      if (previous.noNulls || !previous.isNull[i]) {
         previous.vector[i] = next();
       } else {
         // The default value of null for int types in vectorized
@@ -150,7 +150,8 @@ public class BitFieldReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+          && ((previous.vector[0] != previous.vector[i]) ||
+          (previous.isNull[0] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
index 49f47d6..bb73d53 100644
--- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -3,7 +3,7 @@ package org.apache.orc.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 
 import javax.annotation.Nullable;
 
@@ -11,14 +11,18 @@ public final class DataReaderProperties {
 
   private final FileSystem fileSystem;
   private final Path path;
-  private final CompressionCodec codec;
+  private final CompressionKind compression;
   private final boolean zeroCopy;
+  private final int typeCount;
+  private final int bufferSize;
 
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
-    this.codec = builder.codec;
+    this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
+    this.typeCount = builder.typeCount;
+    this.bufferSize = builder.bufferSize;
   }
 
   public FileSystem getFileSystem() {
@@ -29,15 +33,22 @@ public final class DataReaderProperties {
     return path;
   }
 
-  @Nullable
-  public CompressionCodec getCodec() {
-    return codec;
+  public CompressionKind getCompression() {
+    return compression;
   }
 
   public boolean getZeroCopy() {
     return zeroCopy;
   }
 
+  public int getTypeCount() {
+    return typeCount;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -46,8 +57,10 @@ public final class DataReaderProperties {
 
     private FileSystem fileSystem;
     private Path path;
-    private CompressionCodec codec;
+    private CompressionKind compression;
     private boolean zeroCopy;
+    private int typeCount;
+    private int bufferSize;
 
     private Builder() {
 
@@ -63,8 +76,8 @@ public final class DataReaderProperties {
       return this;
     }
 
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
+    public Builder withCompression(CompressionKind value) {
+      this.compression = value;
       return this;
     }
 
@@ -73,6 +86,16 @@ public final class DataReaderProperties {
       return this;
     }
 
+    public Builder withTypeCount(int value) {
+      this.typeCount = value;
+      return this;
+    }
+
+    public Builder withBufferSize(int value) {
+      this.bufferSize = value;
+      return this;
+    }
+
     public DataReaderProperties build() {
       Preconditions.checkNotNull(fileSystem);
       Preconditions.checkNotNull(path);
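
A consolidated sketch of the expanded builder (fs, path, and schema are placeholders), mirroring the call added in OrcEncodedDataReader earlier in this patch:

    DataReaderProperties props = DataReaderProperties.builder()
        .withFileSystem(fs)
        .withPath(path)
        .withCompression(CompressionKind.ZLIB)
        .withBufferSize(256 * 1024)
        .withTypeCount(schema.getMaximumId() + 1)
        .withZeroCopy(false)
        .build();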

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
deleted file mode 100644
index fc0d141..0000000
--- a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.orc.impl;
-
-import org.apache.orc.MetadataReaderFactory;
-
-import java.io.IOException;
-
-public final class DefaultMetadataReaderFactory implements MetadataReaderFactory {
-
-  @Override
-  public MetadataReader create(MetadataReaderProperties properties) throws IOException {
-    return new MetadataReaderImpl(properties);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
index 1893afe..851f645 100644
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -53,6 +53,9 @@ public abstract class InStream extends InputStream {
     return length;
   }
 
+  @Override
+  public abstract void close();
+
   public static class UncompressedStream extends InStream {
     private List<DiskRange> bytes;
     private long length;
@@ -423,7 +426,6 @@ public abstract class InStream extends InputStream {
 
   /**
    * Create an input stream from a list of buffers.
-   * @param fileName name of the file
    * @param streamName the name of the stream
    * @param buffers the list of ranges of bytes for the stream
    * @param offsets a list of offsets (the same length as input) that must

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 7dfd289..8bef0f1 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * Interface for reading integers.
@@ -57,9 +57,25 @@ public interface IntegerReader {
 
   /**
    * Return the next available vector for values.
-   * @return
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
+   * @throws IOException
+   */
+   void nextVector(ColumnVector column,
+                   long[] data,
+                   int length
+                   ) throws IOException;
+
+  /**
+   * Return the next available vector for values. Does not change the
+   * value of column.isRepeating.
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
    * @throws IOException
    */
-   void nextVector(LongColumnVector previous, final int previousLen)
-      throws IOException;
-}
+  void nextVector(ColumnVector column,
+                  int[] data,
+                  int length
+                  ) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
deleted file mode 100644
index 500239d..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-
-public interface MetadataReader extends Closeable {
-  OrcIndex readRowIndex(StripeInformation stripe,
-                                      OrcProto.StripeFooter footer,
-      boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
-      OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
-
-  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
deleted file mode 100644
index c3ea74f..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-
-import com.google.common.collect.Lists;
-
-public class MetadataReaderImpl implements MetadataReader {
-  private final FSDataInputStream file;
-  private final CompressionCodec codec;
-  private final int bufferSize;
-  private final int typeCount;
-
-  MetadataReaderImpl(MetadataReaderProperties properties) throws IOException {
-    this.file = properties.getFileSystem().open(properties.getPath());
-    this.codec = properties.getCodec();
-    this.bufferSize = properties.getBufferSize();
-    this.typeCount = properties.getTypeCount();
-  }
-
-  @Override
-  public OrcIndex readRowIndex(StripeInformation stripe,
-      OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
-      boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
-    if (footer == null) {
-      footer = readStripeFooter(stripe);
-    }
-    if (indexes == null) {
-      indexes = new OrcProto.RowIndex[typeCount];
-    }
-    if (bloomFilterIndices == null) {
-      bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
-    }
-    long offset = stripe.getOffset();
-    List<OrcProto.Stream> streams = footer.getStreamsList();
-    for (int i = 0; i < streams.size(); i++) {
-      OrcProto.Stream stream = streams.get(i);
-      OrcProto.Stream nextStream = null;
-      if (i < streams.size() - 1) {
-        nextStream = streams.get(i+1);
-      }
-      int col = stream.getColumn();
-      int len = (int) stream.getLength();
-      // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
-      // filter and combine the io to read row index and bloom filters for that column together
-      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
-        boolean readBloomFilter = false;
-        if (sargColumns != null && sargColumns[col] &&
-            nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
-          len += nextStream.getLength();
-          i += 1;
-          readBloomFilter = true;
-        }
-        if ((included == null || included[col]) && indexes[col] == null) {
-          byte[] buffer = new byte[len];
-          file.readFully(offset, buffer, 0, buffer.length);
-          ByteBuffer bb = ByteBuffer.wrap(buffer);
-          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
-              codec, bufferSize));
-          if (readBloomFilter) {
-            bb.position((int) stream.getLength());
-            bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
-                "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
-                nextStream.getLength(), codec, bufferSize));
-          }
-        }
-      }
-      offset += len;
-    }
-
-    OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
-    return index;
-  }
-
-  @Override
-  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
-    int tailLength = (int) stripe.getFooterLength();
-
-    // read the footer
-    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-    file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
-        tailLength, codec, bufferSize));
-  }
-
-  @Override
-  public void close() throws IOException {
-    file.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
deleted file mode 100644
index 321931c..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.orc.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
-
-import javax.annotation.Nullable;
-
-public final class MetadataReaderProperties {
-
-  private final FileSystem fileSystem;
-  private final Path path;
-  private final CompressionCodec codec;
-  private final int bufferSize;
-  private final int typeCount;
-
-  private MetadataReaderProperties(Builder builder) {
-    this.fileSystem = builder.fileSystem;
-    this.path = builder.path;
-    this.codec = builder.codec;
-    this.bufferSize = builder.bufferSize;
-    this.typeCount = builder.typeCount;
-  }
-
-  public FileSystem getFileSystem() {
-    return fileSystem;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  @Nullable
-  public CompressionCodec getCodec() {
-    return codec;
-  }
-
-  public int getBufferSize() {
-    return bufferSize;
-  }
-
-  public int getTypeCount() {
-    return typeCount;
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-
-    private FileSystem fileSystem;
-    private Path path;
-    private CompressionCodec codec;
-    private int bufferSize;
-    private int typeCount;
-
-    private Builder() {
-
-    }
-
-    public Builder withFileSystem(FileSystem fileSystem) {
-      this.fileSystem = fileSystem;
-      return this;
-    }
-
-    public Builder withPath(Path path) {
-      this.path = path;
-      return this;
-    }
-
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
-      return this;
-    }
-
-    public Builder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    public Builder withTypeCount(int typeCount) {
-      this.typeCount = typeCount;
-      return this;
-    }
-
-    public MetadataReaderProperties build() {
-      Preconditions.checkNotNull(fileSystem);
-      Preconditions.checkNotNull(path);
-
-      return new MetadataReaderProperties(this);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 380f3391..24bd051 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * A reader that reads a sequence of bytes. A control byte is read before
@@ -92,16 +92,16 @@ public class RunLengthByteReader {
     return result;
   }
 
-  public void nextVector(LongColumnVector previous, long previousLen)
+  public void nextVector(ColumnVector previous, long[] data, long size)
       throws IOException {
     previous.isRepeating = true;
-    for (int i = 0; i < previousLen; i++) {
+    for (int i = 0; i < size; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int types in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -109,12 +109,36 @@ public class RunLengthByteReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+          && ((data[0] != data[i]) ||
+              (previous.isNull[0] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }
   }
 
+  /**
+   * Read the next size bytes into the data array, skipping over any slots
+   * where isNull is true.
+   * @param isNull if non-null, skip any rows where isNull[r] is true
+   * @param data the array to read into
+   * @param size the number of elements to read
+   * @throws IOException
+   */
+  public void nextVector(boolean[] isNull, int[] data,
+                         long size) throws IOException {
+    if (isNull == null) {
+      for(int i=0; i < size; ++i) {
+        data[i] = next();
+      }
+    } else {
+      for(int i=0; i < size; ++i) {
+        if (!isNull[i]) {
+          data[i] = next();
+        }
+      }
+    }
+  }
+
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

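The nextVector rewrite above compares every slot against slot 0 (rather than its predecessor) when deciding whether the column is repeating; once any slot differs, isRepeating stays false for the rest of the loop. A minimal standalone sketch of that pattern, with illustrative array names that are not part of the patch:

    // Sketch only: 'data' and 'isNull' stand in for the column vector's arrays.
    static boolean detectRepeating(long[] data, boolean[] isNull, int size) {
      boolean repeating = true;
      for (int i = 1; repeating && i < size; ++i) {
        repeating = (data[0] == data[i]) && (isNull[0] == isNull[i]);
      }
      return repeating;
    }
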
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
index 0c90cde..b91a263 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * A reader that reads a sequence of integers.
@@ -99,15 +99,17 @@ public class RunLengthIntegerReader implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -115,13 +117,32 @@ public class RunLengthIntegerReader implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
+          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
 
   @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+
+  @Override
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index c6d685a..610d9b5 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -21,9 +21,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * A reader that reads a sequence of light weight compressed integers. Refer
@@ -360,15 +360,17 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -376,10 +378,29 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (previous.vector[i - 1] != previous.vector[i] ||
-          previous.isNull[i - 1] != previous.isNull[i])) {
+          && (data[0] != data[i] ||
+          previous.isNull[0] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
 }

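Both run-length integer readers gain an int[] overload so a caller can pull decoded values into a scratch array (for example, lengths or dictionary offsets) without routing them through a LongColumnVector. A hedged usage sketch; reader construction is elided and the variable names are illustrative:

    // Sketch only: one value is read per non-null row of 'vector'; null rows
    // are set to the vectorization default of 1 by the overload above.
    int[] lengths = new int[batchSize];
    integerReader.nextVector(vector, lengths, batchSize);
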
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index f8afe06..b2966e0 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1693,9 +1693,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
+  public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+  public static long NANOS_PER_MILLI = 1000000;
   public static final int MILLIS_PER_SECOND = 1000;
   static final int NANOS_PER_SECOND = 1000000000;
-  static final int MILLIS_PER_NANO  = 1000000;
   public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
   private static class TimestampTreeWriter extends TreeWriter {
@@ -2261,32 +2262,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         }
       } else {
         // write the records in runs of the same tag
-        byte prevTag = 0;
-        int currentRun = 0;
-        boolean started = false;
+        int[] currentStart = new int[vec.fields.length];
+        int[] currentLength = new int[vec.fields.length];
         for(int i=0; i < length; ++i) {
-          if (!vec.isNull[i + offset]) {
+          // only need to deal with the non-nulls, since the nulls were dealt
+          // with in the super method.
+          if (vec.noNulls || !vec.isNull[i + offset]) {
             byte tag = (byte) vec.tags[offset + i];
             tags.write(tag);
-            if (!started) {
-              started = true;
-              currentRun = i;
-              prevTag = tag;
-            } else if (tag != prevTag) {
-              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-                  offset + currentRun, i - currentRun);
-              currentRun = i;
-              prevTag = tag;
+            if (currentLength[tag] == 0) {
+              // start a new sequence
+              currentStart[tag] = i + offset;
+              currentLength[tag] = 1;
+            } else if (currentStart[tag] + currentLength[tag] == i + offset) {
+              // ok, we are extending the current run for that tag.
+              currentLength[tag] += 1;
+            } else {
+              // otherwise, we need to close off the old run and start a new one
+              childrenWriters[tag].writeBatch(vec.fields[tag],
+                  currentStart[tag], currentLength[tag]);
+              currentStart[tag] = i + offset;
+              currentLength[tag] = 1;
             }
-          } else if (started) {
-            started = false;
-            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-                offset + currentRun, i - currentRun);
           }
         }
-        if (started) {
-          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-              offset + currentRun, length - currentRun);
+        // write out any left over sequences
+        for(int tag=0; tag < currentStart.length; ++tag) {
+          if (currentLength[tag] != 0) {
+            childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
+                currentLength[tag]);
+          }
         }
       }
     }

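The union writer now tracks a (start, length) pair per tag instead of a single previous-tag run, so a tag's rows are flushed to its child writer only when a later row of the same tag breaks contiguity, plus one cleanup flush per tag at the end. A minimal sketch of that bookkeeping under assumed inputs, with a print standing in for childrenWriters[tag].writeBatch:

    // Illustrative only; assumes two union branches and ignores nulls.
    static void writeRuns(byte[] tags, int offset, int length) {
      int[] start = new int[2];
      int[] len = new int[2];
      for (int i = 0; i < length; ++i) {
        int tag = tags[offset + i];
        if (len[tag] == 0) {                           // first row for this tag
          start[tag] = i + offset;
          len[tag] = 1;
        } else if (start[tag] + len[tag] == i + offset) {
          len[tag] += 1;                               // extend a contiguous run
        } else {                                       // flush, then restart
          System.out.println("tag " + tag + ": rows " + start[tag] + "+" + len[tag]);
          start[tag] = i + offset;
          len[tag] = 1;
        }
      }
      for (int tag = 0; tag < 2; ++tag) {              // cleanup flushes
        if (len[tag] != 0) {
          System.out.println("tag " + tag + ": rows " + start[tag] + "+" + len[tag]);
        }
      }
    }
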
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
index 9ec08f3..b9918f2 100644
--- a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
+++ b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
@@ -3,6 +3,7 @@ package org.apache.orc.impl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -14,7 +15,6 @@ public class TestDataReaderProperties {
 
   private FileSystem mockedFileSystem = mock(FileSystem.class);
   private Path mockedPath = mock(Path.class);
-  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
   private boolean mockedZeroCopy = false;
 
   @Test
@@ -22,12 +22,12 @@ public class TestDataReaderProperties {
     DataReaderProperties properties = DataReaderProperties.builder()
       .withFileSystem(mockedFileSystem)
       .withPath(mockedPath)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.ZLIB)
       .withZeroCopy(mockedZeroCopy)
       .build();
     assertEquals(mockedFileSystem, properties.getFileSystem());
     assertEquals(mockedPath, properties.getPath());
-    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(CompressionKind.ZLIB, properties.getCompression());
     assertEquals(mockedZeroCopy, properties.getZeroCopy());
   }
 
@@ -39,7 +39,7 @@ public class TestDataReaderProperties {
       .build();
     assertEquals(mockedFileSystem, properties.getFileSystem());
     assertEquals(mockedPath, properties.getPath());
-    assertNull(properties.getCodec());
+    assertNull(properties.getCompression());
     assertFalse(properties.getZeroCopy());
   }
 
@@ -52,7 +52,7 @@ public class TestDataReaderProperties {
   public void testMissingPath() {
     DataReaderProperties.builder()
       .withFileSystem(mockedFileSystem)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.NONE)
       .withZeroCopy(mockedZeroCopy)
       .build();
   }
@@ -61,7 +61,7 @@ public class TestDataReaderProperties {
   public void testMissingFileSystem() {
     DataReaderProperties.builder()
       .withPath(mockedPath)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.NONE)
       .withZeroCopy(mockedZeroCopy)
       .build();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
deleted file mode 100644
index 12e8eb4..0000000
--- a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-public class TestMetadataReaderProperties {
-
-  private FileSystem mockedFileSystem = mock(FileSystem.class);
-  private Path mockedPath = mock(Path.class);
-  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
-  private int mockedBufferSize = 0;
-  private int mockedTypeCount = 0;
-
-  @Test
-  public void testCompleteBuild() {
-    MetadataReaderProperties properties = MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withPath(mockedPath)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .withTypeCount(mockedTypeCount)
-      .build();
-    assertEquals(mockedFileSystem, properties.getFileSystem());
-    assertEquals(mockedPath, properties.getPath());
-    assertEquals(mockedCodec, properties.getCodec());
-    assertEquals(mockedBufferSize, properties.getBufferSize());
-    assertEquals(mockedTypeCount, properties.getTypeCount());
-  }
-
-  @Test
-  public void testMissingNonRequiredArgs() {
-    MetadataReaderProperties properties = MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withPath(mockedPath)
-      .build();
-    assertEquals(mockedFileSystem, properties.getFileSystem());
-    assertEquals(mockedPath, properties.getPath());
-    assertNull(properties.getCodec());
-    assertEquals(0, properties.getBufferSize());
-    assertEquals(0, properties.getTypeCount());
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testEmptyBuild() {
-    MetadataReaderProperties.builder().build();
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testMissingPath() {
-    MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .build();
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testMissingFileSystem() {
-    MetadataReaderProperties.builder()
-      .withPath(mockedPath)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .build();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 0724191..82a97e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -215,12 +215,9 @@ public class VectorizedRowBatchCtx {
     LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
     int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
     VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
-
-    for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
-      if (columnsToIncludeTruncated[i]) {
-        TypeInfo typeInfo = rowColumnTypeInfos[i];
-        result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
-      }
+    for (int i = 0; i < dataColumnCount; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
     }
 
     for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
@@ -476,8 +473,8 @@ public class VectorizedRowBatchCtx {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
           } else {
-            bcv.fill(sVal.getBytes());
-            bcv.isNull[0] = false;
+            bcv.setVal(0, sVal.getBytes());
+            bcv.isRepeating = true;
           }
         }
         break;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
deleted file mode 100644
index de3471c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.orc.DataReader;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.impl.DataReaderProperties;
-
-public final class DefaultDataReaderFactory implements DataReaderFactory {
-
-  @Override
-  public DataReader create(DataReaderProperties properties) {
-    return RecordReaderUtils.createDefaultDataReader(properties);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fe0be7b..fcb8ca4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE);
 
     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
@@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
     final Reader reader;
     final int bucket;
@@ -1994,10 +1994,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   /**
    * Convert a Hive type property string that contains separated type names into a list of
    * TypeDescription objects.
+   * @param hiveTypeProperty the desired types from hive
+   * @param maxColumns the maximum number of desired columns
    * @return the list of TypeDescription objects.
    */
-  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
-      String hiveTypeProperty) {
+  public static ArrayList<TypeDescription>
+      typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty,
+                                           int maxColumns) {
 
     // CONSIDER: We need a type name parser for TypeDescription.
 
@@ -2005,6 +2008,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
     for (TypeInfo typeInfo : typeInfoList) {
       typeDescrList.add(convertTypeInfo(typeInfo));
+      if (typeDescrList.size() >= maxColumns) {
+        break;
+      }
     }
     return typeDescrList;
   }
@@ -2091,8 +2097,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead)
-      throws IOException {
+  /**
+   * Generate the desired schema for reading the file.
+   * @param conf the configuration
+   * @param isAcidRead is this an acid format?
+   * @param dataColumns the desired number of data columns for vectorized read
+   * @return the desired schema or null if schema evolution isn't enabled
+   * @throws IOException
+   */
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
+                                                       boolean isAcidRead,
+                                                       int dataColumns
+                                                       ) throws IOException {
 
     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -2115,8 +2131,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           haveSchemaEvolutionProperties = false;
         } else {
           schemaEvolutionTypeDescrs =
-              typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
-          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+              typeDescriptionsFromHiveTypeProperty(columnTypeProperty,
+                  dataColumns);
+          if (schemaEvolutionTypeDescrs.size() !=
+              Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
             haveSchemaEvolutionProperties = false;
           }
         }
@@ -2147,8 +2165,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         return null;
       }
       schemaEvolutionTypeDescrs =
-          typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
-      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+          typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns);
+      if (schemaEvolutionTypeDescrs.size() !=
+          Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
         return null;
       }
 
@@ -2162,7 +2181,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         }
         columnNum++;
       }
-      if (virtualColumnClipNum != -1) {
+      if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) {
         schemaEvolutionColumnNames =
             Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum));
         schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum));
@@ -2179,7 +2198,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // Desired schema does not include virtual columns or partition columns.
     TypeDescription result = TypeDescription.createStruct();
-    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+    for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) {
       result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
     }
 

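getDesiredRowTypeDescr and typeDescriptionsFromHiveTypeProperty now take a column limit so the vectorized path can clip the desired schema to the data columns it will actually materialize, while row-at-a-time callers keep the old behavior by passing Integer.MAX_VALUE. Illustrative calls against the new signatures (conf is an assumed Hadoop Configuration; both calls may return null when schema evolution is not enabled):

    // Row readers: no clipping.
    TypeDescription full =
        OrcInputFormat.getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE);
    // A vectorized reader that materializes only three data columns:
    TypeDescription clipped =
        OrcInputFormat.getDesiredRowTypeDescr(conf, false, 3);
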
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 1fce282..0dd58b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -447,7 +447,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.length = options.getLength();
     this.validTxnList = validTxnList;
 
-    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
+    TypeDescription typeDescr =
+        OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 822ef14..b7437be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -23,23 +23,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
+import com.google.common.collect.Lists;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
 import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
 import org.apache.orc.FileMetaInfo;
 import org.apache.orc.FileMetadata;
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.slf4j.Logger;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcProto;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.protobuf.CodedInputStream;
 
 public class ReaderImpl implements Reader {
@@ -75,13 +70,12 @@ public class ReaderImpl implements Reader {
   private final List<OrcProto.StripeStatistics> stripeStats;
   private final int metadataSize;
   protected final List<OrcProto.Type> types;
+  private final TypeDescription schema;
   private final List<OrcProto.UserMetadataItem> userMetadata;
   private final List<OrcProto.ColumnStatistics> fileStats;
   private final List<StripeInformation> stripes;
   protected final int rowIndexStride;
   private final long contentLength, numberOfRows;
-  private final MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
-  private final DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
 
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
@@ -248,6 +242,11 @@ public class ReaderImpl implements Reader {
     return result;
   }
 
+  @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
   /**
    * Ensure this is an ORC file to prevent users from trying to read text
    * files or RC files as ORC files.
@@ -391,11 +390,13 @@ public class ReaderImpl implements Reader {
       this.writerVersion = footerMetaData.writerVersion;
       this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
     }
+    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
   }
+
   /**
    * Get the WriterVersion based on the ORC file postscript.
    * @param writerVersion the integer writer version
-   * @return
+   * @return the writer version of the file
    */
   static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
     for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
@@ -672,20 +673,7 @@ public class ReaderImpl implements Reader {
       Arrays.fill(include, true);
       options.include(include);
     }
-
-    return RecordReaderImpl.builder()
-        .withMetadataReaderFactory(metadataReaderFactory)
-        .withDataReaderFactory(dataReaderFactory)
-        .withStripes(this.getStripes())
-        .withFileSystem(fileSystem)
-        .withPath(path)
-        .withOptions(options)
-        .withTypes(types)
-        .withCodec(codec)
-        .withBufferSize(bufferSize)
-        .withStrideRate(rowIndexStride)
-        .withConf(conf)
-        .build();
+    return new RecordReaderImpl(this, options);
   }
 
 
@@ -837,7 +825,7 @@ public class ReaderImpl implements Reader {
   }
 
   private int getLastIdx() {
-    Set<Integer> indices = Sets.newHashSet();
+    Set<Integer> indices = new HashSet<>();
     for (OrcProto.Type type : types) {
       indices.addAll(type.getSubtypesList());
     }
@@ -856,7 +844,7 @@ public class ReaderImpl implements Reader {
 
   @Override
   public List<StripeStatistics> getStripeStatistics() {
-    List<StripeStatistics> result = Lists.newArrayList();
+    List<StripeStatistics> result = new ArrayList<>();
     for (OrcProto.StripeStatistics ss : stripeStats) {
       result.add(new StripeStatistics(ss.getColStatsList()));
     }
@@ -868,17 +856,6 @@ public class ReaderImpl implements Reader {
   }
 
   @Override
-  public MetadataReader metadata() throws IOException {
-    return metadataReaderFactory.create(MetadataReaderProperties.builder()
-      .withBufferSize(bufferSize)
-      .withCodec(codec)
-      .withFileSystem(fileSystem)
-      .withPath(path)
-      .withTypeCount(types.size())
-      .build());
-  }
-
-  @Override
   public List<Integer> getVersionList() {
     return versionList;
   }
@@ -889,16 +866,6 @@ public class ReaderImpl implements Reader {
   }
 
   @Override
-  public DataReader createDefaultDataReader(boolean useZeroCopy) {
-    return dataReaderFactory.create(DataReaderProperties.builder()
-      .withFileSystem(fileSystem)
-      .withPath(path)
-      .withCodec(codec)
-      .withZeroCopy(useZeroCopy)
-      .build());
-  }
-
-  @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder();
     buffer.append("ORC Reader(");


[3/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index cc7182f..fe87794 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -28,11 +28,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hive.common.util.HiveTestUtils;
 import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -53,17 +54,12 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.orc.ColumnStatistics;
-import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
 import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.ColumnStatisticsImpl;
 import org.apache.orc.OrcProto;
 
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderProperties;
 import org.junit.Test;
 import org.mockito.MockSettings;
 import org.mockito.Mockito;
@@ -1642,51 +1638,42 @@ public class TestRecordReaderImpl {
   @Test
   public void testClose() throws Exception {
     DataReader mockedDataReader = mock(DataReader.class);
-    MetadataReader mockedMetadataReader = mock(MetadataReader.class);
-
-    closeMockedRecordReader(mockedDataReader, mockedMetadataReader);
+    closeMockedRecordReader(mockedDataReader);
 
     verify(mockedDataReader, atLeastOnce()).close();
-    verify(mockedMetadataReader, atLeastOnce()).close();
   }
 
   @Test
   public void testCloseWithException() throws Exception {
     DataReader mockedDataReader = mock(DataReader.class);
-    MetadataReader mockedMetadataReader = mock(MetadataReader.class);
     doThrow(IOException.class).when(mockedDataReader).close();
 
     try {
-      closeMockedRecordReader(mockedDataReader, mockedMetadataReader);
+      closeMockedRecordReader(mockedDataReader);
       fail("Exception should have been thrown when Record Reader was closed");
     } catch (IOException expected) {
 
     }
 
-    verify(mockedMetadataReader, atLeastOnce()).close();
     verify(mockedDataReader, atLeastOnce()).close();
   }
 
-  private void closeMockedRecordReader(DataReader mockedDataReader,
-                                       MetadataReader mockedMetadataReader) throws IOException {
-    DataReaderFactory mockedDataReaderFactory = mock(DataReaderFactory.class);
-    MetadataReaderFactory mockedMetadataReaderFactory = mock(MetadataReaderFactory.class);
-    when(mockedDataReaderFactory.create(any(DataReaderProperties.class))).thenReturn(mockedDataReader);
-    when(mockedMetadataReaderFactory.create(any(MetadataReaderProperties.class))).thenReturn(mockedMetadataReader);
-
-    RecordReader recordReader = RecordReaderImpl.builder()
-      .withBufferSize(0)
-      .withCodec(mock(CompressionCodec.class))
-      .withConf(mock(Configuration.class))
-      .withFileSystem(mock(FileSystem.class))
-      .withOptions(mock(Reader.Options.class))
-      .withPath(mock(Path.class))
-      .withStrideRate(0)
-      .withStripes(Collections.singletonList(mock(StripeInformation.class)))
-      .withTypes(Collections.singletonList(OrcProto.Type.getDefaultInstance()))
-      .withDataReaderFactory(mockedDataReaderFactory)
-      .withMetadataReaderFactory(mockedMetadataReaderFactory)
-      .build();
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  private void closeMockedRecordReader(DataReader mockedDataReader) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    fs.delete(workDir, true);
+    fs.mkdirs(workDir);
+    Path path = new Path(workDir, "empty.orc");
+    Writer writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf)
+        .setSchema(TypeDescription.createLong()));
+    writer.close();
+    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+
+    RecordReader recordReader = reader.rowsOptions(new Reader.Options()
+        .dataReader(mockedDataReader));
 
     recordReader.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java
index 2a82092..96af65a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java
@@ -51,11 +51,11 @@ public class TestTypeDescription {
             .addField("f4", TypeDescription.createDouble())
             .addField("f5", TypeDescription.createBoolean()))
         .addField("f6", TypeDescription.createChar().withMaxLength(100));
-    assertEquals("struct<f1:union<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>",
+    assertEquals("struct<f1:uniontype<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>",
         struct.toString());
     assertEquals(
         "{\"category\": \"struct\", \"id\": 0, \"max\": 8, \"fields\": [\n" +
-            "  \"f1\": {\"category\": \"union\", \"id\": 1, \"max\": 3, \"children\": [\n" +
+            "  \"f1\": {\"category\": \"uniontype\", \"id\": 1, \"max\": 3, \"children\": [\n" +
             "    {\"category\": \"tinyint\", \"id\": 2, \"max\": 2},\n" +
             "    {\"category\": \"decimal\", \"id\": 3, \"max\": 3, \"precision\": 20, \"scale\": 10}]},\n" +
             "  \"f2\": {\"category\": \"struct\", \"id\": 4, \"max\": 7, \"fields\": [\n" +


[4/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 9cfcc0e..2199b11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,12 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closer;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
-import org.apache.orc.OrcUtils;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -42,12 +38,9 @@ import org.apache.orc.DateColumnStatistics;
 import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.OrcConf;
-import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.StreamName;
@@ -56,14 +49,11 @@ import org.apache.orc.StripeInformation;
 import org.apache.orc.TimestampColumnStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -103,8 +93,6 @@ public class RecordReaderImpl implements RecordReader {
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
-  private final Configuration conf;
-  private final MetadataReader metadata;
   private final DataReader dataReader;
 
   /**
@@ -146,130 +134,36 @@ public class RecordReaderImpl implements RecordReader {
     return result;
   }
 
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-    private Reader.Options options;
-    private CompressionCodec codec;
-    private List<OrcProto.Type> types;
-    private List<StripeInformation> stripes;
-    private int bufferSize;
-    private FileSystem fileSystem;
-    private Path path;
-    private Configuration conf;
-    private long strideRate;
-    private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
-    private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
-
-    private Builder() {
-
-    }
-
-    public Builder withOptions(Reader.Options options) {
-      this.options = options;
-      return this;
-    }
-
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
-      return this;
-    }
-
-    public Builder withTypes(List<OrcProto.Type> types) {
-      this.types = types;
-      return this;
-    }
-
-    public Builder withStripes(List<StripeInformation> stripes) {
-      this.stripes = stripes;
-      return this;
-    }
-
-    public Builder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    public Builder withFileSystem(FileSystem fileSystem) {
-      this.fileSystem = fileSystem;
-      return this;
-    }
-
-    public Builder withPath(Path path) {
-      this.path = path;
-      return this;
-    }
-
-    public Builder withConf(Configuration conf) {
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder withStrideRate(long strideRate) {
-      this.strideRate = strideRate;
-      return this;
-    }
-
-    public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) {
-      this.metadataReaderFactory = metadataReaderFactory;
-      return this;
-    }
-
-    public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) {
-      this.dataReaderFactory = dataReaderFactory;
-      return this;
-    }
-
-    public RecordReaderImpl build() throws IOException {
-      Preconditions.checkNotNull(metadataReaderFactory);
-      Preconditions.checkNotNull(dataReaderFactory);
-      Preconditions.checkNotNull(options);
-      Preconditions.checkNotNull(types);
-      Preconditions.checkNotNull(stripes);
-      Preconditions.checkNotNull(fileSystem);
-      Preconditions.checkNotNull(path);
-      Preconditions.checkNotNull(conf);
-
-      return new RecordReaderImpl(this);
-    }
-  }
-
-  private RecordReaderImpl(Builder builder) throws IOException {
-    Reader.Options options = builder.options;
-    this.types = builder.types;
-    TreeReaderFactory.TreeReaderSchema treeReaderSchema;
+  protected RecordReaderImpl(ReaderImpl fileReader,
+                             Reader.Options options) throws IOException {
+    SchemaEvolution treeReaderSchema;
+    this.included = options.getInclude();
+    included[0] = true;
     if (options.getSchema() == null) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("Schema on read not provided -- using file schema " + types.toString());
+        LOG.info("Schema on read not provided -- using file schema " +
+            fileReader.getSchema());
       }
-      treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types);
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
     } else {
 
       // Now that we are creating a record reader for a file, validate that the schema to read
       // is compatible with the file schema.
       //
-      List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
-      treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
-    }
-    this.path = builder.path;
-    this.codec = builder.codec;
-    this.bufferSize = builder.bufferSize;
-    this.included = options.getInclude();
-    this.conf = builder.conf;
-    this.rowIndexStride = builder.strideRate;
-    this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder()
-        .withFileSystem(builder.fileSystem)
-        .withPath(path)
-        .withCodec(codec)
-        .withBufferSize(bufferSize)
-        .withTypeCount(types.size())
-        .build());
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+          options.getSchema(),
+          included);
+    }
+    this.path = fileReader.path;
+    this.codec = fileReader.codec;
+    this.types = fileReader.types;
+    this.bufferSize = fileReader.bufferSize;
+    this.rowIndexStride = fileReader.rowIndexStride;
+    FileSystem fileSystem = fileReader.fileSystem;
     SearchArgument sarg = options.getSearchArgument();
-    if (sarg != null && builder.strideRate != 0) {
+    if (sarg != null && rowIndexStride != 0) {
       sargApp = new SargApplier(
-          sarg, options.getColumnNames(), builder.strideRate, types, included.length);
+          sarg, options.getColumnNames(), rowIndexStride, types, included.length);
     } else {
       sargApp = null;
     }
@@ -277,7 +171,7 @@ public class RecordReaderImpl implements RecordReader {
     long skippedRows = 0;
     long offset = options.getOffset();
     long maxOffset = options.getMaxOffset();
-    for(StripeInformation stripe: builder.stripes) {
+    for(StripeInformation stripe: fileReader.getStripes()) {
       long stripeStart = stripe.getOffset();
       if (offset > stripeStart) {
         skippedRows += stripe.getNumberOfRows();
@@ -289,25 +183,30 @@ public class RecordReaderImpl implements RecordReader {
 
     Boolean zeroCopy = options.getUseZeroCopy();
     if (zeroCopy == null) {
-      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+    }
+    if (options.getDataReader() == null) {
+      dataReader = RecordReaderUtils.createDefaultDataReader(
+          DataReaderProperties.builder()
+              .withBufferSize(bufferSize)
+              .withCompression(fileReader.compressionKind)
+              .withFileSystem(fileSystem)
+              .withPath(path)
+              .withTypeCount(types.size())
+              .withZeroCopy(zeroCopy)
+              .build());
+    } else {
+      dataReader = options.getDataReader();
     }
-    // TODO: we could change the ctor to pass this externally
-    this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder()
-      .withFileSystem(builder.fileSystem)
-      .withCodec(codec)
-      .withPath(path)
-      .withZeroCopy(zeroCopy)
-      .build());
-    this.dataReader.open();
-
     firstRow = skippedRows;
     totalRowCount = rows;
     Boolean skipCorrupt = options.getSkipCorruptRecords();
     if (skipCorrupt == null) {
-      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
+      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
-    reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
+    reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
+        treeReaderSchema, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -333,10 +232,10 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    return metadata.readStripeFooter(stripe);
+    return dataReader.readStripeFooter(stripe);
   }
 
-  static enum Location {
+  enum Location {
     BEFORE, MIN, MIDDLE, MAX, AFTER
   }
 
@@ -895,7 +794,7 @@ public class RecordReaderImpl implements RecordReader {
     return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
   }
 
-  private void clearStreams() throws IOException {
private void clearStreams() {
     // explicit close of all streams to de-ref ByteBuffers
     for (InStream is : streams.values()) {
       is.close();
@@ -1149,31 +1048,27 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   @Override
-  public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
     try {
-      final VectorizedRowBatch result;
       if (rowInStripe >= rowCountInStripe) {
         currentStripe += 1;
+        if (currentStripe >= stripes.size()) {
+          batch.size = 0;
+          return false;
+        }
         readStripe();
       }
 
-      final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+      int batchSize = computeBatchSize(batch.getMaxSize());
 
       rowInStripe += batchSize;
-      if (previous == null) {
-        ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
-        result = new VectorizedRowBatch(cols.length);
-        result.cols = cols;
-      } else {
-        result = previous;
-        result.selectedInUse = false;
-        reader.setVectorColumnCount(result.getDataColumnCount());
-        reader.nextVector(result.cols, batchSize);
-      }
+      reader.setVectorColumnCount(batch.getDataColumnCount());
+      reader.nextBatch(batch, batchSize);
 
-      result.size = batchSize;
+      batch.size = (int) batchSize;
+      batch.selectedInUse = false;
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return result;
+      return batch.size != 0;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
       throw new IOException("Error reading file: " + path, e);
@@ -1216,16 +1111,8 @@ public class RecordReaderImpl implements RecordReader {
 
   @Override
   public void close() throws IOException {
-    Closer closer = Closer.create();
-    try {
-      closer.register(metadata);
-      closer.register(dataReader);
-      clearStreams();
-    } catch (IOException e) {
-      throw closer.rethrow(e);
-    } finally {
-      closer.close();
-    }
+    clearStreams();
+    dataReader.close();
   }
 
   @Override
@@ -1244,10 +1131,6 @@ public class RecordReaderImpl implements RecordReader {
     return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
   }
 
-  MetadataReader getMetadataReader() {
-    return metadata;
-  }
-
   private int findStripe(long rowNumber) {
     for (int i = 0; i < stripes.size(); i++) {
       StripeInformation stripe = stripes.get(i);
@@ -1276,8 +1159,8 @@ public class RecordReaderImpl implements RecordReader {
       sargColumns = sargColumns == null ?
           (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
     }
-    return metadata.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
-        bloomFilterIndex);
+    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes,
+        sargColumns, bloomFilterIndex);
   }
 
   private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)

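nextBatch now fills a caller-owned batch and returns whether any rows were produced, replacing the allocate-or-reuse contract of the old nextBatch(previous). A hedged sketch of the new read loop ('path' and 'conf' are assumed to be in scope):

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {
      for (int r = 0; r < batch.size; ++r) {
        // consume row r from batch.cols[...]
      }
    }
    rows.close();
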
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 177721d..4192588 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
@@ -44,6 +46,8 @@ import org.apache.orc.impl.DirectDecompressionCodec;
 import org.apache.orc.OrcProto;
 
 import com.google.common.collect.ComparisonChain;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.OutStream;
 
 /**
@@ -53,34 +57,130 @@ public class RecordReaderUtils {
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file;
-    private ByteBufferAllocatorPool pool;
-    private ZeroCopyReaderShim zcr;
-    private FileSystem fs;
-    private Path path;
-    private boolean useZeroCopy;
-    private CompressionCodec codec;
+    private FSDataInputStream file = null;
+    private final ByteBufferAllocatorPool pool;
+    private ZeroCopyReaderShim zcr = null;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    private final int bufferSize;
+    private final int typeCount;
+
+    private DefaultDataReader(DefaultDataReader other) {
+      this.pool = other.pool;
+      this.zcr = other.zcr;
+      this.bufferSize = other.bufferSize;
+      this.typeCount = other.typeCount;
+      this.fs = other.fs;
+      this.path = other.path;
+      this.useZeroCopy = other.useZeroCopy;
+      this.codec = other.codec;
+    }
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
       this.useZeroCopy = properties.getZeroCopy();
-      this.codec = properties.getCodec();
+      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.bufferSize = properties.getBufferSize();
+      this.typeCount = properties.getTypeCount();
+      if (useZeroCopy) {
+        this.pool = new ByteBufferAllocatorPool();
+      } else {
+        this.pool = null;
+      }
     }
 
     @Override
     public void open() throws IOException {
       this.file = fs.open(path);
       if (useZeroCopy) {
-        pool = new ByteBufferAllocatorPool();
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
-        pool = null;
         zcr = null;
       }
     }
 
     @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      if (file == null) {
+        open();
+      }
+      if (footer == null) {
+        footer = readStripeFooter(stripe);
+      }
+      if (indexes == null) {
+        indexes = new OrcProto.RowIndex[typeCount];
+      }
+      if (bloomFilterIndices == null) {
+        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+      }
+      long offset = stripe.getOffset();
+      List<OrcProto.Stream> streams = footer.getStreamsList();
+      for (int i = 0; i < streams.size(); i++) {
+        OrcProto.Stream stream = streams.get(i);
+        OrcProto.Stream nextStream = null;
+        if (i < streams.size() - 1) {
+          nextStream = streams.get(i+1);
+        }
+        int col = stream.getColumn();
+        int len = (int) stream.getLength();
+        // Row index streams and bloom filters are interleaved; check whether the sarg column
+        // has a bloom filter and combine the IO to read the row index and bloom filter together.
+        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+          boolean readBloomFilter = false;
+          if (sargColumns != null && sargColumns[col] && nextStream != null &&
+              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+            len += nextStream.getLength();
+            i += 1;
+            readBloomFilter = true;
+          }
+          if ((included == null || included[col]) && indexes[col] == null) {
+            byte[] buffer = new byte[len];
+            file.readFully(offset, buffer, 0, buffer.length);
+            ByteBuffer bb = ByteBuffer.wrap(buffer);
+            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+                codec, bufferSize));
+            if (readBloomFilter) {
+              bb.position((int) stream.getLength());
+              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                  nextStream.getLength(), codec, bufferSize));
+            }
+          }
+        }
+        offset += len;
+      }
+
+      return new OrcIndex(indexes, bloomFilterIndices);
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      if (file == null) {
+        open();
+      }
+      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+      int tailLength = (int) stripe.getFooterLength();
+
+      // read the footer
+      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+          tailLength, codec, bufferSize));
+    }
+
+    @Override
     public DiskRangeList readFileData(
         DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
       return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
@@ -106,9 +206,14 @@ public class RecordReaderUtils {
       zcr.releaseBuffer(buffer);
     }
 
+    @Override
+    public DataReader clone() {
+      return new DefaultDataReader(this);
+    }
+
   }
 
-  static DataReader createDefaultDataReader(DataReaderProperties properties) {
+  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
     return new DefaultDataReader(properties);
   }
 

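The readRowIndex implementation above leans on the physical layout of a stripe's index region: the streams appear back to back in footer order, so each stream's offset is the running sum of the lengths before it, and a ROW_INDEX followed by its column's BLOOM_FILTER can be fetched with a single readFully. A self-contained sketch of that offset arithmetic (the stream list, lengths, and offsets are made up):

    public class IndexRangeSketch {
      static class Stream {
        final boolean isRowIndex;
        final int column;
        final long length;
        Stream(boolean isRowIndex, int column, long length) {
          this.isRowIndex = isRowIndex;
          this.column = column;
          this.length = length;
        }
      }

      public static void main(String[] args) {
        Stream[] streams = {
            new Stream(true, 1, 120),    // ROW_INDEX for column 1
            new Stream(false, 1, 300),   // BLOOM_FILTER for column 1
            new Stream(true, 2, 90)};    // ROW_INDEX for column 2
        boolean[] sargColumns = {false, true, false};  // sarg needs column 1
        long offset = 1000;              // start of the stripe
        for (int i = 0; i < streams.length; i++) {
          Stream s = streams[i];
          long len = s.length;
          if (s.isRowIndex && sargColumns[s.column]
              && i + 1 < streams.length && !streams[i + 1].isRowIndex) {
            len += streams[i + 1].length;  // fold the bloom filter into one read
            i += 1;
          }
          System.out.println("read " + len + " bytes at offset " + offset);
          offset += len;
        }
      }
    }

With a sarg on column 1 this prints one combined 420-byte read at offset 1000, then a 90-byte read at offset 1420, which is exactly the IO combining the comment in readRowIndex describes.
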
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
index f28ca13..6747691 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
 import org.apache.orc.TypeDescription;
 
 /**
@@ -34,103 +33,134 @@ import org.apache.orc.TypeDescription;
  * has been schema evolution.
  */
 public class SchemaEvolution {
-
+  private final Map<TypeDescription, TypeDescription> readerToFile;
+  private final boolean[] included;
+  private final TypeDescription readerSchema;
   private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
 
-  public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
-      List<OrcProto.Type> schemaTypes) throws IOException {
+  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
+    this.included = included;
+    readerToFile = null;
+    this.readerSchema = readerSchema;
+  }
 
-    // For ACID, the row is the ROW field in the outer STRUCT.
-    final boolean isAcid = checkAcidSchema(fileTypes);
-    final List<OrcProto.Type> rowSchema;
-    int rowSubtype;
-    if (isAcid) {
-      rowSubtype = OrcRecordUpdater.ROW + 1;
-      rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+  public SchemaEvolution(TypeDescription fileSchema,
+                         TypeDescription readerSchema,
+                         boolean[] included) throws IOException {
+    readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
+    this.included = included;
+    if (checkAcidSchema(fileSchema)) {
+      this.readerSchema = createEventSchema(readerSchema);
     } else {
-      rowSubtype = 0;
-      rowSchema = fileTypes;
+      this.readerSchema = readerSchema;
     }
+    buildMapping(fileSchema, this.readerSchema);
+  }
 
-    // Do checking on the overlap.  Additional columns will be defaulted to NULL.
-
-    int numFileColumns = rowSchema.get(0).getSubtypesCount();
-    int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
-
-    int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
-
-    /**
-     * Check type promotion.
-     *
-     * Currently, we only support integer type promotions that can be done "implicitly".
-     * That is, we know that using a bigger integer tree reader on the original smaller integer
-     * column will "just work".
-     *
-     * In the future, other type promotions might require type conversion.
-     */
-    // short -> int -> bigint as same integer readers are used for the above types.
-
-    for (int i = 0; i < numReadColumns; i++) {
-      OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
-      OrcProto.Type rColType = schemaTypes.get(i);
-      if (!fColType.getKind().equals(rColType.getKind())) {
-
-        boolean ok = false;
-        if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+  public TypeDescription getReaderSchema() {
+    return readerSchema;
+  }
 
-          if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
-              rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-            // type promotion possible, converting SHORT to INT/LONG requested type
-            ok = true;
-          }
-        } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+  public TypeDescription getFileType(TypeDescription readerType) {
+    TypeDescription result;
+    if (readerToFile == null) {
+      if (included == null || included[readerType.getId()]) {
+        result = readerType;
+      } else {
+        result = null;
+      }
+    } else {
+      result = readerToFile.get(readerType);
+    }
+    return result;
+  }
 
-          if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-            // type promotion possible, converting INT to LONG requested type
-            ok = true;
+  void buildMapping(TypeDescription fileType,
+                    TypeDescription readerType) throws IOException {
+    // if the column isn't included, don't map it
+    if (included != null && !included[readerType.getId()]) {
+      return;
+    }
+    boolean isOk = true;
+    // check the easy case first
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          isOk = fileType.getMaxLength() == readerType.getMaxLength();
+          break;
+        case DECIMAL:
+          // TODO we don't enforce scale and precision checks, but probably should
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must be an exact match
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for (int i = 0; i < fileChildren.size(); ++i) {
+              buildMapping(fileChildren.get(i), readerChildren.get(i));
+            }
+          } else {
+            isOk = false;
           }
+          break;
         }
-
-        if (!ok) {
-          throw new IOException("ORC does not support type conversion from " +
-              fColType.getKind().name() + " to " + rColType.getKind().name());
+        case STRUCT: {
+          // allow either side to have fewer fields than the other
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+          for (int i = 0; i < jointSize; ++i) {
+            buildMapping(fileChildren.get(i), readerChildren.get(i));
+          }
+          break;
         }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
       }
-    }
-
-    List<OrcProto.Type> fullSchemaTypes;
-
-    if (isAcid) {
-      fullSchemaTypes = new ArrayList<OrcProto.Type>();
-
-      // This copies the ACID struct type which is subtype = 0.
-      // It has field names "operation" through "row".
-      // And we copy the types for all fields EXCEPT ROW (which must be last!).
-
-      for (int i = 0; i < rowSubtype; i++) {
-        fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+    } else {
+      switch (fileType.getCategory()) {
+        case SHORT:
+          if (readerType.getCategory() != TypeDescription.Category.INT &&
+              readerType.getCategory() != TypeDescription.Category.LONG) {
+            isOk = false;
+          }
+          break;
+        case INT:
+          if (readerType.getCategory() != TypeDescription.Category.LONG) {
+            isOk = false;
+          }
+          break;
+        default:
+          isOk = false;
       }
-
-      // Add the row struct type.
-      OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+    }
+    if (isOk) {
+      readerToFile.put(readerType, fileType);
     } else {
-      fullSchemaTypes = schemaTypes;
+      throw new IOException("ORC does not support type conversion from " +
+          fileType + " to " + readerType);
     }
-
-    int innerStructSubtype = rowSubtype;
-
-    // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
-    //     " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
-
-    return new TreeReaderSchema().
-        fileTypes(fileTypes).
-        schemaTypes(fullSchemaTypes).
-        innerStructSubtype(innerStructSubtype);
   }
 
-  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
-    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
-      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+  private static boolean checkAcidSchema(TypeDescription type) {
+    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      List<String> rootFields = type.getFieldNames();
       if (acidEventFieldNames.equals(rootFields)) {
         return true;
       }
@@ -142,26 +172,14 @@ public class SchemaEvolution {
    * @param typeDescr
    * @return ORC types for the ACID event based on the row's type description
    */
-  public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) {
-
-    List<OrcProto.Type> result = new ArrayList<OrcProto.Type>();
-
-    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    type.setKind(OrcProto.Type.Kind.STRUCT);
-    type.addAllFieldNames(acidEventFieldNames);
-    for (int i = 0; i < acidEventFieldNames.size(); i++) {
-      type.addSubtypes(i + 1);
-    }
-    result.add(type.build());
-
-    // Automatically add all fields except the last (ROW).
-    for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
-      type.clear();
-      type.setKind(acidEventOrcTypeKinds.get(i));
-      result.add(type.build());
-    }
-
-    OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+    TypeDescription result = TypeDescription.createStruct()
+        .addField("operation", TypeDescription.createInt())
+        .addField("originalTransaction", TypeDescription.createLong())
+        .addField("bucket", TypeDescription.createInt())
+        .addField("rowId", TypeDescription.createLong())
+        .addField("currentTransaction", TypeDescription.createLong())
+        .addField("row", typeDescr.clone());
     return result;
   }
 
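As an illustration, for a row type of struct<x:int,y:string> the rewritten method builds the ACID event type directly as a TypeDescription (output wrapped here for readability; toString() prints it on one line):

    import org.apache.hadoop.hive.ql.io.orc.SchemaEvolution;
    import org.apache.orc.TypeDescription;

    public class EventSchemaSketch {
      public static void main(String[] args) {
        TypeDescription row = TypeDescription.createStruct()
            .addField("x", TypeDescription.createInt())
            .addField("y", TypeDescription.createString());
        System.out.println(SchemaEvolution.createEventSchema(row));
        // struct<operation:int,originalTransaction:bigint,bucket:int,
        //        rowId:bigint,currentTransaction:bigint,
        //        row:struct<x:int,y:string>>
      }
    }
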
@@ -174,14 +192,4 @@ public class SchemaEvolution {
     acidEventFieldNames.add("currentTransaction");
     acidEventFieldNames.add("row");
   }
-  public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
-      new ArrayList<OrcProto.Type.Kind>();
-  static {
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
-  }
 }

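With buildMapping in place, the evolution rules become queryable per column: matching categories map directly, SHORT and INT may be read as wider integers, and reader struct fields with no file counterpart map to nothing and are later served by a NullTreeReader. A small sketch against hand-built schemas (the column names are arbitrary):

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.orc.SchemaEvolution;
    import org.apache.orc.TypeDescription;

    public class EvolutionSketch {
      public static void main(String[] args) throws IOException {
        TypeDescription file = TypeDescription.createStruct()
            .addField("a", TypeDescription.createInt());
        TypeDescription read = TypeDescription.createStruct()
            .addField("a", TypeDescription.createLong())
            .addField("b", TypeDescription.createString());
        SchemaEvolution evolution = new SchemaEvolution(file, read, null);
        // "a" is INT in the file but read as LONG: an allowed promotion,
        // so the lookup resolves to the file's INT type.
        System.out.println(evolution.getFileType(read.getChildren().get(0)));
        // "b" has no file counterpart: getFileType returns null and the
        // column is populated with nulls at read time.
        System.out.println(evolution.getFileType(read.getChildren().get(1)));
      }
    }
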
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 8bb32ea..8ee8cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -35,9 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -56,8 +60,7 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BitFieldReader;
 import org.apache.orc.impl.DynamicByteArray;
 import org.apache.orc.impl.InStream;
@@ -75,60 +78,6 @@ import org.apache.orc.impl.StreamName;
  */
 public class TreeReaderFactory {
 
-  private static final Logger LOG =
-    LoggerFactory.getLogger(TreeReaderFactory.class);
-
-  public static class TreeReaderSchema {
-
-    /**
-     * The types in the ORC file.
-     */
-    List<OrcProto.Type> fileTypes;
-
-    /**
-     * The treeReaderSchema that the reader should read as.
-     */
-    List<OrcProto.Type> schemaTypes;
-
-    /**
-     * The subtype of the row STRUCT.  Different than 0 for ACID.
-     */
-    int innerStructSubtype;
-
-    public TreeReaderSchema() {
-      fileTypes = null;
-      schemaTypes = null;
-      innerStructSubtype = -1;
-    }
-
-    public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
-      this.fileTypes = fileTypes;
-      return this;
-    }
-
-    public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
-      this.schemaTypes = schemaTypes;
-      return this;
-    }
-
-    public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
-      this.innerStructSubtype = innerStructSubtype;
-      return this;
-    }
-
-    public List<OrcProto.Type> getFileTypes() {
-      return fileTypes;
-    }
-
-    public List<OrcProto.Type> getSchemaTypes() {
-      return schemaTypes;
-    }
-
-    public int getInnerStructSubtype() {
-      return innerStructSubtype;
-    }
-  }
-
   public abstract static class TreeReader {
     protected final int columnId;
     protected BitFieldReader present = null;
@@ -230,36 +179,60 @@ public class TreeReaderFactory {
     }
 
     /**
+     * Called at the top level to read into the given batch.
+     * @param batch the batch to read into
+     * @param batchSize the number of rows to read
+     * @throws IOException
+     */
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      batch.cols[0].reset();
+      batch.cols[0].ensureSize(batchSize, false);
+      nextVector(batch.cols[0], null, batchSize);
+    }
+
+    /**
      * Populates the isNull vector array in the previousVector object based on
      * the present stream values. This function is called from all the child
      * readers, and they all set the values based on isNull field value.
      *
-     * @param previousVector The columnVector object whose isNull value is populated
+     * @param previous The ColumnVector whose isNull vector is populated
+     * @param isNull Whether each value was null at a higher level. If
+     *               isNull is null, all values are non-null.
      * @param batchSize      Size of the column vector
-     * @return next column vector
      * @throws IOException
      */
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      ColumnVector result = (ColumnVector) previousVector;
-      if (present != null) {
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      if (present != null || isNull != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
         // present stream
-        result.noNulls = true;
+        previous.noNulls = true;
+        boolean allNull = true;
         for (int i = 0; i < batchSize; i++) {
-          result.isNull[i] = (present.next() != 1);
-          if (result.noNulls && result.isNull[i]) {
-            result.noNulls = false;
+          if (isNull == null || !isNull[i]) {
+            if (present != null && present.next() != 1) {
+              previous.noNulls = false;
+              previous.isNull[i] = true;
+            } else {
+              previous.isNull[i] = false;
+              allNull = false;
+            }
+          } else {
+            previous.noNulls = false;
+            previous.isNull[i] = true;
           }
         }
+        previous.isRepeating = !previous.noNulls && allNull;
       } else {
-        // There is not present stream, this means that all the values are
+        // There is no present stream, which means that all the values are
         // present.
-        result.noNulls = true;
+        previous.noNulls = true;
         for (int i = 0; i < batchSize; i++) {
-          result.isNull[i] = false;
+          previous.isNull[i] = false;
         }
       }
-      return previousVector;
     }
 
     public BitFieldReader getPresent() {
@@ -267,6 +240,46 @@ public class TreeReaderFactory {
     }
   }
 
+  public static class NullTreeReader extends TreeReader {
+
+    public NullTreeReader(int columnId) throws IOException {
+      super(columnId);
+    }
+
+    @Override
+    public void startStripe(Map<StreamName, InStream> streams,
+                            OrcProto.StripeFooter footer) {
+      // PASS
+    }
+
+    @Override
+    void skipRows(long rows) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider position) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider[] position) {
+      // PASS
+    }
+
+    @Override
+    Object next(Object previous) {
+      return null;
+    }
+
+    @Override
+    public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+      vector.noNulls = false;
+      vector.isNull[0] = true;
+      vector.isRepeating = true;
+    }
+  }
+
   public static class BooleanTreeReader extends TreeReader {
     protected BitFieldReader reader = null;
 
@@ -322,20 +335,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, batchSize);
-      return result;
     }
   }
 
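Every scalar reader now follows the two-step shape seen in BooleanTreeReader above: the super call merges the caller's isNull mask with this column's present stream, and only then does the subclass decode values for the surviving rows. That mask is what lets nested readers skip slots an enclosing struct already declared null (the union reader later builds a per-tag "ignore" mask the same way). A schematic of the merge, not the Hive code:

    public class NullMergeSketch {
      // A slot is null if the parent marked it null, or if this column's
      // own present bit says so. The present stream only covers rows that
      // exist at this level, so its cursor advances only when the parent
      // left the row alive. presentBits stands in for the BitFieldReader.
      static void mergeNulls(boolean[] parentIsNull, boolean[] presentBits,
                             boolean[] isNull, int batchSize) {
        int p = 0;
        for (int i = 0; i < batchSize; i++) {
          if (parentIsNull != null && parentIsNull[i]) {
            isNull[i] = true;        // inherited; consumes no present bit
          } else {
            isNull[i] = presentBits != null && !presentBits[p++];
          }
        }
      }
    }
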
@@ -387,20 +396,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -473,20 +478,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -559,20 +560,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -646,20 +643,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -719,16 +712,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DoubleColumnVector result;
-      if (previousVector == null) {
-        result = new DoubleColumnVector();
-      } else {
-        result = (DoubleColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -768,7 +758,6 @@ public class TreeReaderFactory {
         }
         result.isRepeating = repeating;
       }
-      return result;
     }
 
     @Override
@@ -832,16 +821,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DoubleColumnVector result;
-      if (previousVector == null) {
-        result = new DoubleColumnVector();
-      } else {
-        result = (DoubleColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -881,8 +867,6 @@ public class TreeReaderFactory {
         }
         result.isRepeating = repeating;
       }
-
-      return result;
     }
 
     @Override
@@ -974,19 +958,15 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
-      return result;
     }
 
     @Override
@@ -1011,7 +991,6 @@ public class TreeReaderFactory {
     private final TimeZone readerTimeZone;
     private TimeZone writerTimeZone;
     private boolean hasSameTZRules;
-    private TimestampWritable scratchTimestampWritable;
 
     TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
       this(columnId, null, null, null, null, skipCorrupt);
@@ -1115,9 +1094,9 @@ public class TreeReaderFactory {
         int newNanos = parseNanos(nanos.next());
         // fix the rounding when we divided by 1000.
         if (millis >= 0) {
-          millis += newNanos / 1000000;
+          millis += newNanos / WriterImpl.NANOS_PER_MILLI;
         } else {
-          millis -= newNanos / 1000000;
+          millis -= newNanos / WriterImpl.NANOS_PER_MILLI;
         }
         long offset = 0;
         // If reader and writer time zones have different rules, adjust the timezone difference
@@ -1144,31 +1123,45 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final TimestampColumnVector result;
-      if (previousVector == null) {
-        result = new TimestampColumnVector();
-      } else {
-        result = (TimestampColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      TimestampColumnVector result = (TimestampColumnVector) previousVector;
+      super.nextVector(previousVector, isNull, batchSize);
 
-      result.reset();
-      if (scratchTimestampWritable == null) {
-        scratchTimestampWritable = new TimestampWritable();
-      }
-      Object obj;
       for (int i = 0; i < batchSize; i++) {
-        obj = next(scratchTimestampWritable);
-        if (obj == null) {
-          result.noNulls = false;
-          result.isNull[i] = true;
-        } else {
-          TimestampWritable writable = (TimestampWritable) obj;
-          result.set(i, writable.getTimestamp());
+        if (result.noNulls || !result.isNull[i]) {
+          long millis = data.next() + base_timestamp;
+          int newNanos = parseNanos(nanos.next());
+          if (millis < 0 && newNanos != 0) {
+            millis -= 1;
+          }
+          millis *= WriterImpl.MILLIS_PER_SECOND;
+          long offset = 0;
+          // If reader and writer time zones have different rules, adjust the timezone difference
+          // between reader and writer taking day light savings into account.
+          if (!hasSameTZRules) {
+            offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
+          }
+          long adjustedMillis = millis + offset;
+          // Sometimes the reader timezone might have changed after adding the adjustedMillis.
+          // To account for that change, check for any difference in reader timezone after
+          // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
+          if (!hasSameTZRules &&
+              (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
+            long newOffset =
+                writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
+            adjustedMillis = millis + newOffset;
+          }
+          result.time[i] = adjustedMillis;
+          result.nanos[i] = newNanos;
+          if (result.isRepeating && i != 0 &&
+              (result.time[0] != result.time[i] ||
+                  result.nanos[0] != result.nanos[i])) {
+            result.isRepeating = false;
+          }
         }
       }
-
-      return result;
     }
 
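To make the offset arithmetic above concrete: the stored value is anchored to the writer's time zone, so the reader adds the difference between the writer's and its own UTC offset to recover the writer's wall clock. A worked example with arbitrary zones (the instant is made up):

    import java.util.TimeZone;

    public class TimestampOffsetSketch {
      public static void main(String[] args) {
        TimeZone writer = TimeZone.getTimeZone("GMT+2");
        TimeZone reader = TimeZone.getTimeZone("GMT-5");
        long millis = 1456790400000L;  // some instant in 2016
        // Same formula as the reader: writer offset minus reader offset.
        long offset = writer.getOffset(millis) - reader.getOffset(millis);
        System.out.println(offset);    // 25200000, i.e. 2h - (-5h) = 7h
        System.out.println(millis + offset);
      }
    }

The second check in the reader (re-deriving the offset at adjustedMillis) only matters when the adjustment itself crosses a daylight-saving boundary, which fixed GMT offsets like these never do.
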
     private static int parseNanos(long serialized) {
@@ -1253,20 +1246,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -1278,7 +1267,7 @@ public class TreeReaderFactory {
   public static class DecimalTreeReader extends TreeReader {
     protected InStream valueStream;
     protected IntegerReader scaleReader = null;
-    private LongColumnVector scratchScaleVector;
+    private int[] scratchScaleVector;
 
     private final int precision;
     private final int scale;
@@ -1293,7 +1282,7 @@ public class TreeReaderFactory {
       super(columnId, present);
       this.precision = precision;
       this.scale = scale;
-      this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+      this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
       this.valueStream = valueStream;
       if (scaleStream != null && encoding != null) {
         checkEncoding(encoding);
@@ -1352,46 +1341,34 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DecimalColumnVector result;
-      if (previousVector == null) {
-        result = new DecimalColumnVector(precision, scale);
-      } else {
-        result = (DecimalColumnVector) previousVector;
-      }
-
-      // Save the reference for isNull in the scratch vector
-      boolean[] scratchIsNull = scratchScaleVector.isNull;
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DecimalColumnVector result = (DecimalColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[batchSize];
+      }
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
       // Read value entries based on isNull entries
-      if (result.isRepeating) {
-        if (!result.isNull[0]) {
+      if (result.noNulls) {
+        for (int r = 0; r < batchSize; ++r) {
           BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-          short scaleInData = (short) scaleReader.next();
-          HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
-          dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
-          result.set(0, dec);
+          HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+          result.set(r, dec);
         }
-      } else {
-        // result vector has isNull values set, use the same to read scale vector.
-        scratchScaleVector.isNull = result.isNull;
-        scaleReader.nextVector(scratchScaleVector, batchSize);
-        for (int i = 0; i < batchSize; i++) {
-          if (!result.isNull[i]) {
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        for (int r = 0; r < batchSize; ++r) {
+          if (!result.isNull[r]) {
             BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-            short scaleInData = (short) scratchScaleVector.vector[i];
-            HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
-            dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
-            result.set(i, dec);
+            HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+            result.set(r, dec);
           }
         }
       }
-      // Switch back the null vector.
-      scratchScaleVector.isNull = scratchIsNull;
-      return result;
     }
 
     @Override
@@ -1481,8 +1458,10 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      return reader.nextVector(previousVector, batchSize);
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      reader.nextVector(previousVector, isNull, batchSize);
     }
 
     @Override
@@ -1501,7 +1480,7 @@ public class TreeReaderFactory {
         BytesColumnVector result, final int batchSize) throws IOException {
       // Read lengths
       scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
-      lengths.nextVector(scratchlcv, batchSize);
+      lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
       int totalLength = 0;
       if (!scratchlcv.isRepeating) {
         for (int i = 0; i < batchSize; i++) {
@@ -1532,31 +1511,35 @@ public class TreeReaderFactory {
     }
 
     // This method has the common code for reading in bytes into a BytesColumnVector.
-    public static void readOrcByteArrays(InStream stream, IntegerReader lengths,
-        LongColumnVector scratchlcv,
-        BytesColumnVector result, final int batchSize) throws IOException {
-
-      byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize);
-
-      // Too expensive to figure out 'repeating' by comparisons.
-      result.isRepeating = false;
-      int offset = 0;
-      if (!scratchlcv.isRepeating) {
-        for (int i = 0; i < batchSize; i++) {
-          if (!scratchlcv.isNull[i]) {
-            result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
-            offset += scratchlcv.vector[i];
-          } else {
-            result.setRef(i, allBytes, 0, 0);
+    public static void readOrcByteArrays(InStream stream,
+                                         IntegerReader lengths,
+                                         LongColumnVector scratchlcv,
+                                         BytesColumnVector result,
+                                         int batchSize) throws IOException {
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
+            result, batchSize);
+
+        // Too expensive to figure out 'repeating' by comparisons.
+        result.isRepeating = false;
+        int offset = 0;
+        if (!scratchlcv.isRepeating) {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
+              offset += scratchlcv.vector[i];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
           }
-        }
-      } else {
-        for (int i = 0; i < batchSize; i++) {
-          if (!scratchlcv.isNull[i]) {
-            result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
-            offset += scratchlcv.vector[0];
-          } else {
-            result.setRef(i, allBytes, 0, 0);
+        } else {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
+              offset += scratchlcv.vector[0];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
           }
         }
       }
@@ -1641,19 +1624,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
-      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
-      return result;
+      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
+          result, batchSize);
     }
 
     @Override
@@ -1816,18 +1796,15 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
       int offset;
       int length;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       if (dictionaryBuffer != null) {
 
@@ -1838,7 +1815,8 @@ public class TreeReaderFactory {
 
         // Read string offsets
         scratchlcv.isNull = result.isNull;
-        reader.nextVector(scratchlcv, batchSize);
+        scratchlcv.ensureSize(batchSize, false);
+        reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
         if (!scratchlcv.isRepeating) {
 
           // The vector has non-repeating strings. Iterate thru the batch
@@ -1878,7 +1856,6 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
 
     int getDictionaryEntryLength(int entry, int offset) {
@@ -1936,11 +1913,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
-      BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
-
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
       int adjustedDownLen;
       if (result.isRepeating) {
         if (result.noNulls || !result.isNull[0]) {
@@ -1973,7 +1952,6 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
   }
 
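CharTreeReader reuses the string reader and then shrinks each length in a second pass: truncate to maxLength, then right-trim trailing blanks (VarcharTreeReader, just below, only truncates). A standalone sketch of that adjustment, ignoring the multi-byte UTF-8 handling the real pass has to do:

    public class CharAdjustSketch {
      // For CHAR(maxLength): cap the length, then drop trailing spaces,
      // mirroring the "adjustedDownLen" computed by the reader.
      static int adjustedLength(byte[] bytes, int start, int length,
                                int maxLength) {
        int len = Math.min(length, maxLength);   // truncate
        while (len > 0 && bytes[start + len - 1] == ' ') {
          len -= 1;                              // right-trim blanks
        }
        return len;
      }
    }
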
@@ -2010,10 +1988,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
-      BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
 
       int adjustedDownLen;
       if (result.isRepeating) {
@@ -2045,62 +2026,26 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
   }
 
   protected static class StructTreeReader extends TreeReader {
-    private final int readColumnCount;
-    private final int resultColumnCount;
     protected final TreeReader[] fields;
-    private final String[] fieldNames;
 
-    protected StructTreeReader(
-        int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
+    protected StructTreeReader(int columnId,
+                               TypeDescription readerSchema,
+                               SchemaEvolution treeReaderSchema,
+                               boolean[] included,
+                               boolean skipCorrupt) throws IOException {
       super(columnId);
 
-      OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
-
-      OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+      TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema);
 
-      readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount());
-
-      if (columnId == treeReaderSchema.getInnerStructSubtype()) {
-        // If there are more result columns than reader columns, we will default those additional
-        // columns to NULL.
-        resultColumnCount = schemaStructType.getFieldNamesCount();
-      } else {
-        resultColumnCount = readColumnCount;
-      }
-
-      this.fields = new TreeReader[readColumnCount];
-      this.fieldNames = new String[readColumnCount];
-
-      if (included == null) {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
-      } else {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          if (subtype >= included.length) {
-            throw new IOException("subtype " + subtype + " exceeds the included array size " +
-                included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
-                " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
-                " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
-          }
-          if (included[subtype]) {
-            this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          }
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      this.fields = new TreeReader[childrenTypes.size()];
+      for (int i = 0; i < fields.length; ++i) {
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
       }
     }
 
@@ -2120,65 +2065,52 @@ public class TreeReaderFactory {
       OrcStruct result = null;
       if (valuePresent) {
         if (previous == null) {
-          result = new OrcStruct(resultColumnCount);
+          result = new OrcStruct(fields.length);
         } else {
           result = (OrcStruct) previous;
 
           // If the input format was initialized with a file with a
           // different number of fields, the number of fields needs to
           // be updated to the correct number
-          if (result.getNumFields() != resultColumnCount) {
-            result.setNumFields(resultColumnCount);
-          }
+          result.setNumFields(fields.length);
         }
-        for (int i = 0; i < readColumnCount; ++i) {
+        for (int i = 0; i < fields.length; ++i) {
           if (fields[i] != null) {
             result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
           }
         }
-        if (resultColumnCount > readColumnCount) {
-          for (int i = readColumnCount; i < resultColumnCount; ++i) {
-            // Default new treeReaderSchema evolution fields to NULL.
-            result.setFieldValue(i, null);
-          }
-        }
       }
       return result;
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final ColumnVector[] result;
-      if (previousVector == null) {
-        result = new ColumnVector[readColumnCount];
-      } else {
-        result = (ColumnVector[]) previousVector;
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      for (int i = 0; i < fields.length &&
+          (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+        batch.cols[i].reset();
+        batch.cols[i].ensureSize(batchSize, false);
+        fields[i].nextVector(batch.cols[i], null, batchSize);
       }
+    }
 
-      // Read all the members of struct as column vectors
-      for (int i = 0; i < readColumnCount; i++) {
-        if (fields[i] != null) {
-          if (result[i] == null) {
-            result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
-          } else {
-            fields[i].nextVector(result[i], batchSize);
-          }
-        }
-      }
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize);
+      StructColumnVector result = (StructColumnVector) previousVector;
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
 
-      // Default additional treeReaderSchema evolution fields to NULL.
-      if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) {
-        for (int i = readColumnCount; i < vectorColumnCount; ++i) {
-          ColumnVector colVector = result[i];
-          if (colVector != null) {
-            colVector.isRepeating = true;
-            colVector.noNulls = false;
-            colVector.isNull[0] = true;
+        // Read all the members of struct as column vectors
+        boolean[] mask = result.noNulls ? null : result.isNull;
+        for (int f = 0; f < fields.length; f++) {
+          if (fields[f] != null) {
+            fields[f].nextVector(result.fields[f], mask, batchSize);
           }
         }
       }
-
-      return result;
     }
 
     @Override
@@ -2208,19 +2140,18 @@ public class TreeReaderFactory {
     protected final TreeReader[] fields;
     protected RunLengthByteReader tags;
 
-    protected UnionTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int fieldCount = type.getSubtypesCount();
+    protected UnionTreeReader(int fileColumn,
+                              TypeDescription readerSchema,
+                              SchemaEvolution treeReaderSchema,
+                              boolean[] included,
+                              boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      int fieldCount = childrenTypes.size();
       this.fields = new TreeReader[fieldCount];
       for (int i = 0; i < fieldCount; ++i) {
-        int subtype = type.getSubtypes(i);
-        if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-        }
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
       }
     }
 
@@ -2252,9 +2183,25 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Union type");
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      UnionColumnVector result = (UnionColumnVector) previousVector;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
+        tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
+            batchSize);
+        boolean[] ignore = new boolean[batchSize];
+        for (int f = 0; f < result.fields.length; ++f) {
+          // build the ignore list for this tag
+          for (int r = 0; r < batchSize; ++r) {
+            ignore[r] = (!result.noNulls && result.isNull[r]) ||
+                result.tags[r] != f;
+          }
+          fields[f].nextVector(result.fields[f], ignore, batchSize);
+        }
+      }
     }
 
     @Override
@@ -2288,13 +2235,15 @@ public class TreeReaderFactory {
     protected final TreeReader elementReader;
     protected IntegerReader lengths = null;
 
-    protected ListTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
+    protected ListTreeReader(int fileColumn,
+                             TypeDescription readerSchema,
+                             SchemaEvolution treeReaderSchema,
+                             boolean[] included,
+                             boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription elementType = readerSchema.getChildren().get(0);
+      elementReader = createTreeReader(elementType, treeReaderSchema, included,
+          skipCorrupt);
     }
 
     @Override
@@ -2335,9 +2284,27 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for List type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      ListColumnVector result = (ListColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      // if we have some non-null values, then read them
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the list doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.child.ensureSize(result.childCount, false);
+        elementReader.nextVector(result.child, null, result.childCount);
+      }
     }
 
     @Override
@@ -2378,24 +2345,16 @@ public class TreeReaderFactory {
     protected final TreeReader valueReader;
     protected IntegerReader lengths = null;
 
-    protected MapTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int keyColumn = type.getSubtypes(0);
-      int valueColumn = type.getSubtypes(1);
-      if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        keyReader = null;
-      }
-      if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        valueReader = null;
-      }
+    protected MapTreeReader(int fileColumn,
+                            TypeDescription readerSchema,
+                            SchemaEvolution treeReaderSchema,
+                            boolean[] included,
+                            boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription keyType = readerSchema.getChildren().get(0);
+      TypeDescription valueType = readerSchema.getChildren().get(1);
+      keyReader = createTreeReader(keyType, treeReaderSchema, included, skipCorrupt);
+      valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt);
     }
 
     @Override
@@ -2429,9 +2388,28 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Map type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      MapColumnVector result = (MapColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the map doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.keys.ensureSize(result.childCount, false);
+        result.values.ensureSize(result.childCount, false);
+        keyReader.nextVector(result.keys, null, result.childCount);
+        valueReader.nextVector(result.values, null, result.childCount);
+      }
     }
 
     @Override
@@ -2471,61 +2449,61 @@ public class TreeReaderFactory {
     }
   }
 
-  public static TreeReader createTreeReader(int columnId,
-      TreeReaderSchema treeReaderSchema,
-      boolean[] included,
-      boolean skipCorrupt
-  ) throws IOException {
-    OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-    switch (type.getKind()) {
+  public static TreeReader createTreeReader(TypeDescription readerType,
+                                            SchemaEvolution evolution,
+                                            boolean[] included,
+                                            boolean skipCorrupt
+                                            ) throws IOException {
+    TypeDescription fileType = evolution.getFileType(readerType);
+    if (fileType == null ||
+        (included != null && !included[readerType.getId()])) {
+      return new NullTreeReader(0);
+    }
+    switch (readerType.getCategory()) {
       case BOOLEAN:
-        return new BooleanTreeReader(columnId);
+        return new BooleanTreeReader(fileType.getId());
       case BYTE:
-        return new ByteTreeReader(columnId);
+        return new ByteTreeReader(fileType.getId());
       case DOUBLE:
-        return new DoubleTreeReader(columnId);
+        return new DoubleTreeReader(fileType.getId());
       case FLOAT:
-        return new FloatTreeReader(columnId);
+        return new FloatTreeReader(fileType.getId());
       case SHORT:
-        return new ShortTreeReader(columnId);
+        return new ShortTreeReader(fileType.getId());
       case INT:
-        return new IntTreeReader(columnId);
+        return new IntTreeReader(fileType.getId());
       case LONG:
-        return new LongTreeReader(columnId, skipCorrupt);
+        return new LongTreeReader(fileType.getId(), skipCorrupt);
       case STRING:
-        return new StringTreeReader(columnId);
+        return new StringTreeReader(fileType.getId());
       case CHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC char type has no length specified");
-        }
-        return new CharTreeReader(columnId, type.getMaximumLength());
+        return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
       case VARCHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC varchar type has no length specified");
-        }
-        return new VarcharTreeReader(columnId, type.getMaximumLength());
+        return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
       case BINARY:
-        return new BinaryTreeReader(columnId);
+        return new BinaryTreeReader(fileType.getId());
       case TIMESTAMP:
-        return new TimestampTreeReader(columnId, skipCorrupt);
+        return new TimestampTreeReader(fileType.getId(), skipCorrupt);
       case DATE:
-        return new DateTreeReader(columnId);
+        return new DateTreeReader(fileType.getId());
       case DECIMAL:
-        int precision =
-            type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
-        int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-        return new DecimalTreeReader(columnId, precision, scale);
+        return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
+            readerType.getScale());
       case STRUCT:
-        return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new StructTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new ListTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new MapTreeReader(fileType.getId(), readerType, evolution,
+            included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new UnionTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
-            type.getKind());
+            readerType.getCategory());
     }
   }
 }

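A note on the createTreeReader change above: the factory is now driven by the
reader's TypeDescription plus a SchemaEvolution mapping rather than by flat
OrcProto.Type ids, and a column that is missing from the file (or excluded via
the included array) resolves to a NullTreeReader instead of a null slot that
every caller must guard. The sketch below is a minimal, self-contained
illustration of that dispatch pattern only; ColumnReader, NullReader,
FileReader, and readerToFile are hypothetical stand-ins, not the Hive API.

  // Hypothetical sketch: resolve a reader-schema column to a file column and
  // fall back to a reader that produces only nulls. Not Hive code.
  import java.util.HashMap;
  import java.util.Map;

  interface ColumnReader { long read(int row); }

  class NullReader implements ColumnReader {
    // Stands in for columns the file does not contain: always yields 0/null.
    public long read(int row) { return 0; }
  }

  class FileReader implements ColumnReader {
    private final int fileColumn;
    FileReader(int fileColumn) { this.fileColumn = fileColumn; }
    public long read(int row) { return fileColumn; } // real decode goes here
  }

  class ReaderFactory {
    // Maps reader-schema column ids to file-schema column ids; columns added
    // by schema evolution have no entry and therefore read as null.
    private final Map<Integer, Integer> readerToFile = new HashMap<>();

    ColumnReader create(int readerColumn, boolean[] included) {
      Integer fileColumn = readerToFile.get(readerColumn);
      if (fileColumn == null ||
          (included != null && !included[readerColumn])) {
        return new NullReader();
      }
      return new FileReader(fileColumn);
    }
  }
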
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 816b52d..e4d2e6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -71,14 +71,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
         OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
       }
 
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
       /**
        * Do we have schema on read in the configuration variables?
        */
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
-
       List<OrcProto.Type> types = file.getTypes();
-      Reader.Options options = new Reader.Options();
-      options.schema(schema);
+      int dataColumns = rbCtx.getDataColumnCount();
+      TypeDescription schema =
+          OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns);
+      if (schema == null) {
+        schema = file.getSchema();
+        // Even if the user isn't doing schema evolution, cut the schema
+        // to the desired size.
+        if (schema.getCategory() == TypeDescription.Category.STRUCT &&
+            schema.getChildren().size() > dataColumns) {
+          schema = schema.clone();
+          List<TypeDescription> children = schema.getChildren();
+          for(int c = children.size() - 1; c >= dataColumns; --c) {
+            children.remove(c);
+          }
+        }
+      }
+      Reader.Options options = new Reader.Options().schema(schema);
+
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       options.range(offset, length);
@@ -87,8 +102,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
 
       this.reader = file.rowsOptions(options);
 
-      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
-
       columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
 
       int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -103,9 +116,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     @Override
     public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
 
-      if (!reader.hasNext()) {
-        return false;
-      }
       try {
         // Check and update partition cols if necessary. Ideally, this should be done
         // in CreateValue as the partition is constant per split. But since Hive uses
@@ -118,7 +128,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
           }
           addPartitionCols = false;
         }
-        reader.nextBatch(value);
+        if (!reader.nextBatch(value)) {
+          return false;
+        }
       } catch (Exception e) {
         throw new RuntimeException(e);
       }

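The hunk above carries the core contract change on the read path:
RecordReader.nextBatch(batch) now fills a caller-supplied batch and returns
false at end of file, replacing the old hasNext()/nextBatch(previous) pair.
A minimal read loop against the new contract, using only calls that appear in
this commit's tests, looks like:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  import org.apache.hadoop.hive.ql.io.orc.Reader;
  import org.apache.hadoop.hive.ql.io.orc.RecordReader;

  public class ReadLoop {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Reader reader = OrcFile.createReader(new Path(args[0]),
          OrcFile.readerOptions(conf));
      // Create the batch once from the reader's schema and reuse it.
      VectorizedRowBatch batch = reader.getSchema().createRowBatch();
      RecordReader rows = reader.rows();
      long total = 0;
      while (rows.nextBatch(batch)) {  // returns false at end of file
        total += batch.size;
      }
      rows.close();
      System.out.println("rows: " + total);
    }
  }
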
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 70fe803..8e52907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -101,8 +101,6 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
     }
   }
 
-  private static final long NANOS_PER_MILLI = 1000000;
-
   /**
    * Set the value for a given column value within a batch.
    * @param rowId the row to set

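For symmetry with the reader changes, a hedged sketch of the writer side: the
batch is filled column by column and flushed with addRowBatch. This assumes
the Writer exposes addRowBatch(VectorizedRowBatch), as the updated tests
exercise; treat it as an illustration, not a reference implementation.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  import org.apache.hadoop.hive.ql.io.orc.Writer;
  import org.apache.orc.TypeDescription;

  public class WriteLoop {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      TypeDescription schema = TypeDescription.createStruct()
          .addField("x", TypeDescription.createLong());
      Writer writer = OrcFile.createWriter(new Path(args[0]),
          OrcFile.writerOptions(conf).setSchema(schema));
      VectorizedRowBatch batch = schema.createRowBatch();
      LongColumnVector x = (LongColumnVector) batch.cols[0];
      for (long r = 0; r < 10000; ++r) {
        x.vector[batch.size++] = r;
        if (batch.size == batch.getMaxSize()) {
          writer.addRowBatch(batch);   // flush a full batch
          batch.reset();
        }
      }
      if (batch.size != 0) {
        writer.addRowBatch(batch);     // flush the partial tail
      }
      writer.close();
    }
  }
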

[2/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

Posted by om...@apache.org.
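
The test rewrite that follows replaces ObjectInspector-based row inspection
with direct ColumnVector access. The recurring pattern worth keeping in mind
while reading the diff: a list row occupies the slice
[offsets[r], offsets[r] + lengths[r]) of the child vector, and the
BytesColumnVector.toString(row) helper returns null for null entries. A
minimal sketch of decoding one row of list<struct<int1:int,string1:string>>,
assuming the list sits at batch.cols[10] as in the test below:

  import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

  public class DecodeList {
    // Prints one row of a list<struct<int1:int,string1:string>> column.
    static void printListRow(VectorizedRowBatch batch, int r) {
      ListColumnVector list = (ListColumnVector) batch.cols[10];
      if (list.isRepeating) {
        r = 0;                             // repeating vectors store row 0
      }
      if (!list.noNulls && list.isNull[r]) {
        System.out.println("null");
        return;
      }
      StructColumnVector elem = (StructColumnVector) list.child;
      LongColumnVector ints = (LongColumnVector) elem.fields[0];
      BytesColumnVector strs = (BytesColumnVector) elem.fields[1];
      int start = (int) list.offsets[r];
      for (int i = 0; i < list.lengths[r]; ++i) {
        // toString(row) returns null when the child element is null
        System.out.println(ints.vector[start + i] + ", "
            + strs.toString(start + i));
      }
    }
  }
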
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
index a82d672..6589692 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorOrcFile.java
@@ -32,51 +32,26 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hive.common.util.HiveTestUtils;
 import org.apache.orc.BinaryColumnStatistics;
 import org.apache.orc.BooleanColumnStatistics;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionKind;
+import org.apache.orc.DataReader;
 import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.impl.DataReaderProperties;
 import org.apache.orc.impl.MemoryManager;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
@@ -117,6 +92,10 @@ public class TestVectorOrcFile {
   public static class InnerStruct {
     int int1;
     Text string1 = new Text();
+    InnerStruct(int int1, Text string1) {
+      this.int1 = int1;
+      this.string1.set(string1);
+    }
     InnerStruct(int int1, String string1) {
       this.int1 = int1;
       this.string1.set(string1);
@@ -136,50 +115,6 @@ public class TestVectorOrcFile {
     }
   }
 
-  public static class BigRow {
-    Boolean boolean1;
-    Byte byte1;
-    Short short1;
-    Integer int1;
-    Long long1;
-    Float float1;
-    Double double1;
-    BytesWritable bytes1;
-    Text string1;
-    MiddleStruct middle;
-    List<InnerStruct> list = new ArrayList<InnerStruct>();
-    Map<Text, InnerStruct> map = new HashMap<Text, InnerStruct>();
-
-    BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float f1,
-           Double d1,
-           BytesWritable b3, String s2, MiddleStruct m1,
-           List<InnerStruct> l2, Map<String, InnerStruct> m2) {
-      this.boolean1 = b1;
-      this.byte1 = b2;
-      this.short1 = s1;
-      this.int1 = i1;
-      this.long1 = l1;
-      this.float1 = f1;
-      this.double1 = d1;
-      this.bytes1 = b3;
-      if (s2 == null) {
-        this.string1 = null;
-      } else {
-        this.string1 = new Text(s2);
-      }
-      this.middle = m1;
-      this.list = l2;
-      if (m2 != null) {
-        this.map = new HashMap<Text, InnerStruct>();
-        for (Map.Entry<String, InnerStruct> item : m2.entrySet()) {
-          this.map.put(new Text(item.getKey()), item.getValue());
-        }
-      } else {
-        this.map = null;
-      }
-    }
-  }
-
   private static InnerStruct inner(int i, String s) {
     return new InnerStruct(i, s);
   }
@@ -301,206 +236,115 @@ public class TestVectorOrcFile {
     assertEquals("count: 7500 hasNull: true min: bye max: hi sum: 0", stats[9].toString());
 
     // check the inspectors
-    StructObjectInspector readerInspector = (StructObjectInspector) reader
-        .getObjectInspector();
-    assertEquals(ObjectInspector.Category.STRUCT, readerInspector.getCategory());
+    TypeDescription schema = reader.getSchema();
+    assertEquals(TypeDescription.Category.STRUCT, schema.getCategory());
     assertEquals("struct<boolean1:boolean,byte1:tinyint,short1:smallint,"
         + "int1:int,long1:bigint,float1:float,double1:double,bytes1:"
         + "binary,string1:string,middle:struct<list:array<struct<int1:int,"
         + "string1:string>>>,list:array<struct<int1:int,string1:string>>,"
         + "map:map<string,struct<int1:int,string1:string>>,ts:timestamp,"
-        + "decimal1:decimal(38,18)>", readerInspector.getTypeName());
-    List<? extends StructField> fields = readerInspector
-        .getAllStructFieldRefs();
-    BooleanObjectInspector bo = (BooleanObjectInspector) readerInspector
-        .getStructFieldRef("boolean1").getFieldObjectInspector();
-    ByteObjectInspector by = (ByteObjectInspector) readerInspector
-        .getStructFieldRef("byte1").getFieldObjectInspector();
-    ShortObjectInspector sh = (ShortObjectInspector) readerInspector
-        .getStructFieldRef("short1").getFieldObjectInspector();
-    IntObjectInspector in = (IntObjectInspector) readerInspector
-        .getStructFieldRef("int1").getFieldObjectInspector();
-    LongObjectInspector lo = (LongObjectInspector) readerInspector
-        .getStructFieldRef("long1").getFieldObjectInspector();
-    FloatObjectInspector fl = (FloatObjectInspector) readerInspector
-        .getStructFieldRef("float1").getFieldObjectInspector();
-    DoubleObjectInspector dbl = (DoubleObjectInspector) readerInspector
-        .getStructFieldRef("double1").getFieldObjectInspector();
-    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector
-        .getStructFieldRef("bytes1").getFieldObjectInspector();
-    StringObjectInspector st = (StringObjectInspector) readerInspector
-        .getStructFieldRef("string1").getFieldObjectInspector();
-    StructObjectInspector mid = (StructObjectInspector) readerInspector
-        .getStructFieldRef("middle").getFieldObjectInspector();
-    List<? extends StructField> midFields = mid.getAllStructFieldRefs();
-    ListObjectInspector midli = (ListObjectInspector) midFields.get(0)
-        .getFieldObjectInspector();
-    StructObjectInspector inner = (StructObjectInspector) midli
-        .getListElementObjectInspector();
-    List<? extends StructField> inFields = inner.getAllStructFieldRefs();
-    ListObjectInspector li = (ListObjectInspector) readerInspector
-        .getStructFieldRef("list").getFieldObjectInspector();
-    MapObjectInspector ma = (MapObjectInspector) readerInspector
-        .getStructFieldRef("map").getFieldObjectInspector();
-    TimestampObjectInspector tso = (TimestampObjectInspector) readerInspector
-        .getStructFieldRef("ts").getFieldObjectInspector();
-    HiveDecimalObjectInspector dco = (HiveDecimalObjectInspector) readerInspector
-        .getStructFieldRef("decimal1").getFieldObjectInspector();
-    StringObjectInspector mk = (StringObjectInspector) ma
-        .getMapKeyObjectInspector();
+        + "decimal1:decimal(38,10)>", schema.toString());
+    VectorizedRowBatch batch = schema.createRowBatch();
+
     RecordReader rows = reader.rows();
-    Object row = rows.next(null);
-    assertNotNull(row);
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
+
     // check the contents of the first row
-    assertEquals(false,
-        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals(1,
-        by.get(readerInspector.getStructFieldData(row, fields.get(1))));
-    assertEquals(1024,
-        sh.get(readerInspector.getStructFieldData(row, fields.get(2))));
-    assertEquals(65536,
-        in.get(readerInspector.getStructFieldData(row, fields.get(3))));
-    assertEquals(Long.MAX_VALUE,
-        lo.get(readerInspector.getStructFieldData(row, fields.get(4))));
-    assertEquals(1.0,
-        fl.get(readerInspector.getStructFieldData(row, fields.get(5))), 0.00001);
-    assertEquals(-15.0,
-        dbl.get(readerInspector.getStructFieldData(row, fields.get(6))),
-        0.00001);
-    assertEquals(bytes(0, 1, 2, 3, 4),
-        bi.getPrimitiveWritableObject(readerInspector.getStructFieldData(row,
-            fields.get(7))));
-    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector
-        .getStructFieldData(row, fields.get(8))));
-    List<?> midRow = midli.getList(mid.getStructFieldData(
-        readerInspector.getStructFieldData(row, fields.get(9)),
-        midFields.get(0)));
-    assertNotNull(midRow);
-    assertEquals(2, midRow.size());
-    assertEquals(1,
-        in.get(inner.getStructFieldData(midRow.get(0), inFields.get(0))));
-    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        midRow.get(0), inFields.get(1))));
-    assertEquals(2,
-        in.get(inner.getStructFieldData(midRow.get(1), inFields.get(0))));
-    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        midRow.get(1), inFields.get(1))));
-    List<?> list = li.getList(readerInspector.getStructFieldData(row,
-        fields.get(10)));
-    assertEquals(2, list.size());
-    assertEquals(3,
-        in.get(inner.getStructFieldData(list.get(0), inFields.get(0))));
-    assertEquals("good", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        list.get(0), inFields.get(1))));
-    assertEquals(4,
-        in.get(inner.getStructFieldData(list.get(1), inFields.get(0))));
-    assertEquals("bad", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        list.get(1), inFields.get(1))));
-    Map<?, ?> map = ma.getMap(readerInspector.getStructFieldData(row,
-        fields.get(11)));
-    assertEquals(0, map.size());
+    assertEquals(false, getBoolean(batch, 0));
+    assertEquals(1, getByte(batch, 0));
+    assertEquals(1024, getShort(batch, 0));
+    assertEquals(65536, getInt(batch, 0));
+    assertEquals(Long.MAX_VALUE, getLong(batch, 0));
+    assertEquals(1.0, getFloat(batch, 0), 0.00001);
+    assertEquals(-15.0, getDouble(batch, 0), 0.00001);
+    assertEquals(bytes(0, 1, 2, 3, 4), getBinary(batch, 0));
+    assertEquals("hi", getText(batch, 0).toString());
+
+    StructColumnVector middle = (StructColumnVector) batch.cols[9];
+    ListColumnVector midList = (ListColumnVector) middle.fields[0];
+    StructColumnVector midListStruct = (StructColumnVector) midList.child;
+    LongColumnVector midListInt = (LongColumnVector) midListStruct.fields[0];
+    BytesColumnVector midListStr = (BytesColumnVector) midListStruct.fields[1];
+    ListColumnVector list = (ListColumnVector) batch.cols[10];
+    StructColumnVector listStruct = (StructColumnVector) list.child;
+    LongColumnVector listInts = (LongColumnVector) listStruct.fields[0];
+    BytesColumnVector listStrs = (BytesColumnVector) listStruct.fields[1];
+    MapColumnVector map = (MapColumnVector) batch.cols[11];
+    BytesColumnVector mapKey = (BytesColumnVector) map.keys;
+    StructColumnVector mapValue = (StructColumnVector) map.values;
+    LongColumnVector mapValueInts = (LongColumnVector) mapValue.fields[0];
+    BytesColumnVector mapValueStrs = (BytesColumnVector) mapValue.fields[1];
+    TimestampColumnVector timestamp = (TimestampColumnVector) batch.cols[12];
+    DecimalColumnVector decs = (DecimalColumnVector) batch.cols[13];
+
+    assertEquals(false, middle.isNull[0]);
+    assertEquals(2, midList.lengths[0]);
+    int start = (int) midList.offsets[0];
+    assertEquals(1, midListInt.vector[start]);
+    assertEquals("bye", midListStr.toString(start));
+    assertEquals(2, midListInt.vector[start + 1]);
+    assertEquals("sigh", midListStr.toString(start + 1));
+
+    assertEquals(2, list.lengths[0]);
+    start = (int) list.offsets[0];
+    assertEquals(3, listInts.vector[start]);
+    assertEquals("good", listStrs.toString(start));
+    assertEquals(4, listInts.vector[start + 1]);
+    assertEquals("bad", listStrs.toString(start + 1));
+    assertEquals(0, map.lengths[0]);
     assertEquals(Timestamp.valueOf("2000-03-12 15:00:00"),
-        tso.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
-            fields.get(12))));
-    assertEquals(HiveDecimal.create("12345678.6547456"),
-        dco.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
-            fields.get(13))));
+        timestamp.asScratchTimestamp(0));
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("12345678.6547456")),
+        decs.vector[0]);
 
-    // check the contents of second row
-    assertEquals(true, rows.hasNext());
+    // check the contents of row 7499
     rows.seekToRow(7499);
-    row = rows.next(null);
-    assertEquals(true,
-        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals(100,
-        by.get(readerInspector.getStructFieldData(row, fields.get(1))));
-    assertEquals(2048,
-        sh.get(readerInspector.getStructFieldData(row, fields.get(2))));
-    assertEquals(65536,
-        in.get(readerInspector.getStructFieldData(row, fields.get(3))));
-    assertEquals(Long.MAX_VALUE,
-        lo.get(readerInspector.getStructFieldData(row, fields.get(4))));
-    assertEquals(2.0,
-        fl.get(readerInspector.getStructFieldData(row, fields.get(5))), 0.00001);
-    assertEquals(-5.0,
-        dbl.get(readerInspector.getStructFieldData(row, fields.get(6))),
-        0.00001);
-    assertEquals(bytes(), bi.getPrimitiveWritableObject(readerInspector
-        .getStructFieldData(row, fields.get(7))));
-    assertEquals("bye", st.getPrimitiveJavaObject(readerInspector
-        .getStructFieldData(row, fields.get(8))));
-    midRow = midli.getList(mid.getStructFieldData(
-        readerInspector.getStructFieldData(row, fields.get(9)),
-        midFields.get(0)));
-    assertNotNull(midRow);
-    assertEquals(2, midRow.size());
-    assertEquals(1,
-        in.get(inner.getStructFieldData(midRow.get(0), inFields.get(0))));
-    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        midRow.get(0), inFields.get(1))));
-    assertEquals(2,
-        in.get(inner.getStructFieldData(midRow.get(1), inFields.get(0))));
-    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        midRow.get(1), inFields.get(1))));
-    list = li.getList(readerInspector.getStructFieldData(row, fields.get(10)));
-    assertEquals(3, list.size());
-    assertEquals(100000000,
-        in.get(inner.getStructFieldData(list.get(0), inFields.get(0))));
-    assertEquals("cat", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        list.get(0), inFields.get(1))));
-    assertEquals(-100000,
-        in.get(inner.getStructFieldData(list.get(1), inFields.get(0))));
-    assertEquals("in", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        list.get(1), inFields.get(1))));
-    assertEquals(1234,
-        in.get(inner.getStructFieldData(list.get(2), inFields.get(0))));
-    assertEquals("hat", st.getPrimitiveJavaObject(inner.getStructFieldData(
-        list.get(2), inFields.get(1))));
-    map = ma.getMap(readerInspector.getStructFieldData(row, fields.get(11)));
-    assertEquals(2, map.size());
-    boolean[] found = new boolean[2];
-    for(Object key : map.keySet()) {
-      String str = mk.getPrimitiveJavaObject(key);
-      if (str.equals("chani")) {
-        assertEquals(false, found[0]);
-        assertEquals(5,
-            in.get(inner.getStructFieldData(map.get(key), inFields.get(0))));
-        assertEquals(str, st.getPrimitiveJavaObject(inner.getStructFieldData(
-            map.get(key), inFields.get(1))));
-        found[0] = true;
-      } else if (str.equals("mauddib")) {
-        assertEquals(false, found[1]);
-        assertEquals(1,
-            in.get(inner.getStructFieldData(map.get(key), inFields.get(0))));
-        assertEquals(str, st.getPrimitiveJavaObject(inner.getStructFieldData(
-            map.get(key), inFields.get(1))));
-        found[1] = true;
-      } else {
-        throw new IllegalArgumentException("Unknown key " + str);
-      }
-    }
-    assertEquals(true, found[0]);
-    assertEquals(true, found[1]);
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(true, getBoolean(batch, 0));
+    assertEquals(100, getByte(batch, 0));
+    assertEquals(2048, getShort(batch, 0));
+    assertEquals(65536, getInt(batch, 0));
+    assertEquals(Long.MAX_VALUE, getLong(batch, 0));
+    assertEquals(2.0, getFloat(batch, 0), 0.00001);
+    assertEquals(-5.0, getDouble(batch, 0), 0.00001);
+    assertEquals(bytes(), getBinary(batch, 0));
+    assertEquals("bye", getText(batch, 0).toString());
+    assertEquals(false, middle.isNull[0]);
+    assertEquals(2, midList.lengths[0]);
+    start = (int) midList.offsets[0];
+    assertEquals(1, midListInt.vector[start]);
+    assertEquals("bye", midListStr.toString(start));
+    assertEquals(2, midListInt.vector[start + 1]);
+    assertEquals("sigh", midListStr.toString(start + 1));
+    assertEquals(3, list.lengths[0]);
+    start = (int) list.offsets[0];
+    assertEquals(100000000, listInts.vector[start]);
+    assertEquals("cat", listStrs.toString(start));
+    assertEquals(-100000, listInts.vector[start + 1]);
+    assertEquals("in", listStrs.toString(start + 1));
+    assertEquals(1234, listInts.vector[start + 2]);
+    assertEquals("hat", listStrs.toString(start + 2));
+    assertEquals(2, map.lengths[0]);
+    start = (int) map.offsets[0];
+    assertEquals("chani", mapKey.toString(start));
+    assertEquals(5, mapValueInts.vector[start]);
+    assertEquals("chani", mapValueStrs.toString(start));
+    assertEquals("mauddib", mapKey.toString(start + 1));
+    assertEquals(1, mapValueInts.vector[start + 1]);
+    assertEquals("mauddib", mapValueStrs.toString(start + 1));
     assertEquals(Timestamp.valueOf("2000-03-12 15:00:01"),
-        tso.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
-            fields.get(12))));
-    assertEquals(HiveDecimal.create("12345678.6547457"),
-        dco.getPrimitiveJavaObject(readerInspector.getStructFieldData(row,
-            fields.get(13))));
+        timestamp.asScratchTimestamp(0));
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("12345678.6547457")),
+        decs.vector[0]);
 
     // handle the close up
-    assertEquals(false, rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
     rows.close();
   }
 
   @Test
   public void testTimestamp() throws Exception {
-    ObjectInspector inspector;
-    synchronized (TestVectorOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Timestamp.class,
-          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
     TypeDescription schema = TypeDescription.createTimestamp();
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
@@ -533,11 +377,15 @@ public class TestVectorOrcFile {
 
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
-    RecordReader rows = reader.rows(null);
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    TimestampColumnVector timestamps = (TimestampColumnVector) batch.cols[0];
     int idx = 0;
-    while (rows.hasNext()) {
-      Object row = rows.next(null);
-      assertEquals(tslist.get(idx++).getNanos(), ((TimestampWritable) row).getNanos());
+    while (rows.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(tslist.get(idx++).getNanos(),
+            timestamps.asScratchTimestamp(r).getNanos());
+      }
     }
     assertEquals(tslist.size(), rows.getRowNumber());
     assertEquals(0, writer.getSchema().getMaximumId());
@@ -608,50 +456,28 @@ public class TestVectorOrcFile {
         stats[2].toString());
 
     // check the inspectors
-    StructObjectInspector readerInspector =
-        (StructObjectInspector) reader.getObjectInspector();
-    assertEquals(ObjectInspector.Category.STRUCT,
-        readerInspector.getCategory());
-    assertEquals("struct<bytes1:binary,string1:string>",
-        readerInspector.getTypeName());
-    List<? extends StructField> fields =
-        readerInspector.getAllStructFieldRefs();
-    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector.
-        getStructFieldRef("bytes1").getFieldObjectInspector();
-    StringObjectInspector st = (StringObjectInspector) readerInspector.
-        getStructFieldRef("string1").getFieldObjectInspector();
+    batch = reader.getSchema().createRowBatch();
+    BytesColumnVector bytes = (BytesColumnVector) batch.cols[0];
+    BytesColumnVector strs = (BytesColumnVector) batch.cols[1];
     RecordReader rows = reader.rows();
-    Object row = rows.next(null);
-    assertNotNull(row);
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(4, batch.size);
+
     // check the contents of the first row
-    assertEquals(bytes(0,1,2,3,4), bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals("foo", st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(1))));
+    assertEquals(bytes(0,1,2,3,4), getBinary(bytes, 0));
+    assertEquals("foo", strs.toString(0));
 
     // check the contents of second row
-    assertEquals(true, rows.hasNext());
-    row = rows.next(row);
-    assertEquals(bytes(0,1,2,3), bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals("bar", st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(1))));
+    assertEquals(bytes(0,1,2,3), getBinary(bytes, 1));
+    assertEquals("bar", strs.toString(1));
 
     // check the contents of third row
-    assertEquals(true, rows.hasNext());
-    row = rows.next(row);
-    assertEquals(bytes(0,1,2,3,4,5), bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(0))));
-    assertNull(st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(1))));
+    assertEquals(bytes(0,1,2,3,4,5), getBinary(bytes, 2));
+    assertNull(strs.toString(2));
 
     // check the contents of fourth row
-    assertEquals(true, rows.hasNext());
-    row = rows.next(row);
-    assertNull(bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(1))));
+    assertNull(getBinary(bytes, 3));
+    assertEquals("hi", strs.toString(3));
 
     // handle the close up
     assertEquals(false, rows.hasNext());
@@ -767,6 +593,19 @@ public class TestVectorOrcFile {
     }
   }
 
+  private static void checkInner(StructColumnVector inner, int rowId,
+                                 int rowInBatch, int i, String value) {
+    assertEquals("row " + rowId, i,
+        ((LongColumnVector) inner.fields[0]).vector[rowInBatch]);
+    if (value != null) {
+      assertEquals("row " + rowId, value,
+          ((BytesColumnVector) inner.fields[1]).toString(rowInBatch));
+    } else {
+      assertEquals("row " + rowId, true, inner.fields[1].isNull[rowInBatch]);
+      assertEquals("row " + rowId, false, inner.fields[1].noNulls);
+    }
+  }
+
   private static void setInnerList(ListColumnVector list, int rowId,
                                    List<InnerStruct> value) {
     if (value != null) {
@@ -787,6 +626,23 @@ public class TestVectorOrcFile {
     }
   }
 
+  private static void checkInnerList(ListColumnVector list, int rowId,
+                                     int rowInBatch, List<InnerStruct> value) {
+    if (value != null) {
+      assertEquals("row " + rowId, value.size(), list.lengths[rowInBatch]);
+      int start = (int) list.offsets[rowInBatch];
+      for (int i = 0; i < list.lengths[rowInBatch]; ++i) {
+        InnerStruct inner = value.get(i);
+        checkInner((StructColumnVector) list.child, rowId, i + start,
+            inner.int1, inner.string1.toString());
+      }
+      list.childCount += value.size();
+    } else {
+      assertEquals("row " + rowId, true, list.isNull[rowInBatch]);
+      assertEquals("row " + rowId, false, list.noNulls);
+    }
+  }
+
   private static void setInnerMap(MapColumnVector map, int rowId,
                                   Map<String, InnerStruct> value) {
     if (value != null) {
@@ -812,6 +668,24 @@ public class TestVectorOrcFile {
     }
   }
 
+  private static void checkInnerMap(MapColumnVector map, int rowId,
+                                    int rowInBatch,
+                                    Map<String, InnerStruct> value) {
+    if (value != null) {
+      assertEquals("row " + rowId, value.size(), map.lengths[rowInBatch]);
+      int offset = (int) map.offsets[rowInBatch];
+      for(int i=0; i < value.size(); ++i) {
+        String key = ((BytesColumnVector) map.keys).toString(offset + i);
+        InnerStruct expected = value.get(key);
+        checkInner((StructColumnVector) map.values, rowId, offset + i,
+            expected.int1, expected.string1.toString());
+      }
+    } else {
+      assertEquals("row " + rowId, true, map.isNull[rowId]);
+      assertEquals("row " + rowId, false, map.noNulls);
+    }
+  }
+
   private static void setMiddleStruct(StructColumnVector middle, int rowId,
                                       MiddleStruct value) {
     if (value != null) {
@@ -822,6 +696,17 @@ public class TestVectorOrcFile {
     }
   }
 
+  private static void checkMiddleStruct(StructColumnVector middle, int rowId,
+                                        int rowInBatch, MiddleStruct value) {
+    if (value != null) {
+      checkInnerList((ListColumnVector) middle.fields[0], rowId, rowInBatch,
+          value.list);
+    } else {
+      assertEquals("row " + rowId, true, middle.isNull[rowInBatch]);
+      assertEquals("row " + rowId, false, middle.noNulls);
+    }
+  }
+
   private static void setBigRow(VectorizedRowBatch batch, int rowId,
                                 Boolean b1, Byte b2, Short s1,
                                 Integer i1, Long l1, Float f1,
@@ -853,6 +738,160 @@ public class TestVectorOrcFile {
     setInnerMap((MapColumnVector) batch.cols[11], rowId, m2);
   }
 
+  private static void checkBigRow(VectorizedRowBatch batch,
+                                  int rowInBatch,
+                                  int rowId,
+                                  boolean b1, byte b2, short s1,
+                                  int i1, long l1, float f1,
+                                  double d1, BytesWritable b3, String s2,
+                                  MiddleStruct m1, List<InnerStruct> l2,
+                                  Map<String, InnerStruct> m2) {
+    assertEquals("row " + rowId, b1, getBoolean(batch, rowInBatch));
+    assertEquals("row " + rowId, b2, getByte(batch, rowInBatch));
+    assertEquals("row " + rowId, s1, getShort(batch, rowInBatch));
+    assertEquals("row " + rowId, i1, getInt(batch, rowInBatch));
+    assertEquals("row " + rowId, l1, getLong(batch, rowInBatch));
+    assertEquals("row " + rowId, f1, getFloat(batch, rowInBatch), 0.0001);
+    assertEquals("row " + rowId, d1, getDouble(batch, rowInBatch), 0.0001);
+    if (b3 != null) {
+      BytesColumnVector bytes = (BytesColumnVector) batch.cols[7];
+      assertEquals("row " + rowId, b3.getLength(), bytes.length[rowInBatch]);
+      for(int i=0; i < b3.getLength(); ++i) {
+        assertEquals("row " + rowId + " byte " + i, b3.getBytes()[i],
+            bytes.vector[rowInBatch][bytes.start[rowInBatch] + i]);
+      }
+    } else {
+      assertEquals("row " + rowId, true, batch.cols[7].isNull[rowInBatch]);
+      assertEquals("row " + rowId, false, batch.cols[7].noNulls);
+    }
+    if (s2 != null) {
+      assertEquals("row " + rowId, s2, getText(batch, rowInBatch).toString());
+    } else {
+      assertEquals("row " + rowId, true, batch.cols[8].isNull[rowInBatch]);
+      assertEquals("row " + rowId, false, batch.cols[8].noNulls);
+    }
+    checkMiddleStruct((StructColumnVector) batch.cols[9], rowId, rowInBatch,
+        m1);
+    checkInnerList((ListColumnVector) batch.cols[10], rowId, rowInBatch, l2);
+    checkInnerMap((MapColumnVector) batch.cols[11], rowId, rowInBatch, m2);
+  }
+
+  private static boolean getBoolean(VectorizedRowBatch batch, int rowId) {
+    return ((LongColumnVector) batch.cols[0]).vector[rowId] != 0;
+  }
+
+  private static byte getByte(VectorizedRowBatch batch, int rowId) {
+    return (byte) ((LongColumnVector) batch.cols[1]).vector[rowId];
+  }
+
+  private static short getShort(VectorizedRowBatch batch, int rowId) {
+    return (short) ((LongColumnVector) batch.cols[2]).vector[rowId];
+  }
+
+  private static int getInt(VectorizedRowBatch batch, int rowId) {
+    return (int) ((LongColumnVector) batch.cols[3]).vector[rowId];
+  }
+
+  private static long getLong(VectorizedRowBatch batch, int rowId) {
+    return ((LongColumnVector) batch.cols[4]).vector[rowId];
+  }
+
+  private static float getFloat(VectorizedRowBatch batch, int rowId) {
+    return (float) ((DoubleColumnVector) batch.cols[5]).vector[rowId];
+  }
+
+  private static double getDouble(VectorizedRowBatch batch, int rowId) {
+    return ((DoubleColumnVector) batch.cols[6]).vector[rowId];
+  }
+
+  private static BytesWritable getBinary(BytesColumnVector column, int rowId) {
+    if (column.isRepeating) {
+      rowId = 0;
+    }
+    if (column.noNulls || !column.isNull[rowId]) {
+      return new BytesWritable(Arrays.copyOfRange(column.vector[rowId],
+          column.start[rowId], column.start[rowId] + column.length[rowId]));
+    } else {
+      return null;
+    }
+  }
+
+  private static BytesWritable getBinary(VectorizedRowBatch batch, int rowId) {
+    return getBinary((BytesColumnVector) batch.cols[7], rowId);
+  }
+
+  private static Text getText(BytesColumnVector vector, int rowId) {
+    if (vector.isRepeating) {
+      rowId = 0;
+    }
+    if (vector.noNulls || !vector.isNull[rowId]) {
+      return new Text(Arrays.copyOfRange(vector.vector[rowId],
+          vector.start[rowId], vector.start[rowId] + vector.length[rowId]));
+    } else {
+      return null;
+    }
+  }
+
+  private static Text getText(VectorizedRowBatch batch, int rowId) {
+    return getText((BytesColumnVector) batch.cols[8], rowId);
+  }
+
+  private static InnerStruct getInner(StructColumnVector vector,
+                                      int rowId) {
+    return new InnerStruct(
+        (int) ((LongColumnVector) vector.fields[0]).vector[rowId],
+        getText((BytesColumnVector) vector.fields[1], rowId));
+  }
+
+  private static List<InnerStruct> getList(ListColumnVector cv,
+                                           int rowId) {
+    if (cv.isRepeating) {
+      rowId = 0;
+    }
+    if (cv.noNulls || !cv.isNull[rowId]) {
+      List<InnerStruct> result =
+          new ArrayList<InnerStruct>((int) cv.lengths[rowId]);
+      for(long i=cv.offsets[rowId];
+          i < cv.offsets[rowId] + cv.lengths[rowId]; ++i) {
+        result.add(getInner((StructColumnVector) cv.child, (int) i));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  private static List<InnerStruct> getMidList(VectorizedRowBatch batch,
+                                              int rowId) {
+    return getList((ListColumnVector) ((StructColumnVector) batch.cols[9])
+        .fields[0], rowId);
+  }
+
+  private static List<InnerStruct> getList(VectorizedRowBatch batch,
+                                           int rowId) {
+    return getList((ListColumnVector) batch.cols[10], rowId);
+  }
+
+  private static Map<Text, InnerStruct> getMap(VectorizedRowBatch batch,
+                                               int rowId) {
+    MapColumnVector cv = (MapColumnVector) batch.cols[11];
+    if (cv.isRepeating) {
+      rowId = 0;
+    }
+    if (cv.noNulls || !cv.isNull[rowId]) {
+      Map<Text, InnerStruct> result =
+          new HashMap<Text, InnerStruct>((int) cv.lengths[rowId]);
+      for(long i=cv.offsets[rowId];
+          i < cv.offsets[rowId] + cv.lengths[rowId]; ++i) {
+        result.put(getText((BytesColumnVector) cv.keys, (int) i),
+            getInner((StructColumnVector) cv.values, (int) i));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
   private static TypeDescription createInnerSchema() {
     return TypeDescription.createStruct()
         .addField("int1", TypeDescription.createInt())
@@ -981,178 +1020,114 @@ public class TestVectorOrcFile {
 
     assertEquals("count: 2 hasNull: false min: bye max: hi sum: 5", stats[9].toString());
 
-    // check the inspectors
-    StructObjectInspector readerInspector =
-        (StructObjectInspector) reader.getObjectInspector();
-    assertEquals(ObjectInspector.Category.STRUCT,
-        readerInspector.getCategory());
+    // check the schema
+    TypeDescription readerSchema = reader.getSchema();
+    assertEquals(TypeDescription.Category.STRUCT, readerSchema.getCategory());
     assertEquals("struct<boolean1:boolean,byte1:tinyint,short1:smallint,"
         + "int1:int,long1:bigint,float1:float,double1:double,bytes1:"
         + "binary,string1:string,middle:struct<list:array<struct<int1:int,"
         + "string1:string>>>,list:array<struct<int1:int,string1:string>>,"
         + "map:map<string,struct<int1:int,string1:string>>>",
-        readerInspector.getTypeName());
-    List<? extends StructField> fields =
-        readerInspector.getAllStructFieldRefs();
-    BooleanObjectInspector bo = (BooleanObjectInspector) readerInspector.
-        getStructFieldRef("boolean1").getFieldObjectInspector();
-    ByteObjectInspector by = (ByteObjectInspector) readerInspector.
-        getStructFieldRef("byte1").getFieldObjectInspector();
-    ShortObjectInspector sh = (ShortObjectInspector) readerInspector.
-        getStructFieldRef("short1").getFieldObjectInspector();
-    IntObjectInspector in = (IntObjectInspector) readerInspector.
-        getStructFieldRef("int1").getFieldObjectInspector();
-    LongObjectInspector lo = (LongObjectInspector) readerInspector.
-        getStructFieldRef("long1").getFieldObjectInspector();
-    FloatObjectInspector fl = (FloatObjectInspector) readerInspector.
-        getStructFieldRef("float1").getFieldObjectInspector();
-    DoubleObjectInspector dbl = (DoubleObjectInspector) readerInspector.
-        getStructFieldRef("double1").getFieldObjectInspector();
-    BinaryObjectInspector bi = (BinaryObjectInspector) readerInspector.
-        getStructFieldRef("bytes1").getFieldObjectInspector();
-    StringObjectInspector st = (StringObjectInspector) readerInspector.
-        getStructFieldRef("string1").getFieldObjectInspector();
-    StructObjectInspector mid = (StructObjectInspector) readerInspector.
-        getStructFieldRef("middle").getFieldObjectInspector();
-    List<? extends StructField> midFields =
-        mid.getAllStructFieldRefs();
-    ListObjectInspector midli =
-        (ListObjectInspector) midFields.get(0).getFieldObjectInspector();
-    StructObjectInspector inner = (StructObjectInspector)
-        midli.getListElementObjectInspector();
-    List<? extends StructField> inFields = inner.getAllStructFieldRefs();
-    ListObjectInspector li = (ListObjectInspector) readerInspector.
-        getStructFieldRef("list").getFieldObjectInspector();
-    MapObjectInspector ma = (MapObjectInspector) readerInspector.
-        getStructFieldRef("map").getFieldObjectInspector();
-    StringObjectInspector mk = (StringObjectInspector)
-        ma.getMapKeyObjectInspector();
+        readerSchema.toString());
+    List<String> fieldNames = readerSchema.getFieldNames();
+    List<TypeDescription> fieldTypes = readerSchema.getChildren();
+    assertEquals("boolean1", fieldNames.get(0));
+    assertEquals(TypeDescription.Category.BOOLEAN, fieldTypes.get(0).getCategory());
+    assertEquals("byte1", fieldNames.get(1));
+    assertEquals(TypeDescription.Category.BYTE, fieldTypes.get(1).getCategory());
+    assertEquals("short1", fieldNames.get(2));
+    assertEquals(TypeDescription.Category.SHORT, fieldTypes.get(2).getCategory());
+    assertEquals("int1", fieldNames.get(3));
+    assertEquals(TypeDescription.Category.INT, fieldTypes.get(3).getCategory());
+    assertEquals("long1", fieldNames.get(4));
+    assertEquals(TypeDescription.Category.LONG, fieldTypes.get(4).getCategory());
+    assertEquals("float1", fieldNames.get(5));
+    assertEquals(TypeDescription.Category.FLOAT, fieldTypes.get(5).getCategory());
+    assertEquals("double1", fieldNames.get(6));
+    assertEquals(TypeDescription.Category.DOUBLE, fieldTypes.get(6).getCategory());
+    assertEquals("bytes1", fieldNames.get(7));
+    assertEquals(TypeDescription.Category.BINARY, fieldTypes.get(7).getCategory());
+    assertEquals("string1", fieldNames.get(8));
+    assertEquals(TypeDescription.Category.STRING, fieldTypes.get(8).getCategory());
+    assertEquals("middle", fieldNames.get(9));
+    TypeDescription middle = fieldTypes.get(9);
+    assertEquals(TypeDescription.Category.STRUCT, middle.getCategory());
+    TypeDescription midList = middle.getChildren().get(0);
+    assertEquals(TypeDescription.Category.LIST, midList.getCategory());
+    TypeDescription inner = midList.getChildren().get(0);
+    assertEquals(TypeDescription.Category.STRUCT, inner.getCategory());
+    assertEquals("int1", inner.getFieldNames().get(0));
+    assertEquals("string1", inner.getFieldNames().get(1));
+
     RecordReader rows = reader.rows();
-    Object row = rows.next(null);
-    assertNotNull(row);
+    // create a new batch
+    batch = readerSchema.createRowBatch();
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(2, batch.size);
+    assertEquals(false, rows.hasNext());
+
     // check the contents of the first row
-    assertEquals(false,
-        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals(1, by.get(readerInspector.getStructFieldData(row,
-        fields.get(1))));
-    assertEquals(1024, sh.get(readerInspector.getStructFieldData(row,
-        fields.get(2))));
-    assertEquals(65536, in.get(readerInspector.getStructFieldData(row,
-        fields.get(3))));
-    assertEquals(Long.MAX_VALUE, lo.get(readerInspector.
-        getStructFieldData(row, fields.get(4))));
-    assertEquals(1.0, fl.get(readerInspector.getStructFieldData(row,
-        fields.get(5))), 0.00001);
-    assertEquals(-15.0, dbl.get(readerInspector.getStructFieldData(row,
-        fields.get(6))), 0.00001);
-    assertEquals(bytes(0,1,2,3,4), bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(7))));
-    assertEquals("hi", st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(8))));
-    List<?> midRow = midli.getList(mid.getStructFieldData(readerInspector.
-        getStructFieldData(row, fields.get(9)), midFields.get(0)));
+    assertEquals(false, getBoolean(batch, 0));
+    assertEquals(1, getByte(batch, 0));
+    assertEquals(1024, getShort(batch, 0));
+    assertEquals(65536, getInt(batch, 0));
+    assertEquals(Long.MAX_VALUE, getLong(batch, 0));
+    assertEquals(1.0, getFloat(batch, 0), 0.00001);
+    assertEquals(-15.0, getDouble(batch, 0), 0.00001);
+    assertEquals(bytes(0,1,2,3,4), getBinary(batch, 0));
+    assertEquals("hi", getText(batch, 0).toString());
+    List<InnerStruct> midRow = getMidList(batch, 0);
     assertNotNull(midRow);
     assertEquals(2, midRow.size());
-    assertEquals(1, in.get(inner.getStructFieldData(midRow.get(0),
-        inFields.get(0))));
-    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (midRow.get(0), inFields.get(1))));
-    assertEquals(2, in.get(inner.getStructFieldData(midRow.get(1),
-        inFields.get(0))));
-    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (midRow.get(1), inFields.get(1))));
-    List<?> list = li.getList(readerInspector.getStructFieldData(row,
-        fields.get(10)));
+    assertEquals(1, midRow.get(0).int1);
+    assertEquals("bye", midRow.get(0).string1.toString());
+    assertEquals(2, midRow.get(1).int1);
+    assertEquals("sigh", midRow.get(1).string1.toString());
+    List<InnerStruct> list = getList(batch, 0);
     assertEquals(2, list.size());
-    assertEquals(3, in.get(inner.getStructFieldData(list.get(0),
-        inFields.get(0))));
-    assertEquals("good", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (list.get(0), inFields.get(1))));
-    assertEquals(4, in.get(inner.getStructFieldData(list.get(1),
-        inFields.get(0))));
-    assertEquals("bad", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (list.get(1), inFields.get(1))));
-    Map<?,?> map = ma.getMap(readerInspector.getStructFieldData(row,
-        fields.get(11)));
+    assertEquals(3, list.get(0).int1);
+    assertEquals("good", list.get(0).string1.toString());
+    assertEquals(4, list.get(1).int1);
+    assertEquals("bad", list.get(1).string1.toString());
+    Map<Text, InnerStruct> map = getMap(batch, 0);
     assertEquals(0, map.size());
 
     // check the contents of second row
-    assertEquals(true, rows.hasNext());
-    row = rows.next(row);
-    assertEquals(true,
-        bo.get(readerInspector.getStructFieldData(row, fields.get(0))));
-    assertEquals(100, by.get(readerInspector.getStructFieldData(row,
-        fields.get(1))));
-    assertEquals(2048, sh.get(readerInspector.getStructFieldData(row,
-        fields.get(2))));
-    assertEquals(65536, in.get(readerInspector.getStructFieldData(row,
-        fields.get(3))));
-    assertEquals(Long.MAX_VALUE, lo.get(readerInspector.
-        getStructFieldData(row, fields.get(4))));
-    assertEquals(2.0, fl.get(readerInspector.getStructFieldData(row,
-        fields.get(5))), 0.00001);
-    assertEquals(-5.0, dbl.get(readerInspector.getStructFieldData(row,
-        fields.get(6))), 0.00001);
-    assertEquals(bytes(), bi.getPrimitiveWritableObject(
-        readerInspector.getStructFieldData(row, fields.get(7))));
-    assertEquals("bye", st.getPrimitiveJavaObject(readerInspector.
-        getStructFieldData(row, fields.get(8))));
-    midRow = midli.getList(mid.getStructFieldData(readerInspector.
-        getStructFieldData(row, fields.get(9)), midFields.get(0)));
+    assertEquals(true, getBoolean(batch, 1));
+    assertEquals(100, getByte(batch, 1));
+    assertEquals(2048, getShort(batch, 1));
+    assertEquals(65536, getInt(batch, 1));
+    assertEquals(Long.MAX_VALUE, getLong(batch, 1));
+    assertEquals(2.0, getFloat(batch, 1), 0.00001);
+    assertEquals(-5.0, getDouble(batch, 1), 0.00001);
+    assertEquals(bytes(), getBinary(batch, 1));
+    assertEquals("bye", getText(batch, 1).toString());
+    midRow = getMidList(batch, 1);
     assertNotNull(midRow);
     assertEquals(2, midRow.size());
-    assertEquals(1, in.get(inner.getStructFieldData(midRow.get(0),
-        inFields.get(0))));
-    assertEquals("bye", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (midRow.get(0), inFields.get(1))));
-    assertEquals(2, in.get(inner.getStructFieldData(midRow.get(1),
-        inFields.get(0))));
-    assertEquals("sigh", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (midRow.get(1), inFields.get(1))));
-    list = li.getList(readerInspector.getStructFieldData(row,
-        fields.get(10)));
+    assertEquals(1, midRow.get(0).int1);
+    assertEquals("bye", midRow.get(0).string1.toString());
+    assertEquals(2, midRow.get(1).int1);
+    assertEquals("sigh", midRow.get(1).string1.toString());
+    list = getList(batch, 1);
     assertEquals(3, list.size());
-    assertEquals(100000000, in.get(inner.getStructFieldData(list.get(0),
-        inFields.get(0))));
-    assertEquals("cat", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (list.get(0), inFields.get(1))));
-    assertEquals(-100000, in.get(inner.getStructFieldData(list.get(1),
-        inFields.get(0))));
-    assertEquals("in", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (list.get(1), inFields.get(1))));
-    assertEquals(1234, in.get(inner.getStructFieldData(list.get(2),
-        inFields.get(0))));
-    assertEquals("hat", st.getPrimitiveJavaObject(inner.getStructFieldData
-        (list.get(2), inFields.get(1))));
-    map = ma.getMap(readerInspector.getStructFieldData(row,
-        fields.get(11)));
+    assertEquals(100000000, list.get(0).int1);
+    assertEquals("cat", list.get(0).string1.toString());
+    assertEquals(-100000, list.get(1).int1);
+    assertEquals("in", list.get(1).string1.toString());
+    assertEquals(1234, list.get(2).int1);
+    assertEquals("hat", list.get(2).string1.toString());
+    map = getMap(batch, 1);
     assertEquals(2, map.size());
-    boolean[] found = new boolean[2];
-    for(Object key: map.keySet()) {
-      String str = mk.getPrimitiveJavaObject(key);
-      if (str.equals("chani")) {
-        assertEquals(false, found[0]);
-        assertEquals(5, in.get(inner.getStructFieldData(map.get(key),
-            inFields.get(0))));
-        assertEquals(str, st.getPrimitiveJavaObject(
-            inner.getStructFieldData(map.get(key), inFields.get(1))));
-        found[0] = true;
-      } else if (str.equals("mauddib")) {
-        assertEquals(false, found[1]);
-        assertEquals(1, in.get(inner.getStructFieldData(map.get(key),
-            inFields.get(0))));
-        assertEquals(str, st.getPrimitiveJavaObject(
-            inner.getStructFieldData(map.get(key), inFields.get(1))));
-        found[1] = true;
-      } else {
-        throw new IllegalArgumentException("Unknown key " + str);
-      }
-    }
-    assertEquals(true, found[0]);
-    assertEquals(true, found[1]);
+    InnerStruct value = map.get(new Text("chani"));
+    assertEquals(5, value.int1);
+    assertEquals("chani", value.string1.toString());
+    value = map.get(new Text("mauddib"));
+    assertEquals(1, value.int1);
+    assertEquals("mauddib", value.string1.toString());
 
     // handle the close up
-    assertEquals(false, rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
     rows.close();
   }
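
For readers tracking the API change: the old hasNext()/next(Object) pair gives
way to the loop above. A minimal sketch of the batch-driven read pattern these
tests now use, assuming an open Reader named reader as in the surrounding code:

    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    // nextBatch() fills the batch and returns false once the file is
    // exhausted; batch.size reports how many rows were actually read.
    while (rows.nextBatch(batch)) {
      for (int r = 0; r < batch.size; ++r) {
        // read row r out of the ColumnVector subclasses in batch.cols
      }
    }
    rows.close();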
 
@@ -1216,35 +1191,36 @@ public class TestVectorOrcFile {
     }
 
     // check out the types
-    List<OrcProto.Type> types = reader.getTypes();
-    assertEquals(3, types.size());
-    assertEquals(OrcProto.Type.Kind.STRUCT, types.get(0).getKind());
-    assertEquals(2, types.get(0).getSubtypesCount());
-    assertEquals(1, types.get(0).getSubtypes(0));
-    assertEquals(2, types.get(0).getSubtypes(1));
-    assertEquals(OrcProto.Type.Kind.INT, types.get(1).getKind());
-    assertEquals(0, types.get(1).getSubtypesCount());
-    assertEquals(OrcProto.Type.Kind.STRING, types.get(2).getKind());
-    assertEquals(0, types.get(2).getSubtypesCount());
+    TypeDescription type = reader.getSchema();
+    assertEquals(TypeDescription.Category.STRUCT, type.getCategory());
+    assertEquals(2, type.getChildren().size());
+    TypeDescription type1 = type.getChildren().get(0);
+    TypeDescription type2 = type.getChildren().get(1);
+    assertEquals(TypeDescription.Category.INT, type1.getCategory());
+    assertEquals(TypeDescription.Category.STRING, type2.getCategory());
+    assertEquals("struct<int1:int,string1:string>", type.toString());
 
     // read the contents and make sure they match
     RecordReader rows1 = reader.rows(new boolean[]{true, true, false});
     RecordReader rows2 = reader.rows(new boolean[]{true, false, true});
     r1 = new Random(1);
     r2 = new Random(2);
-    OrcStruct row1 = null;
-    OrcStruct row2 = null;
-    for(int i = 0; i < 21000; ++i) {
-      assertEquals(true, rows1.hasNext());
-      assertEquals(true, rows2.hasNext());
-      row1 = (OrcStruct) rows1.next(row1);
-      row2 = (OrcStruct) rows2.next(row2);
-      assertEquals(r1.nextInt(), ((IntWritable) row1.getFieldValue(0)).get());
-      assertEquals(Long.toHexString(r2.nextLong()),
-          row2.getFieldValue(1).toString());
-    }
-    assertEquals(false, rows1.hasNext());
-    assertEquals(false, rows2.hasNext());
+    VectorizedRowBatch batch1 = reader.getSchema().createRowBatch(1000);
+    VectorizedRowBatch batch2 = reader.getSchema().createRowBatch(1000);
+    for(int i = 0; i < 21000; i += 1000) {
+      assertEquals(true, rows1.nextBatch(batch1));
+      assertEquals(true, rows2.nextBatch(batch2));
+      assertEquals(1000, batch1.size);
+      assertEquals(1000, batch2.size);
+      for(int j=0; j < 1000; ++j) {
+        assertEquals(r1.nextInt(),
+            ((LongColumnVector) batch1.cols[0]).vector[j]);
+        assertEquals(Long.toHexString(r2.nextLong()),
+            ((BytesColumnVector) batch2.cols[1]).toString(j));
+      }
+    }
+    assertEquals(false, rows1.nextBatch(batch1));
+    assertEquals(false, rows2.nextBatch(batch2));
     rows1.close();
     rows2.close();
   }
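
A note on the boolean arrays passed to reader.rows(...) above: as the removed
getTypes() assertions show, they are indexed by type id, with id 0 for the
root struct and ids 1 and 2 for int1 and string1. A sketch of reading only the
projected int1 column under that assumption:

    RecordReader rows1 = reader.rows(new boolean[]{true, true, false});
    VectorizedRowBatch batch1 = reader.getSchema().createRowBatch(1000);
    while (rows1.nextBatch(batch1)) {
      // only int1 is read from disk; the excluded string1 column is skipped
      LongColumnVector int1 = (LongColumnVector) batch1.cols[0];
    }
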
@@ -1355,17 +1331,33 @@ public class TestVectorOrcFile {
     Reader reader = OrcFile.createReader(file,
         OrcFile.readerOptions(conf));
     RecordReader rows = reader.rows();
-    OrcStruct row = null;
+    batch = reader.getSchema().createRowBatch(1000);
+    TimestampColumnVector times = (TimestampColumnVector) batch.cols[0];
+    LongColumnVector dates = (LongColumnVector) batch.cols[1];
     for (int year = minYear; year < maxYear; ++year) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
       for(int ms = 1000; ms < 2000; ++ms) {
-        row = (OrcStruct) rows.next(row);
-        assertEquals(new TimestampWritable
-                (Timestamp.valueOf(year + "-05-05 12:34:56." + ms)),
-            row.getFieldValue(0));
-        assertEquals(new DateWritable(new Date(year - 1900, 11, 25)),
-            row.getFieldValue(1));
+        StringBuilder buffer = new StringBuilder();
+        times.stringifyValue(buffer, ms - 1000);
+        String expected = Integer.toString(year) + "-05-05 12:34:56.";
+        // suppress the trailing zeros on the string by dividing by the
+        // largest power of 10 that divides it evenly.
+        int roundedMs = ms;
+        for(int round = 1000; round > 0; round /= 10) {
+          if (ms % round == 0) {
+            roundedMs = ms / round;
+            break;
+          }
+        }
+        expected += roundedMs;
+        assertEquals(expected, buffer.toString());
+        assertEquals(Integer.toString(year) + "-12-25",
+            new DateWritable((int) dates.vector[ms - 1000]).toString());
       }
     }
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
   }
 
   @Test
@@ -1483,6 +1475,7 @@ public class TestVectorOrcFile {
     for(int c=0; c < batch.cols.length; ++c) {
       batch.cols[c].setRepeating(true);
     }
+    ((UnionColumnVector) batch.cols[1]).fields[0].isRepeating = true;
     setUnion(batch, 0, null, 0, 1732050807, null, null);
     for(int i=0; i < 5; ++i) {
       writer.addRowBatch(batch);
@@ -1540,83 +1533,115 @@ public class TestVectorOrcFile {
     RecordReader rows = reader.rows();
     assertEquals(0, rows.getRowNumber());
     assertEquals(0.0, rows.getProgress(), 0.000001);
-    assertEquals(true, rows.hasNext());
-    OrcStruct row = (OrcStruct) rows.next(null);
-    assertEquals(1, rows.getRowNumber());
-    ObjectInspector inspector = reader.getObjectInspector();
+
+    schema = reader.getSchema();
+    batch = schema.createRowBatch(74);
+    assertEquals(0, rows.getRowNumber());
+    rows.nextBatch(batch);
+    assertEquals(74, batch.size);
+    assertEquals(74, rows.getRowNumber());
+    TimestampColumnVector ts = (TimestampColumnVector) batch.cols[0];
+    UnionColumnVector union = (UnionColumnVector) batch.cols[1];
+    LongColumnVector longs = (LongColumnVector) union.fields[0];
+    BytesColumnVector strs = (BytesColumnVector) union.fields[1];
+    DecimalColumnVector decs = (DecimalColumnVector) batch.cols[2];
+
     assertEquals("struct<time:timestamp,union:uniontype<int,string>,decimal:decimal(38,18)>",
-        inspector.getTypeName());
-    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-12 15:00:00")),
-        row.getFieldValue(0));
-    OrcUnion union = (OrcUnion) row.getFieldValue(1);
-    assertEquals(0, union.getTag());
-    assertEquals(new IntWritable(42), union.getObject());
-    assertEquals(new HiveDecimalWritable(HiveDecimal.create("12345678.6547456")),
-        row.getFieldValue(2));
-    row = (OrcStruct) rows.next(row);
-    assertEquals(2, rows.getRowNumber());
-    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-20 12:00:00.123456789")),
-        row.getFieldValue(0));
-    assertEquals(1, union.getTag());
-    assertEquals(new Text("hello"), union.getObject());
-    assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")),
-        row.getFieldValue(2));
-    row = (OrcStruct) rows.next(row);
-    assertEquals(null, row.getFieldValue(0));
-    assertEquals(null, row.getFieldValue(1));
-    assertEquals(null, row.getFieldValue(2));
-    row = (OrcStruct) rows.next(row);
-    assertEquals(null, row.getFieldValue(0));
-    union = (OrcUnion) row.getFieldValue(1);
-    assertEquals(0, union.getTag());
-    assertEquals(null, union.getObject());
-    assertEquals(null, row.getFieldValue(2));
-    row = (OrcStruct) rows.next(row);
-    assertEquals(null, row.getFieldValue(0));
-    assertEquals(1, union.getTag());
-    assertEquals(null, union.getObject());
-    assertEquals(null, row.getFieldValue(2));
-    row = (OrcStruct) rows.next(row);
-    assertEquals(new TimestampWritable(Timestamp.valueOf("1970-01-01 00:00:00")),
-        row.getFieldValue(0));
-    assertEquals(new IntWritable(200000), union.getObject());
-    assertEquals(new HiveDecimalWritable(HiveDecimal.create("10000000000000000000")),
-                 row.getFieldValue(2));
+        schema.toString());
+    assertEquals("2000-03-12 15:00:00.0", ts.asScratchTimestamp(0).toString());
+    assertEquals(0, union.tags[0]);
+    assertEquals(42, longs.vector[0]);
+    assertEquals("12345678.6547456", decs.vector[0].toString());
+
+    assertEquals("2000-03-20 12:00:00.123456789", ts.asScratchTimestamp(1).toString());
+    assertEquals(1, union.tags[1]);
+    assertEquals("hello", strs.toString(1));
+    assertEquals("-5643.234", decs.vector[1].toString());
+
+    assertEquals(false, ts.noNulls);
+    assertEquals(false, union.noNulls);
+    assertEquals(false, decs.noNulls);
+    assertEquals(true, ts.isNull[2]);
+    assertEquals(true, union.isNull[2]);
+    assertEquals(true, decs.isNull[2]);
+
+    assertEquals(true, ts.isNull[3]);
+    assertEquals(false, union.isNull[3]);
+    assertEquals(0, union.tags[3]);
+    assertEquals(true, longs.isNull[3]);
+    assertEquals(true, decs.isNull[3]);
+
+    assertEquals(true, ts.isNull[4]);
+    assertEquals(false, union.isNull[4]);
+    assertEquals(1, union.tags[4]);
+    assertEquals(true, strs.isNull[4]);
+    assertEquals(true, decs.isNull[4]);
+
+    assertEquals(false, ts.isNull[5]);
+    assertEquals("1970-01-01 00:00:00.0", ts.asScratchTimestamp(5).toString());
+    assertEquals(false, union.isNull[5]);
+    assertEquals(0, union.tags[5]);
+    assertEquals(false, longs.isNull[5]);
+    assertEquals(200000, longs.vector[5]);
+    assertEquals(false, decs.isNull[5]);
+    assertEquals("10000000000000000000", decs.vector[5].toString());
+
     rand = new Random(42);
     for(int i=1970; i < 2038; ++i) {
-      row = (OrcStruct) rows.next(row);
-      assertEquals(new TimestampWritable(Timestamp.valueOf(i + "-05-05 12:34:56." + i)),
-          row.getFieldValue(0));
+      int row = 6 + i - 1970;
+      assertEquals(Timestamp.valueOf(i + "-05-05 12:34:56." + i),
+          ts.asScratchTimestamp(row));
       if ((i & 1) == 0) {
-        assertEquals(0, union.getTag());
-        assertEquals(new IntWritable(i*i), union.getObject());
+        assertEquals(0, union.tags[row]);
+        assertEquals(i*i, longs.vector[row]);
       } else {
-        assertEquals(1, union.getTag());
-        assertEquals(new Text(Integer.toString(i * i)), union.getObject());
+        assertEquals(1, union.tags[row]);
+        assertEquals(Integer.toString(i * i), strs.toString(row));
       }
       assertEquals(new HiveDecimalWritable(HiveDecimal.create(new BigInteger(64, rand),
-                                   rand.nextInt(18))), row.getFieldValue(2));
-    }
-    for(int i=0; i < 5000; ++i) {
-      row = (OrcStruct) rows.next(row);
-      assertEquals(new IntWritable(1732050807), union.getObject());
-    }
-    row = (OrcStruct) rows.next(row);
-    assertEquals(new IntWritable(0), union.getObject());
-    row = (OrcStruct) rows.next(row);
-    assertEquals(new IntWritable(10), union.getObject());
-    row = (OrcStruct) rows.next(row);
-    assertEquals(new IntWritable(138), union.getObject());
-    assertEquals(false, rows.hasNext());
+                                   rand.nextInt(18))), decs.vector[row]);
+    }
+
+    // rebuild the row batch so that we can read 1000 rows at a time
+    batch = schema.createRowBatch(1000);
+    ts = (TimestampColumnVector) batch.cols[0];
+    union = (UnionColumnVector) batch.cols[1];
+    longs = (LongColumnVector) union.fields[0];
+    strs = (BytesColumnVector) union.fields[1];
+    decs = (DecimalColumnVector) batch.cols[2];
+
+    for(int i=0; i < 5; ++i) {
+      rows.nextBatch(batch);
+      assertEquals("batch " + i, 1000, batch.size);
+      assertEquals("batch " + i, false, union.isRepeating);
+      assertEquals("batch " + i, true, union.noNulls);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals("bad tag at " + i + "." +r, 0, union.tags[r]);
+      }
+      assertEquals("batch " + i, true, longs.isRepeating);
+      assertEquals("batch " + i, 1732050807, longs.vector[0]);
+    }
+
+    rows.nextBatch(batch);
+    assertEquals(3, batch.size);
+    assertEquals(0, union.tags[0]);
+    assertEquals(0, longs.vector[0]);
+    assertEquals(0, union.tags[1]);
+    assertEquals(10, longs.vector[1]);
+    assertEquals(0, union.tags[2]);
+    assertEquals(138, longs.vector[2]);
+
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
     assertEquals(1.0, rows.getProgress(), 0.00001);
     assertEquals(reader.getNumberOfRows(), rows.getRowNumber());
     rows.seekToRow(1);
-    row = (OrcStruct) rows.next(row);
-    assertEquals(new TimestampWritable(Timestamp.valueOf("2000-03-20 12:00:00.123456789")),
-        row.getFieldValue(0));
-    assertEquals(1, union.getTag());
-    assertEquals(new Text("hello"), union.getObject());
-    assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")), row.getFieldValue(2));
+    rows.nextBatch(batch);
+    assertEquals(1000, batch.size);
+    assertEquals(Timestamp.valueOf("2000-03-20 12:00:00.123456789"), ts.asScratchTimestamp(0));
+    assertEquals(1, union.tags[0]);
+    assertEquals("hello", strs.toString(0));
+    assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")), decs.vector[0]);
     rows.close();
   }
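
For reference, the union decoding exercised above: UnionColumnVector keeps one
tag per row that selects which child in fields[] holds the value, and a
repeating child carries its single value in element 0. A minimal sketch with
names taken from this test:

    UnionColumnVector union = (UnionColumnVector) batch.cols[1];
    LongColumnVector longs = (LongColumnVector) union.fields[0];
    BytesColumnVector strs = (BytesColumnVector) union.fields[1];
    for (int r = 0; r < batch.size; ++r) {
      if (!union.noNulls && union.isNull[r]) {
        continue;                  // the union itself is null for this row
      }
      if (union.tags[r] == 0) {
        // a repeating child stores its value only at row 0
        long v = longs.vector[longs.isRepeating ? 0 : r];
      } else {
        String v = strs.toString(r);
      }
    }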
 
@@ -1647,17 +1672,22 @@ public class TestVectorOrcFile {
     writer.close();
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
     RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch(1000);
     rand = new Random(12);
-    OrcStruct row = null;
-    for(int i=0; i < 10000; ++i) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      assertEquals(rand.nextInt(), ((IntWritable) row.getFieldValue(0)).get());
-      assertEquals(Integer.toHexString(rand.nextInt()),
-          row.getFieldValue(1).toString());
+    LongColumnVector longs = (LongColumnVector) batch.cols[0];
+    BytesColumnVector strs = (BytesColumnVector) batch.cols[1];
+    for(int b=0; b < 10; ++b) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(rand.nextInt(), longs.vector[r]);
+        assertEquals(Integer.toHexString(rand.nextInt()), strs.toString(r));
+      }
     }
-    assertEquals(false, rows.hasNext());
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
     rows.close();
   }
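
Two end-of-file idioms alternate through these conversions: asserting that
nextBatch() returns false, and (as here) calling it once more and asserting
batch.size == 0. The tests rely on both holding at EOF, i.e. a false return
comes with an emptied batch.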
 
@@ -1697,18 +1727,23 @@ public class TestVectorOrcFile {
     assertEquals(0, stripe.getIndexLength());
     RecordReader rows = reader.rows();
     rand = new Random(24);
-    OrcStruct row = null;
-    for(int i=0; i < 10000; ++i) {
-      int intVal = rand.nextInt();
-      String strVal = Integer.toBinaryString(rand.nextInt());
-      for(int j=0; j < 5; ++j) {
-        assertEquals(true, rows.hasNext());
-        row = (OrcStruct) rows.next(row);
-        assertEquals(intVal, ((IntWritable) row.getFieldValue(0)).get());
-        assertEquals(strVal, row.getFieldValue(1).toString());
+    batch = reader.getSchema().createRowBatch(1000);
+    LongColumnVector longs = (LongColumnVector) batch.cols[0];
+    BytesColumnVector strs = (BytesColumnVector) batch.cols[1];
+    for(int i=0; i < 50; ++i) {
+      rows.nextBatch(batch);
+      assertEquals("batch " + i, 1000, batch.size);
+      for(int j=0; j < 200; ++j) {
+        int intVal = rand.nextInt();
+        String strVal = Integer.toBinaryString(rand.nextInt());
+        for (int k = 0; k < 5; ++k) {
+          assertEquals(intVal, longs.vector[j * 5 + k]);
+          assertEquals(strVal, strs.toString(j * 5 + k));
+        }
       }
     }
-    assertEquals(false, rows.hasNext());
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
     rows.close();
   }
 
@@ -1761,7 +1796,15 @@ public class TestVectorOrcFile {
     assertEquals(COUNT, reader.getNumberOfRows());
     RecordReader rows = reader.rows();
     // get the row index
-    MetadataReader meta = ((RecordReaderImpl) rows).getMetadataReader();
+    DataReader meta = RecordReaderUtils.createDefaultDataReader(
+        DataReaderProperties.builder()
+            .withBufferSize(reader.getCompressionSize())
+            .withFileSystem(fs)
+            .withPath(testFilePath)
+            .withCompression(reader.getCompressionKind())
+            .withTypeCount(reader.getSchema().getMaximumId() + 1)
+            .withZeroCopy(false)
+            .build());
     OrcIndex index =
         meta.readRowIndex(reader.getStripes().get(0), null, null, null, null,
             null);
@@ -1772,34 +1815,18 @@ public class TestVectorOrcFile {
       assertEquals(1000,
           colIndex.getEntry(0).getStatistics().getNumberOfValues());
     }
-    OrcStruct row = null;
-    for(int i=COUNT-1; i >= 0; --i) {
-      rows.seekToRow(i);
-      row = (OrcStruct) rows.next(row);
-      BigRow expected = createRandomRow(intValues, doubleValues,
-          stringValues, byteValues, words, i);
-      assertEquals(expected.boolean1.booleanValue(),
-          ((BooleanWritable) row.getFieldValue(0)).get());
-      assertEquals(expected.byte1.byteValue(),
-          ((ByteWritable) row.getFieldValue(1)).get());
-      assertEquals(expected.short1.shortValue(),
-          ((ShortWritable) row.getFieldValue(2)).get());
-      assertEquals(expected.int1.intValue(),
-          ((IntWritable) row.getFieldValue(3)).get());
-      assertEquals(expected.long1.longValue(),
-          ((LongWritable) row.getFieldValue(4)).get());
-      assertEquals(expected.float1,
-          ((FloatWritable) row.getFieldValue(5)).get(), 0.0001);
-      assertEquals(expected.double1,
-          ((DoubleWritable) row.getFieldValue(6)).get(), 0.0001);
-      assertEquals(expected.bytes1, row.getFieldValue(7));
-      assertEquals(expected.string1, row.getFieldValue(8));
-      List<InnerStruct> expectedList = expected.middle.list;
-      List<OrcStruct> actualList =
-          (List<OrcStruct>) ((OrcStruct) row.getFieldValue(9)).getFieldValue(0);
-      compareList(expectedList, actualList, "middle list " + i);
-      compareList(expected.list, (List<OrcStruct>) row.getFieldValue(10),
-          "list " + i);
+    batch = reader.getSchema().createRowBatch();
+    int nextRowInBatch = -1;
+    for(int i=COUNT-1; i >= 0; --i, --nextRowInBatch) {
+      // if we have consumed the previous batch, read a new one
+      if (nextRowInBatch < 0) {
+        long base = Math.max(i - 1023, 0);
+        rows.seekToRow(base);
+        assertEquals("row " + i, true, rows.nextBatch(batch));
+        nextRowInBatch = batch.size - 1;
+      }
+      checkRandomRow(batch, intValues, doubleValues,
+          stringValues, byteValues, words, i, nextRowInBatch);
     }
     rows.close();
     Iterator<StripeInformation> stripeIterator =
@@ -1825,41 +1852,20 @@ public class TestVectorOrcFile {
         .range(offsetOfStripe2, offsetOfStripe4 - offsetOfStripe2)
         .include(columns));
     rows.seekToRow(lastRowOfStripe2);
-    for(int i = 0; i < 2; ++i) {
-      row = (OrcStruct) rows.next(row);
-      BigRow expected = createRandomRow(intValues, doubleValues,
-                                        stringValues, byteValues, words,
-                                        (int) (lastRowOfStripe2 + i));
-
-      assertEquals(expected.long1.longValue(),
-          ((LongWritable) row.getFieldValue(4)).get());
-      assertEquals(expected.string1, row.getFieldValue(8));
-    }
+    // we only want two rows
+    batch = reader.getSchema().createRowBatch(2);
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1, batch.size);
+    assertEquals(intValues[(int) lastRowOfStripe2], getLong(batch, 0));
+    assertEquals(stringValues[(int) lastRowOfStripe2],
+        getText(batch, 0).toString());
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(intValues[(int) lastRowOfStripe2 + 1], getLong(batch, 0));
+    assertEquals(stringValues[(int) lastRowOfStripe2 + 1],
+        getText(batch, 0).toString());
     rows.close();
   }
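
The backward scan above combines seekToRow() with nextBatch(). A condensed
sketch of the random-access pattern, with an illustrative row number:

    rows.seekToRow(1000);                 // position the reader
    VectorizedRowBatch batch = reader.getSchema().createRowBatch(1024);
    rows.nextBatch(batch);                // rows are returned starting at 1000
    // batch.size can come up short of the batch's capacity near a stripe
    // boundary, which is why the loop above tracks nextRowInBatch instead
    // of assuming full batches.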
 
-  private void compareInner(InnerStruct expect,
-                            OrcStruct actual,
-                            String context) throws Exception {
-    if (expect == null || actual == null) {
-      assertEquals(context, null, expect);
-      assertEquals(context, null, actual);
-    } else {
-      assertEquals(context, expect.int1,
-          ((IntWritable) actual.getFieldValue(0)).get());
-      assertEquals(context, expect.string1, actual.getFieldValue(1));
-    }
-  }
-
-  private void compareList(List<InnerStruct> expect,
-                           List<OrcStruct> actual,
-                           String context) throws Exception {
-    assertEquals(context, expect.size(), actual.size());
-    for(int j=0; j < expect.size(); ++j) {
-      compareInner(expect.get(j), actual.get(j), context + " at " + j);
-    }
-  }
-
   private void appendRandomRow(VectorizedRowBatch batch,
                                long[] intValues, double[] doubleValues,
                                String[] stringValues,
@@ -1874,17 +1880,18 @@ public class TestVectorOrcFile {
         new MiddleStruct(inner, inner2), list(), map(inner, inner2));
   }
 
-  private BigRow createRandomRow(long[] intValues, double[] doubleValues,
-                                 String[] stringValues,
-                                 BytesWritable[] byteValues,
-                                 String[] words, int i) {
+  private void checkRandomRow(VectorizedRowBatch batch,
+                              long[] intValues, double[] doubleValues,
+                              String[] stringValues,
+                              BytesWritable[] byteValues,
+                              String[] words, int i, int rowInBatch) {
     InnerStruct inner = new InnerStruct((int) intValues[i], stringValues[i]);
     InnerStruct inner2 = new InnerStruct((int) (intValues[i] >> 32),
         words[i % words.length] + "-x");
-    return new BigRow((intValues[i] & 1) == 0, (byte) intValues[i],
+    checkBigRow(batch, rowInBatch, i, (intValues[i] & 1) == 0, (byte) intValues[i],
         (short) intValues[i], (int) intValues[i], intValues[i],
-        (float) doubleValues[i], doubleValues[i], byteValues[i],stringValues[i],
-        new MiddleStruct(inner, inner2), list(), map(inner,inner2));
+        (float) doubleValues[i], doubleValues[i], byteValues[i], stringValues[i],
+        new MiddleStruct(inner, inner2), list(), map(inner, inner2));
   }
 
   private static class MyMemoryManager extends MemoryManager {
@@ -2045,15 +2052,19 @@ public class TestVectorOrcFile {
         .range(0L, Long.MAX_VALUE)
         .include(new boolean[]{true, true, true})
         .searchArgument(sarg, new String[]{null, "int1", "string1"}));
+    batch = reader.getSchema().createRowBatch(2000);
+    LongColumnVector ints = (LongColumnVector) batch.cols[0];
+    BytesColumnVector strs = (BytesColumnVector) batch.cols[1];
+
     assertEquals(1000L, rows.getRowNumber());
-    OrcStruct row = null;
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1000, batch.size);
+
     for(int i=1000; i < 2000; ++i) {
-      assertTrue(rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      assertEquals(300 * i, ((IntWritable) row.getFieldValue(0)).get());
-      assertEquals(Integer.toHexString(10*i), row.getFieldValue(1).toString());
+      assertEquals(300 * i, ints.vector[i - 1000]);
+      assertEquals(Integer.toHexString(10*i), strs.toString(i - 1000));
     }
-    assertTrue(!rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
     assertEquals(3500, rows.getRowNumber());
 
     // look through the file with no rows selected
@@ -2082,40 +2093,26 @@ public class TestVectorOrcFile {
         .range(0L, Long.MAX_VALUE)
         .include(new boolean[]{true, true, true})
         .searchArgument(sarg, new String[]{null, "int1", "string1"}));
-    row = null;
+    assertEquals(0, rows.getRowNumber());
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1000, batch.size);
+    assertEquals(3000, rows.getRowNumber());
     for(int i=0; i < 1000; ++i) {
-      assertTrue(rows.hasNext());
-      assertEquals(i, rows.getRowNumber());
-      row = (OrcStruct) rows.next(row);
-      assertEquals(300 * i, ((IntWritable) row.getFieldValue(0)).get());
-      assertEquals(Integer.toHexString(10*i), row.getFieldValue(1).toString());
+      assertEquals(300 * i, ints.vector[i]);
+      assertEquals(Integer.toHexString(10*i), strs.toString(i));
     }
+
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(500, batch.size);
+    assertEquals(3500, rows.getRowNumber());
     for(int i=3000; i < 3500; ++i) {
-      assertTrue(rows.hasNext());
-      assertEquals(i, rows.getRowNumber());
-      row = (OrcStruct) rows.next(row);
-      assertEquals(300 * i, ((IntWritable) row.getFieldValue(0)).get());
-      assertEquals(Integer.toHexString(10*i), row.getFieldValue(1).toString());
+      assertEquals(300 * i, ints.vector[i - 3000]);
+      assertEquals(Integer.toHexString(10*i), strs.toString(i - 3000));
     }
-    assertTrue(!rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
     assertEquals(3500, rows.getRowNumber());
   }
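
The row numbers asserted here follow from predicate pushdown at row-group
granularity: the SearchArgument eliminates whole 1000-row row groups via the
file's index statistics, and getRowNumber() reports the next row the reader
will deliver. That is why it jumps straight from 0 to 3000 after the first
1000-row batch, and why the surviving rows arrive as one full batch plus the
500-row tail of the 3500-row file.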
 
-  private static String pad(String value, int length) {
-    if (value.length() == length) {
-      return value;
-    } else if (value.length() > length) {
-      return value.substring(0, length);
-    } else {
-      StringBuilder buf = new StringBuilder();
-      buf.append(value);
-      for(int i=0; i < length - value.length(); ++i) {
-        buf.append(' ');
-      }
-      return buf.toString();
-    }
-  }
-
   /**
    * Test all of the types that have distinct ORC writers using the vectorized
    * writer with different combinations of repeating and null values.
@@ -2232,8 +2229,7 @@ public class TestVectorOrcFile {
       ((LongColumnVector) batch.cols[6]).vector[r] =
           new DateWritable(new Date(111, 6, 1)).getDays() + r;
 
-      Timestamp ts = new Timestamp(115, 9, 23, 10, 11, 59, 999999999);
-      ts.setTime(ts.getTime() + r * 1000);
+      Timestamp ts = new Timestamp(115, 9, 25, 10, 11, 59 + r, 999999999);
       ((TimestampColumnVector) batch.cols[7]).set(r, ts);
       ((DecimalColumnVector) batch.cols[8]).vector[r] =
           new HiveDecimalWritable("1.234567");
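
The Timestamp rewrite above is behavioral, not just stylistic: setTime()
re-derives the fractional second from the millisecond value, so the old
two-step construction silently truncated the 999999999 nanoseconds to 999
milliseconds. Building each value directly with 59 + r seconds keeps the
nanosecond field intact, matching the exact Timestamps the vectorized reader
is checked against later in the test.
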
@@ -2302,118 +2298,125 @@ public class TestVectorOrcFile {
     assertEquals(14813, ((StringColumnStatistics) stats[12]).getSum());
 
     RecordReader rows = reader.rows();
-    OrcStruct row = null;
+    batch = reader.getSchema().createRowBatch(1024);
+    BytesColumnVector bins = (BytesColumnVector) batch.cols[0];
+    LongColumnVector bools = (LongColumnVector) batch.cols[1];
+    LongColumnVector bytes = (LongColumnVector) batch.cols[2];
+    LongColumnVector longs = (LongColumnVector) batch.cols[3];
+    DoubleColumnVector floats = (DoubleColumnVector) batch.cols[4];
+    DoubleColumnVector doubles = (DoubleColumnVector) batch.cols[5];
+    LongColumnVector dates = (LongColumnVector) batch.cols[6];
+    TimestampColumnVector times = (TimestampColumnVector) batch.cols[7];
+    DecimalColumnVector decs = (DecimalColumnVector) batch.cols[8];
+    BytesColumnVector strs = (BytesColumnVector) batch.cols[9];
+    BytesColumnVector chars = (BytesColumnVector) batch.cols[10];
+    BytesColumnVector vcs = (BytesColumnVector) batch.cols[11];
+    StructColumnVector structs = (StructColumnVector) batch.cols[12];
+    UnionColumnVector unions = (UnionColumnVector) batch.cols[13];
+    ListColumnVector lists = (ListColumnVector) batch.cols[14];
+    MapColumnVector maps = (MapColumnVector) batch.cols[15];
+    LongColumnVector structInts = (LongColumnVector) structs.fields[0];
+    LongColumnVector unionInts = (LongColumnVector) unions.fields[1];
+    LongColumnVector listInts = (LongColumnVector) lists.child;
+    BytesColumnVector mapKeys = (BytesColumnVector) maps.keys;
+    BytesColumnVector mapValues = (BytesColumnVector) maps.values;
+
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
 
     // read the 1024 nulls
-    for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      for(int f=0; f < row.getNumFields(); ++f) {
-        assertEquals("non-null on row " + r + " field " + f,
-            null, row.getFieldValue(f));
-      }
+    for(int f=0; f < batch.cols.length; ++f) {
+      assertEquals("field " + f,
+          true, batch.cols[f].isRepeating);
+      assertEquals("field " + f,
+          false, batch.cols[f].noNulls);
+      assertEquals("field " + f,
+          true, batch.cols[f].isNull[0]);
     }
 
     // read the 1024 repeat values
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
     for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      assertEquals("row " + r, "48 6f 72 74 6f 6e",
-          row.getFieldValue(0).toString());
-      assertEquals("row " + r, "true", row.getFieldValue(1).toString());
-      assertEquals("row " + r, "-126", row.getFieldValue(2).toString());
-      assertEquals("row " + r, "1311768467463790320",
-          row.getFieldValue(3).toString());
-      assertEquals("row " + r, "1.125", row.getFieldValue(4).toString());
-      assertEquals("row " + r, "9.765625E-4", row.getFieldValue(5).toString());
-      assertEquals("row " + r, "2011-07-01", row.getFieldValue(6).toString());
+      assertEquals("row " + r, "Horton", bins.toString(r));
+      assertEquals("row " + r, 1, bools.vector[r]);
+      assertEquals("row " + r, -126, bytes.vector[r]);
+      assertEquals("row " + r, 1311768467463790320L, longs.vector[r]);
+      assertEquals("row " + r, 1.125, floats.vector[r], 0.00001);
+      assertEquals("row " + r, 9.765625E-4, doubles.vector[r], 0.000001);
+      assertEquals("row " + r, "2011-07-01",
+          new DateWritable((int) dates.vector[r]).toString());
       assertEquals("row " + r, "2015-10-23 10:11:59.999999999",
-          row.getFieldValue(7).toString());
-      assertEquals("row " + r, "1.234567", row.getFieldValue(8).toString());
-      assertEquals("row " + r, "Echelon", row.getFieldValue(9).toString());
-      assertEquals("row " + r, "Juggernaut", row.getFieldValue(10).toString());
-      assertEquals("row " + r, "Dreadnaugh", row.getFieldValue(11).toString());
-      assertEquals("row " + r, "{123}", row.getFieldValue(12).toString());
-      assertEquals("row " + r, "union(1, 1234)",
-          row.getFieldValue(13).toString());
-      assertEquals("row " + r, "[31415, 31415, 31415]",
-          row.getFieldValue(14).toString());
-      assertEquals("row " + r, "{ORC=fast, Hive=fast, LLAP=fast}",
-          row.getFieldValue(15).toString());
+          times.asScratchTimestamp(r).toString());
+      assertEquals("row " + r, "1.234567", decs.vector[r].toString());
+      assertEquals("row " + r, "Echelon", strs.toString(r));
+      assertEquals("row " + r, "Juggernaut", chars.toString(r));
+      assertEquals("row " + r, "Dreadnaugh", vcs.toString(r));
+      assertEquals("row " + r, 123, structInts.vector[r]);
+      assertEquals("row " + r, 1, unions.tags[r]);
+      assertEquals("row " + r, 1234, unionInts.vector[r]);
+      assertEquals("row " + r, 3, lists.lengths[r]);
+      assertEquals("row " + r, true, listInts.isRepeating);
+      assertEquals("row " + r, 31415, listInts.vector[0]);
+      assertEquals("row " + r, 3, maps.lengths[r]);
+      assertEquals("row " + r, "ORC", mapKeys.toString((int) maps.offsets[r]));
+      assertEquals("row " + r, "Hive", mapKeys.toString((int) maps.offsets[r] + 1));
+      assertEquals("row " + r, "LLAP", mapKeys.toString((int) maps.offsets[r] + 2));
+      assertEquals("row " + r, "fast", mapValues.toString((int) maps.offsets[r]));
+      assertEquals("row " + r, "fast", mapValues.toString((int) maps.offsets[r] + 1));
+      assertEquals("row " + r, "fast", mapValues.toString((int) maps.offsets[r] + 2));
     }
 
     // read the second set of 1024 nulls
-    for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      for(int f=0; f < row.getNumFields(); ++f) {
-        assertEquals("non-null on row " + r + " field " + f,
-            null, row.getFieldValue(f));
-      }
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
+    for(int f=0; f < batch.cols.length; ++f) {
+      assertEquals("field " + f,
+          true, batch.cols[f].isRepeating);
+      assertEquals("field " + f,
+          false, batch.cols[f].noNulls);
+      assertEquals("field " + f,
+          true, batch.cols[f].isNull[0]);
     }
+
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
     for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      byte[] hex = Integer.toHexString(r).getBytes();
-      StringBuilder expected = new StringBuilder();
-      for(int i=0; i < hex.length; ++i) {
-        if (i != 0) {
-          expected.append(' ');
-        }
-        expected.append(Integer.toHexString(hex[i]));
-      }
-      assertEquals("row " + r, expected.toString(),
-          row.getFieldValue(0).toString());
-      assertEquals("row " + r, r % 2 == 1 ? "true" : "false",
-          row.getFieldValue(1).toString());
-      assertEquals("row " + r, Integer.toString((byte) (r % 255)),
-          row.getFieldValue(2).toString());
-      assertEquals("row " + r, Long.toString(31415L * r),
-          row.getFieldValue(3).toString());
-      assertEquals("row " + r, Float.toString(1.125F * r),
-          row.getFieldValue(4).toString());
-      assertEquals("row " + r, Double.toString(0.0009765625 * r),
-          row.getFieldValue(5).toString());
-      assertEquals("row " + r, new Date(111, 6, 1 + r).toString(),
-          row.getFieldValue(6).toString());
-      Timestamp ts = new Timestamp(115, 9, 23, 10, 11, 59, 999999999);
-      ts.setTime(ts.getTime() + r * 1000);
+      String hex = Integer.toHexString(r);
+
+      assertEquals("row " + r, hex, bins.toString(r));
+      assertEquals("row " + r, r % 2 == 1 ? 1 : 0, bools.vector[r]);
+      assertEquals("row " + r, (byte) (r % 255), bytes.vector[r]);
+      assertEquals("row " + r, 31415L * r, longs.vector[r]);
+      assertEquals("row " + r, 1.125F * r, floats.vector[r], 0.0001);
+      assertEquals("row " + r, 0.0009765625 * r, doubles.vector[r], 0.000001);
+      assertEquals("row " + r, new DateWritable(new Date(111, 6, 1 + r)),
+          new DateWritable((int) dates.vector[r]));
       assertEquals("row " + r,
-          ts.toString(),
-          row.getFieldValue(7).toString());
-      assertEquals("row " + r, "1.234567", row.getFieldValue(8).toString());
-      assertEquals("row " + r, Integer.toString(r),
-          row.getFieldValue(9).toString());
-      assertEquals("row " + r, pad(Integer.toHexString(r), 10),
-          row.getFieldValue(10).toString());
-      assertEquals("row " + r, Integer.toHexString(r * 128),
-          row.getFieldValue(11).toString());
-      assertEquals("row " + r, "{" + Integer.toString(r + 13) + "}",
-          row.getFieldValue(12).toString());
-      assertEquals("row " + r, "union(1, " + Integer.toString(r + 42) + ")",
-          row.getFieldValue(13).toString());
-      assertEquals("row " + r, "[31415, 31416, 31417]",
-          row.getFieldValue(14).toString());
-      expected = new StringBuilder();
-      expected.append('{');
-      expected.append(Integer.toHexString(3 * r));
-      expected.append('=');
-      expected.append(3 * r);
-      expected.append(", ");
-      expected.append(Integer.toHexString(3 * r + 1));
-      expected.append('=');
-      expected.append(3 * r + 1);
-      expected.append(", ");
-      expected.append(Integer.toHexString(3 * r + 2));
-      expected.append('=');
-      expected.append(3 * r + 2);
-      expected.append('}');
-      assertEquals("row " + r, expected.toString(),
-          row.getFieldValue(15).toString());
+          new Timestamp(115, 9, 25, 10, 11, 59 + r, 999999999),
+          times.asScratchTimestamp(r));
+      assertEquals("row " + r, "1.234567", decs.vector[r].toString());
+      assertEquals("row " + r, Integer.toString(r), strs.toString(r));
+      assertEquals("row " + r, Integer.toHexString(r), chars.toString(r));
+      assertEquals("row " + r, Integer.toHexString(r * 128), vcs.toString(r));
+      assertEquals("row " + r, r + 13, structInts.vector[r]);
+      assertEquals("row " + r, 1, unions.tags[r]);
+      assertEquals("row " + r, r + 42, unionInts.vector[r]);
+      assertEquals("row " + r, 3, lists.lengths[r]);
+      assertEquals("row " + r, 31415, listInts.vector[(int) lists.offsets[r]]);
+      assertEquals("row " + r, 31416, listInts.vector[(int) lists.offsets[r] + 1]);
+      assertEquals("row " + r, 31417, listInts.vector[(int) lists.offsets[r] + 2]);
+      assertEquals("row " + r, 3, maps.lengths[3]);
+      assertEquals("row " + r, Integer.toHexString(3 * r), mapKeys.toString((int) maps.offsets[r]));
+      assertEquals("row " + r, Integer.toString(3 * r), mapValues.toString((int) maps.offsets[r]));
+      assertEquals("row " + r, Integer.toHexString(3 * r + 1), mapKeys.toString((int) maps.offsets[r] + 1));
+      assertEquals("row " + r, Integer.toString(3 * r + 1), mapValues.toString((int) maps.offsets[r] + 1));
+      assertEquals("row " + r, Integer.toHexString(3 * r + 2), mapKeys.toString((int) maps.offsets[r] + 2));
+      assertEquals("row " + r, Integer.toString(3 * r + 2), mapValues.toString((int) maps.offsets[r] + 2));
     }
 
     // should have no more rows
-    assertEquals(false, rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
   }
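
The map and list checks above all use the same offsets/lengths layout: the
entries for row r occupy child positions offsets[r] through
offsets[r] + lengths[r] - 1. A sketch of walking one map column, with the
column index taken from this test:

    MapColumnVector maps = (MapColumnVector) batch.cols[15];
    BytesColumnVector mapKeys = (BytesColumnVector) maps.keys;
    BytesColumnVector mapValues = (BytesColumnVector) maps.values;
    for (int r = 0; r < batch.size; ++r) {
      for (int e = 0; e < maps.lengths[r]; ++e) {
        int pos = (int) maps.offsets[r] + e;  // position in the child vectors
        String key = mapKeys.toString(pos);
        String value = mapValues.toString(pos);
      }
    }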
 
   private static String makeString(BytesColumnVector vector, int row) {
@@ -2455,7 +2458,8 @@ public class TestVectorOrcFile {
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf));
     RecordReader rows = reader.rows();
-    batch = rows.nextBatch(null);
+    batch = reader.getSchema().createRowBatch();
+    assertEquals(true, rows.nextBatch(batch));
     assertEquals(4, batch.size);
     // ORC currently trims the output strings. See HIVE-12286
     assertEquals("",
@@ -2504,19 +2508,20 @@ public class TestVectorOrcFile {
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf));
     RecordReader rows = reader.rows();
-    batch = rows.nextBatch(null);
+    batch = reader.getSchema().createRowBatch();
+    assertEquals(true, rows.nextBatch(batch));
     assertEquals(1024, batch.size);
     for(int r=0; r < 1024; ++r) {
       assertEquals(Integer.toString(r * 10001),
           makeString((BytesColumnVector) batch.cols[0], r));
     }
-    batch = rows.nextBatch(batch);
+    assertEquals(true, rows.nextBatch(batch));
     assertEquals(1024, batch.size);
     for(int r=0; r < 1024; ++r) {
       assertEquals("Halloween",
           makeString((BytesColumnVector) batch.cols[0], r));
     }
-    assertEquals(false, rows.hasNext());
+    assertEquals(false, rows.nextBatch(batch));
   }
 
   @Test
@@ -2541,18 +2546,21 @@ public class TestVectorOrcFile {
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf));
     RecordReader rows = reader.rows();
-    OrcStruct row = null;
+    batch = reader.getSchema().createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals(1024, batch.size);
+    StructColumnVector inner = (StructColumnVector) batch.cols[0];
+    LongColumnVector vec = (LongColumnVector) inner.fields[0];
     for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      OrcStruct inner = (OrcStruct) row.getFieldValue(0);
       if (r < 200 || (r >= 400 && r < 600) || r >= 800) {
-        assertEquals("row " + r, null, inner);
+        assertEquals("row " + r, true, inner.isNull[r]);
       } else {
-        assertEquals("row " + r, "{" + r + "}", inner.toString());
+        assertEquals("row " + r, false, inner.isNull[r]);
+        assertEquals("row " + r, r, vec.vector[r]);
       }
     }
-    assertEquals(false, rows.hasNext());
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
   }
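
Null structs, as exercised above, are flagged on the parent vector while the
child vectors still reserve a slot per row, so the parent's isNull must be
consulted before touching the children. A minimal sketch:

    StructColumnVector outer = (StructColumnVector) batch.cols[0];
    LongColumnVector field0 = (LongColumnVector) outer.fields[0];
    for (int r = 0; r < batch.size; ++r) {
      if (!outer.noNulls && outer.isNull[r]) {
        // the struct itself is null; ignore the child values at row r
      } else {
        long v = field0.vector[r];
      }
    }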
 
   /**
@@ -2595,28 +2603,38 @@ public class TestVectorOrcFile {
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf));
     RecordReader rows = reader.rows();
-    OrcStruct row = null;
+    batch = reader.getSchema().createRowBatch(1024);
+    UnionColumnVector union = (UnionColumnVector) batch.cols[0];
+    LongColumnVector ints = (LongColumnVector) union.fields[0];
+    LongColumnVector longs = (LongColumnVector) union.fields[1];
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
     for(int r=0; r < 1024; ++r) {
-      assertEquals(true, rows.hasNext());
-      row = (OrcStruct) rows.next(row);
-      OrcUnion inner = (OrcUnion) row.getFieldValue(0);
       if (r < 200) {
-        assertEquals("row " + r, null, inner);
+        assertEquals("row " + r, true, union.isNull[r]);
       } else if (r < 300) {
-        assertEquals("row " + r, "union(0, " + r +")", inner.toString());
+        assertEquals("row " + r, false, union.isNull[r]);
+        assertEquals("row " + r, 0, uni

<TRUNCATED>