Posted to commits@hive.apache.org by om...@apache.org on 2016/04/26 17:18:00 UTC

[4/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 9cfcc0e..2199b11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,12 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closer;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
-import org.apache.orc.OrcUtils;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -42,12 +38,9 @@ import org.apache.orc.DateColumnStatistics;
 import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.OrcConf;
-import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.StreamName;
@@ -56,14 +49,11 @@ import org.apache.orc.StripeInformation;
 import org.apache.orc.TimestampColumnStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -103,8 +93,6 @@ public class RecordReaderImpl implements RecordReader {
   private final SargApplier sargApp;
  // an array indicating which row groups aren't skipped
   private boolean[] includedRowGroups = null;
-  private final Configuration conf;
-  private final MetadataReader metadata;
   private final DataReader dataReader;
 
   /**
@@ -146,130 +134,36 @@ public class RecordReaderImpl implements RecordReader {
     return result;
   }
 
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-    private Reader.Options options;
-    private CompressionCodec codec;
-    private List<OrcProto.Type> types;
-    private List<StripeInformation> stripes;
-    private int bufferSize;
-    private FileSystem fileSystem;
-    private Path path;
-    private Configuration conf;
-    private long strideRate;
-    private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
-    private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
-
-    private Builder() {
-
-    }
-
-    public Builder withOptions(Reader.Options options) {
-      this.options = options;
-      return this;
-    }
-
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
-      return this;
-    }
-
-    public Builder withTypes(List<OrcProto.Type> types) {
-      this.types = types;
-      return this;
-    }
-
-    public Builder withStripes(List<StripeInformation> stripes) {
-      this.stripes = stripes;
-      return this;
-    }
-
-    public Builder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    public Builder withFileSystem(FileSystem fileSystem) {
-      this.fileSystem = fileSystem;
-      return this;
-    }
-
-    public Builder withPath(Path path) {
-      this.path = path;
-      return this;
-    }
-
-    public Builder withConf(Configuration conf) {
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder withStrideRate(long strideRate) {
-      this.strideRate = strideRate;
-      return this;
-    }
-
-    public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) {
-      this.metadataReaderFactory = metadataReaderFactory;
-      return this;
-    }
-
-    public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) {
-      this.dataReaderFactory = dataReaderFactory;
-      return this;
-    }
-
-    public RecordReaderImpl build() throws IOException {
-      Preconditions.checkNotNull(metadataReaderFactory);
-      Preconditions.checkNotNull(dataReaderFactory);
-      Preconditions.checkNotNull(options);
-      Preconditions.checkNotNull(types);
-      Preconditions.checkNotNull(stripes);
-      Preconditions.checkNotNull(fileSystem);
-      Preconditions.checkNotNull(path);
-      Preconditions.checkNotNull(conf);
-
-      return new RecordReaderImpl(this);
-    }
-  }
-
-  private RecordReaderImpl(Builder builder) throws IOException {
-    Reader.Options options = builder.options;
-    this.types = builder.types;
-    TreeReaderFactory.TreeReaderSchema treeReaderSchema;
+  protected RecordReaderImpl(ReaderImpl fileReader,
+                             Reader.Options options) throws IOException {
+    SchemaEvolution treeReaderSchema;
+    this.included = options.getInclude();
+    included[0] = true;
     if (options.getSchema() == null) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("Schema on read not provided -- using file schema " + types.toString());
+        LOG.info("Schema on read not provided -- using file schema " +
+            fileReader.getSchema());
       }
-      treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types);
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
     } else {
 
       // Now that we are creating a record reader for a file, validate that the schema to read
       // is compatible with the file schema.
       //
-      List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
-      treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
-    }
-    this.path = builder.path;
-    this.codec = builder.codec;
-    this.bufferSize = builder.bufferSize;
-    this.included = options.getInclude();
-    this.conf = builder.conf;
-    this.rowIndexStride = builder.strideRate;
-    this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder()
-        .withFileSystem(builder.fileSystem)
-        .withPath(path)
-        .withCodec(codec)
-        .withBufferSize(bufferSize)
-        .withTypeCount(types.size())
-        .build());
+      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+          options.getSchema(),
+          included);
+    }
+    this.path = fileReader.path;
+    this.codec = fileReader.codec;
+    this.types = fileReader.types;
+    this.bufferSize = fileReader.bufferSize;
+    this.rowIndexStride = fileReader.rowIndexStride;
+    FileSystem fileSystem = fileReader.fileSystem;
     SearchArgument sarg = options.getSearchArgument();
-    if (sarg != null && builder.strideRate != 0) {
+    if (sarg != null && rowIndexStride != 0) {
       sargApp = new SargApplier(
-          sarg, options.getColumnNames(), builder.strideRate, types, included.length);
+          sarg, options.getColumnNames(), rowIndexStride, types, included.length);
     } else {
       sargApp = null;
     }
@@ -277,7 +171,7 @@ public class RecordReaderImpl implements RecordReader {
     long skippedRows = 0;
     long offset = options.getOffset();
     long maxOffset = options.getMaxOffset();
-    for(StripeInformation stripe: builder.stripes) {
+    for(StripeInformation stripe: fileReader.getStripes()) {
       long stripeStart = stripe.getOffset();
       if (offset > stripeStart) {
         skippedRows += stripe.getNumberOfRows();
@@ -289,25 +183,30 @@ public class RecordReaderImpl implements RecordReader {
 
     Boolean zeroCopy = options.getUseZeroCopy();
     if (zeroCopy == null) {
-      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+    }
+    if (options.getDataReader() == null) {
+      dataReader = RecordReaderUtils.createDefaultDataReader(
+          DataReaderProperties.builder()
+              .withBufferSize(bufferSize)
+              .withCompression(fileReader.compressionKind)
+              .withFileSystem(fileSystem)
+              .withPath(path)
+              .withTypeCount(types.size())
+              .withZeroCopy(zeroCopy)
+              .build());
+    } else {
+      dataReader = options.getDataReader();
     }
-    // TODO: we could change the ctor to pass this externally
-    this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder()
-      .withFileSystem(builder.fileSystem)
-      .withCodec(codec)
-      .withPath(path)
-      .withZeroCopy(zeroCopy)
-      .build());
-    this.dataReader.open();
-
     firstRow = skippedRows;
     totalRowCount = rows;
     Boolean skipCorrupt = options.getSkipCorruptRecords();
     if (skipCorrupt == null) {
-      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
+      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
-    reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
+    reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
+        treeReaderSchema, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -333,10 +232,10 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    return metadata.readStripeFooter(stripe);
+    return dataReader.readStripeFooter(stripe);
   }
 
-  static enum Location {
+  enum Location {
     BEFORE, MIN, MIDDLE, MAX, AFTER
   }
 
@@ -895,7 +794,7 @@ public class RecordReaderImpl implements RecordReader {
     return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
   }
 
-  private void clearStreams() throws IOException {
+  private void clearStreams() {
     // explicit close of all streams to de-ref ByteBuffers
     for (InStream is : streams.values()) {
       is.close();
@@ -1149,31 +1048,27 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   @Override
-  public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
     try {
-      final VectorizedRowBatch result;
       if (rowInStripe >= rowCountInStripe) {
         currentStripe += 1;
+        if (currentStripe >= stripes.size()) {
+          batch.size = 0;
+          return false;
+        }
         readStripe();
       }
 
-      final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+      int batchSize = computeBatchSize(batch.getMaxSize());
 
       rowInStripe += batchSize;
-      if (previous == null) {
-        ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
-        result = new VectorizedRowBatch(cols.length);
-        result.cols = cols;
-      } else {
-        result = previous;
-        result.selectedInUse = false;
-        reader.setVectorColumnCount(result.getDataColumnCount());
-        reader.nextVector(result.cols, batchSize);
-      }
+      reader.setVectorColumnCount(batch.getDataColumnCount());
+      reader.nextBatch(batch, batchSize);
 
-      result.size = batchSize;
+      batch.size = batchSize;
+      batch.selectedInUse = false;
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return result;
+      return batch.size != 0;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
       throw new IOException("Error reading file: " + path, e);
@@ -1216,16 +1111,8 @@ public class RecordReaderImpl implements RecordReader {
 
   @Override
   public void close() throws IOException {
-    Closer closer = Closer.create();
-    try {
-      closer.register(metadata);
-      closer.register(dataReader);
-      clearStreams();
-    } catch (IOException e) {
-      throw closer.rethrow(e);
-    } finally {
-      closer.close();
-    }
+    clearStreams();
+    dataReader.close();
   }
 
   @Override
@@ -1244,10 +1131,6 @@ public class RecordReaderImpl implements RecordReader {
     return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
   }
 
-  MetadataReader getMetadataReader() {
-    return metadata;
-  }
-
   private int findStripe(long rowNumber) {
     for (int i = 0; i < stripes.size(); i++) {
       StripeInformation stripe = stripes.get(i);
@@ -1276,8 +1159,8 @@ public class RecordReaderImpl implements RecordReader {
       sargColumns = sargColumns == null ?
           (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
     }
-    return metadata.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
-        bloomFilterIndex);
+    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes,
+        sargColumns, bloomFilterIndex);
   }
 
   private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)

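A note on the nextBatch() change above: the reader no longer allocates and returns a
VectorizedRowBatch; it fills a caller-supplied batch and returns whether any rows were
read. A minimal consumption loop, sketched against the usual OrcFile/Reader entry
points (illustrative only, not part of this patch):

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows();
    // Allocate the batch once from the reader schema and reuse it across calls.
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {
      for (int r = 0; r < batch.size; ++r) {
        // consume row r from the ColumnVectors in batch.cols
      }
    }
    rows.close();
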
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 177721d..4192588 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
@@ -44,6 +46,8 @@ import org.apache.orc.impl.DirectDecompressionCodec;
 import org.apache.orc.OrcProto;
 
 import com.google.common.collect.ComparisonChain;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.OutStream;
 
 /**
@@ -53,34 +57,130 @@ public class RecordReaderUtils {
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file;
-    private ByteBufferAllocatorPool pool;
-    private ZeroCopyReaderShim zcr;
-    private FileSystem fs;
-    private Path path;
-    private boolean useZeroCopy;
-    private CompressionCodec codec;
+    private FSDataInputStream file = null;
+    private final ByteBufferAllocatorPool pool;
+    private ZeroCopyReaderShim zcr = null;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    private final int bufferSize;
+    private final int typeCount;
+
+    private DefaultDataReader(DefaultDataReader other) {
+      this.pool = other.pool;
+      this.zcr = other.zcr;
+      this.bufferSize = other.bufferSize;
+      this.typeCount = other.typeCount;
+      this.fs = other.fs;
+      this.path = other.path;
+      this.useZeroCopy = other.useZeroCopy;
+      this.codec = other.codec;
+    }
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
       this.useZeroCopy = properties.getZeroCopy();
-      this.codec = properties.getCodec();
+      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.bufferSize = properties.getBufferSize();
+      this.typeCount = properties.getTypeCount();
+      if (useZeroCopy) {
+        this.pool = new ByteBufferAllocatorPool();
+      } else {
+        this.pool = null;
+      }
     }
 
     @Override
     public void open() throws IOException {
       this.file = fs.open(path);
       if (useZeroCopy) {
-        pool = new ByteBufferAllocatorPool();
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
-        pool = null;
         zcr = null;
       }
     }
 
     @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      if (file == null) {
+        open();
+      }
+      if (footer == null) {
+        footer = readStripeFooter(stripe);
+      }
+      if (indexes == null) {
+        indexes = new OrcProto.RowIndex[typeCount];
+      }
+      if (bloomFilterIndices == null) {
+        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+      }
+      long offset = stripe.getOffset();
+      List<OrcProto.Stream> streams = footer.getStreamsList();
+      for (int i = 0; i < streams.size(); i++) {
+        OrcProto.Stream stream = streams.get(i);
+        OrcProto.Stream nextStream = null;
+        if (i < streams.size() - 1) {
+          nextStream = streams.get(i+1);
+        }
+        int col = stream.getColumn();
+        int len = (int) stream.getLength();
+        // Row index streams and bloom filters are interlaced. If the sarg column has
+        // a bloom filter, combine the IO so that the row index and bloom filter for
+        // that column are read together.
+        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+          boolean readBloomFilter = false;
+          if (sargColumns != null && sargColumns[col] && nextStream != null &&
+              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+            len += nextStream.getLength();
+            i += 1;
+            readBloomFilter = true;
+          }
+          if ((included == null || included[col]) && indexes[col] == null) {
+            byte[] buffer = new byte[len];
+            file.readFully(offset, buffer, 0, buffer.length);
+            ByteBuffer bb = ByteBuffer.wrap(buffer);
+            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+                codec, bufferSize));
+            if (readBloomFilter) {
+              bb.position((int) stream.getLength());
+              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                  nextStream.getLength(), codec, bufferSize));
+            }
+          }
+        }
+        offset += len;
+      }
+
+      return new OrcIndex(indexes, bloomFilterIndices);
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      if (file == null) {
+        open();
+      }
+      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+      int tailLength = (int) stripe.getFooterLength();
+
+      // read the footer
+      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+          tailLength, codec, bufferSize));
+    }
+
+    @Override
     public DiskRangeList readFileData(
         DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
       return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
@@ -106,9 +206,14 @@ public class RecordReaderUtils {
       zcr.releaseBuffer(buffer);
     }
 
+    @Override
+    public DataReader clone() {
+      return new DefaultDataReader(this);
+    }
+
   }
 
-  static DataReader createDefaultDataReader(DataReaderProperties properties) {
+  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
     return new DefaultDataReader(properties);
   }
 

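For reference, the stripe footer arithmetic in readStripeFooter() above follows the
ORC stripe layout: index streams, then data streams, then the footer. A condensed
restatement using only the StripeInformation accessors already shown:

    // [index][data][footer] -- the footer starts after the index and data
    // sections and occupies the last getFooterLength() bytes of the stripe.
    long footerOffset = stripe.getOffset()
        + stripe.getIndexLength() + stripe.getDataLength();
    int footerLength = (int) stripe.getFooterLength();
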
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
index f28ca13..6747691 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
 import org.apache.orc.TypeDescription;
 
 /**
@@ -34,103 +33,134 @@ import org.apache.orc.TypeDescription;
  * has been schema evolution.
  */
 public class SchemaEvolution {
-
+  private final Map<TypeDescription, TypeDescription> readerToFile;
+  private final boolean[] included;
+  private final TypeDescription readerSchema;
   private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
 
-  public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
-      List<OrcProto.Type> schemaTypes) throws IOException {
+  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
+    this.included = included;
+    readerToFile = null;
+    this.readerSchema = readerSchema;
+  }
 
-    // For ACID, the row is the ROW field in the outer STRUCT.
-    final boolean isAcid = checkAcidSchema(fileTypes);
-    final List<OrcProto.Type> rowSchema;
-    int rowSubtype;
-    if (isAcid) {
-      rowSubtype = OrcRecordUpdater.ROW + 1;
-      rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+  public SchemaEvolution(TypeDescription fileSchema,
+                         TypeDescription readerSchema,
+                         boolean[] included) throws IOException {
+    readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
+    this.included = included;
+    if (checkAcidSchema(fileSchema)) {
+      this.readerSchema = createEventSchema(readerSchema);
     } else {
-      rowSubtype = 0;
-      rowSchema = fileTypes;
+      this.readerSchema = readerSchema;
     }
+    buildMapping(fileSchema, this.readerSchema);
+  }
 
-    // Do checking on the overlap.  Additional columns will be defaulted to NULL.
-
-    int numFileColumns = rowSchema.get(0).getSubtypesCount();
-    int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
-
-    int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
-
-    /**
-     * Check type promotion.
-     *
-     * Currently, we only support integer type promotions that can be done "implicitly".
-     * That is, we know that using a bigger integer tree reader on the original smaller integer
-     * column will "just work".
-     *
-     * In the future, other type promotions might require type conversion.
-     */
-    // short -> int -> bigint as same integer readers are used for the above types.
-
-    for (int i = 0; i < numReadColumns; i++) {
-      OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
-      OrcProto.Type rColType = schemaTypes.get(i);
-      if (!fColType.getKind().equals(rColType.getKind())) {
-
-        boolean ok = false;
-        if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+  public TypeDescription getReaderSchema() {
+    return readerSchema;
+  }
 
-          if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
-              rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-            // type promotion possible, converting SHORT to INT/LONG requested type
-            ok = true;
-          }
-        } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+  public TypeDescription getFileType(TypeDescription readerType) {
+    TypeDescription result;
+    if (readerToFile == null) {
+      if (included == null || included[readerType.getId()]) {
+        result = readerType;
+      } else {
+        result = null;
+      }
+    } else {
+      result = readerToFile.get(readerType);
+    }
+    return result;
+  }
 
-          if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-            // type promotion possible, converting INT to LONG requested type
-            ok = true;
+  void buildMapping(TypeDescription fileType,
+                    TypeDescription readerType) throws IOException {
+    // if the column isn't included, don't map it
+    if (included != null && !included[readerType.getId()]) {
+      return;
+    }
+    boolean isOk = true;
+    // check the easy case first
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          isOk = fileType.getMaxLength() == readerType.getMaxLength();
+          break;
+        case DECIMAL:
+          // TODO we don't enforce scale and precision checks, but probably should
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must be an exact match
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for(int i=0; i < fileChildren.size(); ++i) {
+              buildMapping(fileChildren.get(i), readerChildren.get(i));
+            }
+          } else {
+            isOk = false;
           }
+          break;
         }
-
-        if (!ok) {
-          throw new IOException("ORC does not support type conversion from " +
-              fColType.getKind().name() + " to " + rColType.getKind().name());
+        case STRUCT: {
+          // allow either side to have fewer fields than the other
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+          for(int i=0; i < jointSize; ++i) {
+            buildMapping(fileChildren.get(i), readerChildren.get(i));
+          }
+          break;
         }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
       }
-    }
-
-    List<OrcProto.Type> fullSchemaTypes;
-
-    if (isAcid) {
-      fullSchemaTypes = new ArrayList<OrcProto.Type>();
-
-      // This copies the ACID struct type which is subtype = 0.
-      // It has field names "operation" through "row".
-      // And we copy the types for all fields EXCEPT ROW (which must be last!).
-
-      for (int i = 0; i < rowSubtype; i++) {
-        fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+    } else {
+      switch (fileType.getCategory()) {
+        case SHORT:
+          if (readerType.getCategory() != TypeDescription.Category.INT &&
+              readerType.getCategory() != TypeDescription.Category.LONG) {
+            isOk = false;
+          }
+          break;
+        case INT:
+          if (readerType.getCategory() != TypeDescription.Category.LONG) {
+            isOk = false;
+          }
+          break;
+        default:
+          isOk = false;
       }
-
-      // Add the row struct type.
-      OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+    }
+    if (isOk) {
+      readerToFile.put(readerType, fileType);
     } else {
-      fullSchemaTypes = schemaTypes;
+      throw new IOException("ORC does not support type conversion from " +
+          fileType + " to " + readerType);
     }
-
-    int innerStructSubtype = rowSubtype;
-
-    // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
-    //     " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
-
-    return new TreeReaderSchema().
-        fileTypes(fileTypes).
-        schemaTypes(fullSchemaTypes).
-        innerStructSubtype(innerStructSubtype);
   }
 
-  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
-    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
-      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+  private static boolean checkAcidSchema(TypeDescription type) {
+    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      List<String> rootFields = type.getFieldNames();
       if (acidEventFieldNames.equals(rootFields)) {
         return true;
       }
@@ -142,26 +172,14 @@ public class SchemaEvolution {
    * @param typeDescr
    * @return ORC types for the ACID event based on the row's type description
    */
-  public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) {
-
-    List<OrcProto.Type> result = new ArrayList<OrcProto.Type>();
-
-    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    type.setKind(OrcProto.Type.Kind.STRUCT);
-    type.addAllFieldNames(acidEventFieldNames);
-    for (int i = 0; i < acidEventFieldNames.size(); i++) {
-      type.addSubtypes(i + 1);
-    }
-    result.add(type.build());
-
-    // Automatically add all fields except the last (ROW).
-    for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
-      type.clear();
-      type.setKind(acidEventOrcTypeKinds.get(i));
-      result.add(type.build());
-    }
-
-    OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+    TypeDescription result = TypeDescription.createStruct()
+        .addField("operation", TypeDescription.createInt())
+        .addField("originalTransaction", TypeDescription.createLong())
+        .addField("bucket", TypeDescription.createInt())
+        .addField("rowId", TypeDescription.createLong())
+        .addField("currentTransaction", TypeDescription.createLong())
+        .addField("row", typeDescr.clone());
     return result;
   }
 
@@ -174,14 +192,4 @@ public class SchemaEvolution {
     acidEventFieldNames.add("currentTransaction");
     acidEventFieldNames.add("row");
   }
-  public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
-      new ArrayList<OrcProto.Type.Kind>();
-  static {
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
-    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
-  }
 }

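The rewritten buildMapping() above keeps the old promotion policy: when file and
reader categories differ, only widening integer promotions are accepted. A compact,
hypothetical restatement of that rule (the helper name is illustrative, not part of
the patch):

    static boolean isImplicitPromotion(TypeDescription.Category file,
                                       TypeDescription.Category reader) {
      switch (file) {
        case SHORT:  // short -> int -> bigint share the same integer readers
          return reader == TypeDescription.Category.INT ||
                 reader == TypeDescription.Category.LONG;
        case INT:
          return reader == TypeDescription.Category.LONG;
        default:     // any other category mismatch is a conversion error
          return false;
      }
    }
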
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 8bb32ea..8ee8cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -35,9 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -56,8 +60,7 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BitFieldReader;
 import org.apache.orc.impl.DynamicByteArray;
 import org.apache.orc.impl.InStream;
@@ -75,60 +78,6 @@ import org.apache.orc.impl.StreamName;
  */
 public class TreeReaderFactory {
 
-  private static final Logger LOG =
-    LoggerFactory.getLogger(TreeReaderFactory.class);
-
-  public static class TreeReaderSchema {
-
-    /**
-     * The types in the ORC file.
-     */
-    List<OrcProto.Type> fileTypes;
-
-    /**
-     * The treeReaderSchema that the reader should read as.
-     */
-    List<OrcProto.Type> schemaTypes;
-
-    /**
-     * The subtype of the row STRUCT.  Different than 0 for ACID.
-     */
-    int innerStructSubtype;
-
-    public TreeReaderSchema() {
-      fileTypes = null;
-      schemaTypes = null;
-      innerStructSubtype = -1;
-    }
-
-    public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
-      this.fileTypes = fileTypes;
-      return this;
-    }
-
-    public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
-      this.schemaTypes = schemaTypes;
-      return this;
-    }
-
-    public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
-      this.innerStructSubtype = innerStructSubtype;
-      return this;
-    }
-
-    public List<OrcProto.Type> getFileTypes() {
-      return fileTypes;
-    }
-
-    public List<OrcProto.Type> getSchemaTypes() {
-      return schemaTypes;
-    }
-
-    public int getInnerStructSubtype() {
-      return innerStructSubtype;
-    }
-  }
-
   public abstract static class TreeReader {
     protected final int columnId;
     protected BitFieldReader present = null;
@@ -230,36 +179,60 @@ public class TreeReaderFactory {
     }
 
     /**
+     * Called at the top level to read into the given batch.
+     * @param batch the batch to read into
+     * @param batchSize the number of rows to read
+     * @throws IOException
+     */
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      batch.cols[0].reset();
+      batch.cols[0].ensureSize(batchSize, false);
+      nextVector(batch.cols[0], null, batchSize);
+    }
+
+    /**
      * Populates the isNull vector array in the previousVector object based on
      * the present stream values. This function is called from all the child
     * readers, and they all set the values based on the isNull field value.
      *
-     * @param previousVector The columnVector object whose isNull value is populated
+     * @param previous The columnVector object whose isNull value is populated
+     * @param isNull Whether each value was null at a higher level. If
+     *               isNull is null, all values are non-null.
      * @param batchSize      Size of the column vector
-     * @return next column vector
      * @throws IOException
      */
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      ColumnVector result = (ColumnVector) previousVector;
-      if (present != null) {
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      if (present != null || isNull != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
         // present stream
-        result.noNulls = true;
+        previous.noNulls = true;
+        boolean allNull = true;
         for (int i = 0; i < batchSize; i++) {
-          result.isNull[i] = (present.next() != 1);
-          if (result.noNulls && result.isNull[i]) {
-            result.noNulls = false;
+          if (isNull == null || !isNull[i]) {
+            if (present != null && present.next() != 1) {
+              previous.noNulls = false;
+              previous.isNull[i] = true;
+            } else {
+              previous.isNull[i] = false;
+              allNull = false;
+            }
+          } else {
+            previous.noNulls = false;
+            previous.isNull[i] = true;
           }
         }
+        previous.isRepeating = !previous.noNulls && allNull;
       } else {
-        // There is not present stream, this means that all the values are
+        // There is no present stream, which means that all the values are
         // present.
-        result.noNulls = true;
+        previous.noNulls = true;
         for (int i = 0; i < batchSize; i++) {
-          result.isNull[i] = false;
+          previous.isNull[i] = false;
         }
       }
-      return previousVector;
     }
 
     public BitFieldReader getPresent() {
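
The isNull parameter threaded through nextVector() above is what lets the new
complex-type readers compose: a parent struct, list, map, or union passes its null
mask down, and each child merges it with its own present stream. Restated as a
single expression (an editorial sketch, not patch code):

    // A row is null if the parent marked it null OR the column's present stream
    // says so; the short-circuit preserves the patch's behavior of consuming
    // present bits only for rows the parent did not already mark null.
    boolean rowIsNull = (isNull != null && isNull[i])
        || (present != null && present.next() != 1);
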
@@ -267,6 +240,46 @@ public class TreeReaderFactory {
     }
   }
 
+  public static class NullTreeReader extends TreeReader {
+
+    public NullTreeReader(int columnId) throws IOException {
+      super(columnId);
+    }
+
+    @Override
+    public void startStripe(Map<StreamName, InStream> streams,
+                            OrcProto.StripeFooter footer) {
+      // PASS
+    }
+
+    @Override
+    void skipRows(long rows) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider position) {
+      // PASS
+    }
+
+    @Override
+    public void seek(PositionProvider[] position) {
+      // PASS
+    }
+
+    @Override
+    Object next(Object previous) {
+      return null;
+    }
+
+    @Override
+    public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+      vector.noNulls = false;
+      vector.isNull[0] = true;
+      vector.isRepeating = true;
+    }
+  }
+
   public static class BooleanTreeReader extends TreeReader {
     protected BitFieldReader reader = null;
 
@@ -322,20 +335,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, batchSize);
-      return result;
     }
   }
 
@@ -387,20 +396,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -473,20 +478,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -559,20 +560,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -646,20 +643,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -719,16 +712,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DoubleColumnVector result;
-      if (previousVector == null) {
-        result = new DoubleColumnVector();
-      } else {
-        result = (DoubleColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -768,7 +758,6 @@ public class TreeReaderFactory {
         }
         result.isRepeating = repeating;
       }
-      return result;
     }
 
     @Override
@@ -832,16 +821,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DoubleColumnVector result;
-      if (previousVector == null) {
-        result = new DoubleColumnVector();
-      } else {
-        result = (DoubleColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -881,8 +867,6 @@ public class TreeReaderFactory {
         }
         result.isRepeating = repeating;
       }
-
-      return result;
     }
 
     @Override
@@ -974,19 +958,15 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
-      return result;
     }
 
     @Override
@@ -1011,7 +991,6 @@ public class TreeReaderFactory {
     private final TimeZone readerTimeZone;
     private TimeZone writerTimeZone;
     private boolean hasSameTZRules;
-    private TimestampWritable scratchTimestampWritable;
 
     TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
       this(columnId, null, null, null, null, skipCorrupt);
@@ -1115,9 +1094,9 @@ public class TreeReaderFactory {
         int newNanos = parseNanos(nanos.next());
         // fix the rounding when we divided by 1000.
         if (millis >= 0) {
-          millis += newNanos / 1000000;
+          millis += newNanos / WriterImpl.NANOS_PER_MILLI;
         } else {
-          millis -= newNanos / 1000000;
+          millis -= newNanos / WriterImpl.NANOS_PER_MILLI;
         }
         long offset = 0;
         // If reader and writer time zones have different rules, adjust the timezone difference
@@ -1144,31 +1123,45 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final TimestampColumnVector result;
-      if (previousVector == null) {
-        result = new TimestampColumnVector();
-      } else {
-        result = (TimestampColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      TimestampColumnVector result = (TimestampColumnVector) previousVector;
+      super.nextVector(previousVector, isNull, batchSize);
 
-      result.reset();
-      if (scratchTimestampWritable == null) {
-        scratchTimestampWritable = new TimestampWritable();
-      }
-      Object obj;
       for (int i = 0; i < batchSize; i++) {
-        obj = next(scratchTimestampWritable);
-        if (obj == null) {
-          result.noNulls = false;
-          result.isNull[i] = true;
-        } else {
-          TimestampWritable writable = (TimestampWritable) obj;
-          result.set(i, writable.getTimestamp());
+        if (result.noNulls || !result.isNull[i]) {
+          long millis = data.next() + base_timestamp;
+          int newNanos = parseNanos(nanos.next());
+          if (millis < 0 && newNanos != 0) {
+            millis -= 1;
+          }
+          millis *= WriterImpl.MILLIS_PER_SECOND;
+          long offset = 0;
+          // If reader and writer time zones have different rules, adjust the timezone difference
+          // between reader and writer, taking daylight saving time into account.
+          if (!hasSameTZRules) {
+            offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
+          }
+          long adjustedMillis = millis + offset;
+          // The reader timezone offset may differ at the adjusted instant. To account
+          // for that, check whether the reader offset changes after adding
+          // adjustedMillis; if so, use the offset at the adjustedMillis point in time.
+          if (!hasSameTZRules &&
+              (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
+            long newOffset =
+                writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
+            adjustedMillis = millis + newOffset;
+          }
+          result.time[i] = adjustedMillis;
+          result.nanos[i] = newNanos;
+          if (result.isRepeating && i != 0 &&
+              (result.time[0] != result.time[i] ||
+                  result.nanos[0] != result.nanos[i])) {
+            result.isRepeating = false;
+          }
         }
       }
-
-      return result;
     }
 
     private static int parseNanos(long serialized) {
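
The timezone handling duplicated into the vectorized path above is a two-step fixup.
A condensed restatement, assuming the writerTimeZone/readerTimeZone fields used in
the patch:

    // Step 1: shift by the writer/reader offset difference at the raw instant.
    long offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
    long adjusted = millis + offset;
    // Step 2: if the reader offset differs at the adjusted instant (a DST
    // boundary was crossed), recompute using the offset at that instant.
    if (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjusted)) {
      adjusted = millis + writerTimeZone.getOffset(millis)
          - readerTimeZone.getOffset(adjusted);
    }
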
@@ -1253,20 +1246,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final LongColumnVector result;
-      if (previousVector == null) {
-        result = new LongColumnVector();
-      } else {
-        result = (LongColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
-      return result;
+      reader.nextVector(result, result.vector, batchSize);
     }
 
     @Override
@@ -1278,7 +1267,7 @@ public class TreeReaderFactory {
   public static class DecimalTreeReader extends TreeReader {
     protected InStream valueStream;
     protected IntegerReader scaleReader = null;
-    private LongColumnVector scratchScaleVector;
+    private int[] scratchScaleVector;
 
     private final int precision;
     private final int scale;
@@ -1293,7 +1282,7 @@ public class TreeReaderFactory {
       super(columnId, present);
       this.precision = precision;
       this.scale = scale;
-      this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+      this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
       this.valueStream = valueStream;
       if (scaleStream != null && encoding != null) {
         checkEncoding(encoding);
@@ -1352,46 +1341,34 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final DecimalColumnVector result;
-      if (previousVector == null) {
-        result = new DecimalColumnVector(precision, scale);
-      } else {
-        result = (DecimalColumnVector) previousVector;
-      }
-
-      // Save the reference for isNull in the scratch vector
-      boolean[] scratchIsNull = scratchScaleVector.isNull;
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final DecimalColumnVector result = (DecimalColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[batchSize];
+      }
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
       // Read value entries based on isNull entries
-      if (result.isRepeating) {
-        if (!result.isNull[0]) {
+      if (result.noNulls) {
+        for (int r=0; r < batchSize; ++r) {
           BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-          short scaleInData = (short) scaleReader.next();
-          HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
-          dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
-          result.set(0, dec);
+          HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+          result.set(r, dec);
         }
-      } else {
-        // result vector has isNull values set, use the same to read scale vector.
-        scratchScaleVector.isNull = result.isNull;
-        scaleReader.nextVector(scratchScaleVector, batchSize);
-        for (int i = 0; i < batchSize; i++) {
-          if (!result.isNull[i]) {
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        for (int r=0; r < batchSize; ++r) {
+          if (!result.isNull[r]) {
             BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-            short scaleInData = (short) scratchScaleVector.vector[i];
-            HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
-            dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
-            result.set(i, dec);
+            HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+            result.set(r, dec);
           }
         }
       }
-      // Switch back the null vector.
-      scratchScaleVector.isNull = scratchIsNull;
-      return result;
     }
 
     @Override
@@ -1481,8 +1458,10 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      return reader.nextVector(previousVector, batchSize);
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      reader.nextVector(previousVector, isNull, batchSize);
     }
 
     @Override
@@ -1501,7 +1480,7 @@ public class TreeReaderFactory {
         BytesColumnVector result, final int batchSize) throws IOException {
       // Read lengths
       scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
-      lengths.nextVector(scratchlcv, batchSize);
+      lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
       int totalLength = 0;
       if (!scratchlcv.isRepeating) {
         for (int i = 0; i < batchSize; i++) {
@@ -1532,31 +1511,35 @@ public class TreeReaderFactory {
     }
 
     // This method has the common code for reading in bytes into a BytesColumnVector.
-    public static void readOrcByteArrays(InStream stream, IntegerReader lengths,
-        LongColumnVector scratchlcv,
-        BytesColumnVector result, final int batchSize) throws IOException {
-
-      byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize);
-
-      // Too expensive to figure out 'repeating' by comparisons.
-      result.isRepeating = false;
-      int offset = 0;
-      if (!scratchlcv.isRepeating) {
-        for (int i = 0; i < batchSize; i++) {
-          if (!scratchlcv.isNull[i]) {
-            result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
-            offset += scratchlcv.vector[i];
-          } else {
-            result.setRef(i, allBytes, 0, 0);
+    public static void readOrcByteArrays(InStream stream,
+                                         IntegerReader lengths,
+                                         LongColumnVector scratchlcv,
+                                         BytesColumnVector result,
+                                         int batchSize) throws IOException {
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
+            result, batchSize);
+
+        // Too expensive to figure out 'repeating' by comparisons.
+        result.isRepeating = false;
+        int offset = 0;
+        if (!scratchlcv.isRepeating) {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
+              offset += scratchlcv.vector[i];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
           }
-        }
-      } else {
-        for (int i = 0; i < batchSize; i++) {
-          if (!scratchlcv.isNull[i]) {
-            result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
-            offset += scratchlcv.vector[0];
-          } else {
-            result.setRef(i, allBytes, 0, 0);
+        } else {
+          for (int i = 0; i < batchSize; i++) {
+            if (!scratchlcv.isNull[i]) {
+              result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
+              offset += scratchlcv.vector[0];
+            } else {
+              result.setRef(i, allBytes, 0, 0);
+            }
           }
         }
       }
@@ -1641,19 +1624,16 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
-      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
-      return result;
+      BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
+          result, batchSize);
     }
 
     @Override
@@ -1816,18 +1796,15 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final BytesColumnVector result;
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      final BytesColumnVector result = (BytesColumnVector) previousVector;
       int offset;
       int length;
-      if (previousVector == null) {
-        result = new BytesColumnVector();
-      } else {
-        result = (BytesColumnVector) previousVector;
-      }
 
       // Read present/isNull stream
-      super.nextVector(result, batchSize);
+      super.nextVector(result, isNull, batchSize);
 
       if (dictionaryBuffer != null) {
 
@@ -1838,7 +1815,8 @@ public class TreeReaderFactory {
 
         // Read string offsets
         scratchlcv.isNull = result.isNull;
-        reader.nextVector(scratchlcv, batchSize);
+        scratchlcv.ensureSize(batchSize, false);
+        reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
         if (!scratchlcv.isRepeating) {
 
           // The vector has non-repeating strings. Iterate thru the batch
@@ -1878,7 +1856,6 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
 
     int getDictionaryEntryLength(int entry, int offset) {
@@ -1936,11 +1913,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
-      BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
-
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
       int adjustedDownLen;
       if (result.isRepeating) {
         if (result.noNulls || !result.isNull[0]) {
@@ -1973,7 +1952,6 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
   }
 
@@ -2010,10 +1988,13 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
-      BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
+      super.nextVector(previousVector, isNull, batchSize);
+      BytesColumnVector result = (BytesColumnVector) previousVector;
 
       int adjustedDownLen;
       if (result.isRepeating) {
@@ -2045,62 +2026,26 @@ public class TreeReaderFactory {
           }
         }
       }
-      return result;
     }
   }
 
   protected static class StructTreeReader extends TreeReader {
-    private final int readColumnCount;
-    private final int resultColumnCount;
     protected final TreeReader[] fields;
-    private final String[] fieldNames;
 
-    protected StructTreeReader(
-        int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
+    protected StructTreeReader(int columnId,
+                               TypeDescription readerSchema,
+                               SchemaEvolution treeReaderSchema,
+                               boolean[] included,
+                               boolean skipCorrupt) throws IOException {
       super(columnId);
 
-      OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
-
-      OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+      TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema);
 
-      readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount());
-
-      if (columnId == treeReaderSchema.getInnerStructSubtype()) {
-        // If there are more result columns than reader columns, we will default those additional
-        // columns to NULL.
-        resultColumnCount = schemaStructType.getFieldNamesCount();
-      } else {
-        resultColumnCount = readColumnCount;
-      }
-
-      this.fields = new TreeReader[readColumnCount];
-      this.fieldNames = new String[readColumnCount];
-
-      if (included == null) {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
-      } else {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          if (subtype >= included.length) {
-            throw new IOException("subtype " + subtype + " exceeds the included array size " +
-                included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
-                " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
-                " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
-          }
-          if (included[subtype]) {
-            this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          }
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      this.fields = new TreeReader[childrenTypes.size()];
+      for (int i = 0; i < fields.length; ++i) {
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
       }
     }
 
@@ -2120,65 +2065,52 @@ public class TreeReaderFactory {
       OrcStruct result = null;
       if (valuePresent) {
         if (previous == null) {
-          result = new OrcStruct(resultColumnCount);
+          result = new OrcStruct(fields.length);
         } else {
           result = (OrcStruct) previous;
 
           // If the input format was initialized with a file with a
           // different number of fields, the number of fields needs to
           // be updated to the correct number
-          if (result.getNumFields() != resultColumnCount) {
-            result.setNumFields(resultColumnCount);
-          }
+          result.setNumFields(fields.length);
         }
-        for (int i = 0; i < readColumnCount; ++i) {
+        for (int i = 0; i < fields.length; ++i) {
           if (fields[i] != null) {
             result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
           }
         }
-        if (resultColumnCount > readColumnCount) {
-          for (int i = readColumnCount; i < resultColumnCount; ++i) {
-            // Default new treeReaderSchema evolution fields to NULL.
-            result.setFieldValue(i, null);
-          }
-        }
       }
       return result;
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final ColumnVector[] result;
-      if (previousVector == null) {
-        result = new ColumnVector[readColumnCount];
-      } else {
-        result = (ColumnVector[]) previousVector;
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
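+      // Top-level read path: fill the row batch's columns directly. The
+      // mask is null because a top-level struct has no parent nulls, and
+      // vectorColumnCount (when not -1) caps how many columns are wanted.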
+      for(int i=0; i < fields.length &&
+          (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+        batch.cols[i].reset();
+        batch.cols[i].ensureSize(batchSize, false);
+        fields[i].nextVector(batch.cols[i], null, batchSize);
       }
+    }
 
-      // Read all the members of struct as column vectors
-      for (int i = 0; i < readColumnCount; i++) {
-        if (fields[i] != null) {
-          if (result[i] == null) {
-            result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
-          } else {
-            fields[i].nextVector(result[i], batchSize);
-          }
-        }
-      }
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize);
+      StructColumnVector result = (StructColumnVector) previousVector;
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
 
-      // Default additional treeReaderSchema evolution fields to NULL.
-      if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) {
-        for (int i = readColumnCount; i < vectorColumnCount; ++i) {
-          ColumnVector colVector = result[i];
-          if (colVector != null) {
-            colVector.isRepeating = true;
-            colVector.noNulls = false;
-            colVector.isNull[0] = true;
+        // Read all the members of struct as column vectors
+        boolean[] mask = result.noNulls ? null : result.isNull;
+        for (int f = 0; f < fields.length; f++) {
+          if (fields[f] != null) {
+            fields[f].nextVector(result.fields[f], mask, batchSize);
           }
         }
       }
-
-      return result;
     }
 
     @Override
@@ -2208,19 +2140,18 @@ public class TreeReaderFactory {
     protected final TreeReader[] fields;
     protected RunLengthByteReader tags;
 
-    protected UnionTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int fieldCount = type.getSubtypesCount();
+    protected UnionTreeReader(int fileColumn,
+                              TypeDescription readerSchema,
+                              SchemaEvolution treeReaderSchema,
+                              boolean[] included,
+                              boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      int fieldCount = childrenTypes.size();
       this.fields = new TreeReader[fieldCount];
       for (int i = 0; i < fieldCount; ++i) {
-        int subtype = type.getSubtypes(i);
-        if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-        }
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
       }
     }
 
@@ -2252,9 +2183,25 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Union type");
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      UnionColumnVector result = (UnionColumnVector) previousVector;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
+        tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
+            batchSize);
+      boolean[] ignore = new boolean[batchSize];
+        for (int f = 0; f < result.fields.length; ++f) {
+          // build the ignore list for this tag
+          for (int r = 0; r < batchSize; ++r) {
+            ignore[r] = (!result.noNulls && result.isNull[r]) ||
+                result.tags[r] != f;
+          }
+          fields[f].nextVector(result.fields[f], ignore, batchSize);
+        }
+      }
     }
 
     @Override
@@ -2288,13 +2235,15 @@ public class TreeReaderFactory {
     protected final TreeReader elementReader;
     protected IntegerReader lengths = null;
 
-    protected ListTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
+    protected ListTreeReader(int fileColumn,
+                             TypeDescription readerSchema,
+                             SchemaEvolution treeReaderSchema,
+                             boolean[] included,
+                             boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription elementType = readerSchema.getChildren().get(0);
+      elementReader = createTreeReader(elementType, treeReaderSchema, included,
+          skipCorrupt);
     }
 
     @Override
@@ -2335,9 +2284,27 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for List type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      ListColumnVector result = (ListColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      // if we have some non-null values, then read them
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the list doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.child.ensureSize(result.childCount, false);
+        elementReader.nextVector(result.child, null, result.childCount);
+      }
     }
 
     @Override
@@ -2378,24 +2345,16 @@ public class TreeReaderFactory {
     protected final TreeReader valueReader;
     protected IntegerReader lengths = null;
 
-    protected MapTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int keyColumn = type.getSubtypes(0);
-      int valueColumn = type.getSubtypes(1);
-      if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        keyReader = null;
-      }
-      if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        valueReader = null;
-      }
+    protected MapTreeReader(int fileColumn,
+                            TypeDescription readerSchema,
+                            SchemaEvolution treeReaderSchema,
+                            boolean[] included,
+                            boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription keyType = readerSchema.getChildren().get(0);
+      TypeDescription valueType = readerSchema.getChildren().get(1);
+      keyReader = createTreeReader(keyType, treeReaderSchema, included, skipCorrupt);
+      valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt);
     }
 
     @Override
@@ -2429,9 +2388,28 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Map type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      MapColumnVector result = (MapColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the map doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.keys.ensureSize(result.childCount, false);
+        result.values.ensureSize(result.childCount, false);
+        keyReader.nextVector(result.keys, null, result.childCount);
+        valueReader.nextVector(result.values, null, result.childCount);
+      }
     }
 
     @Override
@@ -2471,61 +2449,61 @@ public class TreeReaderFactory {
     }
   }
 
-  public static TreeReader createTreeReader(int columnId,
-      TreeReaderSchema treeReaderSchema,
-      boolean[] included,
-      boolean skipCorrupt
-  ) throws IOException {
-    OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-    switch (type.getKind()) {
+  public static TreeReader createTreeReader(TypeDescription readerType,
+                                            SchemaEvolution evolution,
+                                            boolean[] included,
+                                            boolean skipCorrupt
+                                            ) throws IOException {
+    TypeDescription fileType = evolution.getFileType(readerType);
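+    // A column that is missing from the file (added by schema evolution)
+    // or excluded by the reader is served by a NullTreeReader, which just
+    // fills its output vector with nulls.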
+    if (fileType == null ||
+        (included != null && !included[readerType.getId()])) {
+      return new NullTreeReader(0);
+    }
+    switch (readerType.getCategory()) {
       case BOOLEAN:
-        return new BooleanTreeReader(columnId);
+        return new BooleanTreeReader(fileType.getId());
       case BYTE:
-        return new ByteTreeReader(columnId);
+        return new ByteTreeReader(fileType.getId());
       case DOUBLE:
-        return new DoubleTreeReader(columnId);
+        return new DoubleTreeReader(fileType.getId());
       case FLOAT:
-        return new FloatTreeReader(columnId);
+        return new FloatTreeReader(fileType.getId());
       case SHORT:
-        return new ShortTreeReader(columnId);
+        return new ShortTreeReader(fileType.getId());
       case INT:
-        return new IntTreeReader(columnId);
+        return new IntTreeReader(fileType.getId());
       case LONG:
-        return new LongTreeReader(columnId, skipCorrupt);
+        return new LongTreeReader(fileType.getId(), skipCorrupt);
       case STRING:
-        return new StringTreeReader(columnId);
+        return new StringTreeReader(fileType.getId());
       case CHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC char type has no length specified");
-        }
-        return new CharTreeReader(columnId, type.getMaximumLength());
+        return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
       case VARCHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC varchar type has no length specified");
-        }
-        return new VarcharTreeReader(columnId, type.getMaximumLength());
+        return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
       case BINARY:
-        return new BinaryTreeReader(columnId);
+        return new BinaryTreeReader(fileType.getId());
       case TIMESTAMP:
-        return new TimestampTreeReader(columnId, skipCorrupt);
+        return new TimestampTreeReader(fileType.getId(), skipCorrupt);
       case DATE:
-        return new DateTreeReader(columnId);
+        return new DateTreeReader(fileType.getId());
       case DECIMAL:
-        int precision =
-            type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
-        int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-        return new DecimalTreeReader(columnId, precision, scale);
+        return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
+            readerType.getScale());
       case STRUCT:
-        return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new StructTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new ListTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new MapTreeReader(fileType.getId(), readerType, evolution,
+            included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new UnionTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
-            type.getKind());
+            readerType.getCategory());
     }
   }
 }
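
For context on the signature change that runs through TreeReaderFactory above:
nextVector no longer allocates and returns an Object; it fills a caller-owned
ColumnVector in place and takes an optional parent isNull mask. A minimal sketch
of a caller, assuming "reader" is a TreeReader built by createTreeReader and
batchSize is at most VectorizedRowBatch.DEFAULT_SIZE (the variable names are
illustrative, not part of the patch):

    // The caller owns the vector; the reader only fills it in.
    LongColumnVector col = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    col.reset();                       // clear flags left from the last batch
    col.ensureSize(batchSize, false);  // grow if needed, without preserving data
    reader.nextVector(col, null, batchSize);  // null mask: no parent nulls

Complex types recurse with the same contract: StructTreeReader passes its own
isNull array down as the mask, and ListTreeReader/MapTreeReader call ensureSize
on their child vectors before reading result.childCount children.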

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 816b52d..e4d2e6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -71,14 +71,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
         OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
       }
 
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
       /**
        * Do we have schema on read in the configuration variables?
        */
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
-
       List<OrcProto.Type> types = file.getTypes();
-      Reader.Options options = new Reader.Options();
-      options.schema(schema);
+      int dataColumns = rbCtx.getDataColumnCount();
+      TypeDescription schema =
+          OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns);
+      if (schema == null) {
+        schema = file.getSchema();
+        // Even if the user isn't doing schema evolution, cut the schema
+        // to the desired size.
+        if (schema.getCategory() == TypeDescription.Category.STRUCT &&
+            schema.getChildren().size() > dataColumns) {
+          schema = schema.clone();
+          List<TypeDescription> children = schema.getChildren();
+          for(int c = children.size() - 1; c >= dataColumns; --c) {
+            children.remove(c);
+          }
+        }
+      }
+      Reader.Options options = new Reader.Options().schema(schema);
+
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       options.range(offset, length);
@@ -87,8 +102,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
 
       this.reader = file.rowsOptions(options);
 
-      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
-
       columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
 
       int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -103,9 +116,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     @Override
     public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
 
-      if (!reader.hasNext()) {
-        return false;
-      }
       try {
         // Check and update partition cols if necessary. Ideally, this should be done
         // in CreateValue as the partition is constant per split. But since Hive uses
@@ -118,7 +128,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
           }
           addPartitionCols = false;
         }
-        reader.nextBatch(value);
+        if (!reader.nextBatch(value)) {
+          return false;
+        }
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
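
With the hasNext() probe gone, end of file is now signaled by nextBatch itself
returning false. A minimal sketch of the resulting read loop, assuming "reader"
is the RecordReader returned by file.rowsOptions(options) and "schema" is the
TypeDescription being read (both named as in this patch):

    VectorizedRowBatch batch = schema.createRowBatch();
    while (reader.nextBatch(batch)) {
      // batch.size rows are filled in, with schema-evolution columns that
      // are absent from the file defaulted to null; process the batch here
    }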

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 70fe803..8e52907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -101,8 +101,6 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
     }
   }
 
-  private static final long NANOS_PER_MILLI = 1000000;
-
   /**
    * Set the value for a given column value within a batch.
    * @param rowId the row to set