Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:46 UTC

[15/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index ecd9b14..22f61ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -30,6 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -62,10 +66,64 @@ import org.apache.hadoop.io.Text;
  */
 public class TreeReaderFactory {
 
-  protected abstract static class TreeReader {
+  private static final Log LOG = LogFactory.getLog(TreeReaderFactory.class);
+
+  public static class TreeReaderSchema {
+
+    /**
+     * The types in the ORC file.
+     */
+    List<OrcProto.Type> fileTypes;
+
+    /**
+     * The treeReaderSchema that the reader should read as.
+     */
+    List<OrcProto.Type> schemaTypes;
+
+    /**
+     * The subtype of the row STRUCT.  Different than 0 for ACID.
+     */
+    int innerStructSubtype;
+
+    public TreeReaderSchema() {
+      fileTypes = null;
+      schemaTypes = null;
+      innerStructSubtype = -1;
+    }
+
+    public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
+      this.fileTypes = fileTypes;
+      return this;
+    }
+
+    public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
+      this.schemaTypes = schemaTypes;
+      return this;
+    }
+
+    public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
+      this.innerStructSubtype = innerStructSubtype;
+      return this;
+    }
+
+    public List<OrcProto.Type> getFileTypes() {
+      return fileTypes;
+    }
+
+    public List<OrcProto.Type> getSchemaTypes() {
+      return schemaTypes;
+    }
+
+    public int getInnerStructSubtype() {
+      return innerStructSubtype;
+    }
+  }
+
+  public abstract static class TreeReader {
     protected final int columnId;
     protected BitFieldReader present = null;
     protected boolean valuePresent = false;
+    protected int vectorColumnCount;
 
     TreeReader(int columnId) throws IOException {
       this(columnId, null);
@@ -79,6 +137,11 @@ public class TreeReaderFactory {
       } else {
         present = new BitFieldReader(in, 1);
       }
+      vectorColumnCount = -1;
+    }
+
+    void setVectorColumnCount(int vectorColumnCount) {
+      this.vectorColumnCount = vectorColumnCount;
     }
 
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
@@ -1947,24 +2010,56 @@ public class TreeReaderFactory {
   }
 
   protected static class StructTreeReader extends TreeReader {
+    private final int fileColumnCount;
+    private final int resultColumnCount;
     protected final TreeReader[] fields;
     private final String[] fieldNames;
 
-    StructTreeReader(int columnId,
-        List<OrcProto.Type> types,
+    protected StructTreeReader(
+        int columnId,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
-      int fieldCount = type.getFieldNamesCount();
-      this.fields = new TreeReader[fieldCount];
-      this.fieldNames = new String[fieldCount];
-      for (int i = 0; i < fieldCount; ++i) {
-        int subtype = type.getSubtypes(i);
-        if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+
+      OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
+      fileColumnCount = fileStructType.getFieldNamesCount();
+
+      OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+
+      if (columnId == treeReaderSchema.getInnerStructSubtype()) {
+        // If there are more result columns than reader columns, we will default those additional
+        // columns to NULL.
+        resultColumnCount = schemaStructType.getFieldNamesCount();
+      } else {
+        resultColumnCount = fileColumnCount;
+      }
+
+      this.fields = new TreeReader[fileColumnCount];
+      this.fieldNames = new String[fileColumnCount];
+
+      if (included == null) {
+        for (int i = 0; i < fileColumnCount; ++i) {
+          int subtype = schemaStructType.getSubtypes(i);
+          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
+          this.fieldNames[i] = schemaStructType.getFieldNames(i);
+        }
+      } else {
+        for (int i = 0; i < fileColumnCount; ++i) {
+          int subtype = schemaStructType.getSubtypes(i);
+          if (subtype >= included.length) {
+            throw new IOException("subtype " + subtype + " exceeds the included array size " +
+                included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
+                " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
+                " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
+          }
+          if (included[subtype]) {
+            this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+          }
+          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
+          this.fieldNames[i] = schemaStructType.getFieldNames(i);
         }
-        this.fieldNames[i] = type.getFieldNames(i);
       }
     }
 
@@ -1984,22 +2079,28 @@ public class TreeReaderFactory {
       OrcStruct result = null;
       if (valuePresent) {
         if (previous == null) {
-          result = new OrcStruct(fields.length);
+          result = new OrcStruct(resultColumnCount);
         } else {
           result = (OrcStruct) previous;
 
           // If the input format was initialized with a file with a
           // different number of fields, the number of fields needs to
           // be updated to the correct number
-          if (result.getNumFields() != fields.length) {
-            result.setNumFields(fields.length);
+          if (result.getNumFields() != resultColumnCount) {
+            result.setNumFields(resultColumnCount);
           }
         }
-        for (int i = 0; i < fields.length; ++i) {
+        for (int i = 0; i < fileColumnCount; ++i) {
           if (fields[i] != null) {
             result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
           }
         }
+        if (resultColumnCount > fileColumnCount) {
+          for (int i = fileColumnCount; i < resultColumnCount; ++i) {
+            // Default new treeReaderSchema evolution fields to NULL.
+            result.setFieldValue(i, null);
+          }
+        }
       }
       return result;
     }
@@ -2008,13 +2109,13 @@ public class TreeReaderFactory {
     public Object nextVector(Object previousVector, long batchSize) throws IOException {
       final ColumnVector[] result;
       if (previousVector == null) {
-        result = new ColumnVector[fields.length];
+        result = new ColumnVector[fileColumnCount];
       } else {
         result = (ColumnVector[]) previousVector;
       }
 
       // Read all the members of struct as column vectors
-      for (int i = 0; i < fields.length; i++) {
+      for (int i = 0; i < fileColumnCount; i++) {
         if (fields[i] != null) {
           if (result[i] == null) {
             result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
@@ -2023,6 +2124,19 @@ public class TreeReaderFactory {
           }
         }
       }
+
+      // Default additional treeReaderSchema evolution fields to NULL.
+      if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) {
+        for (int i = fileColumnCount; i < vectorColumnCount; ++i) {
+          ColumnVector colVector = result[i];
+          if (colVector != null) {
+            colVector.isRepeating = true;
+            colVector.noNulls = false;
+            colVector.isNull[0] = true;
+          }
+        }
+      }
+
       return result;
     }
 
@@ -2053,18 +2167,18 @@ public class TreeReaderFactory {
     protected final TreeReader[] fields;
     protected RunLengthByteReader tags;
 
-    UnionTreeReader(int columnId,
-        List<OrcProto.Type> types,
+    protected UnionTreeReader(int columnId,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
       int fieldCount = type.getSubtypesCount();
       this.fields = new TreeReader[fieldCount];
       for (int i = 0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
         }
       }
     }
@@ -2133,13 +2247,13 @@ public class TreeReaderFactory {
     protected final TreeReader elementReader;
     protected IntegerReader lengths = null;
 
-    ListTreeReader(int columnId,
-        List<OrcProto.Type> types,
+    protected ListTreeReader(int columnId,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
-      elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
+      elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
     }
 
     @Override
@@ -2223,21 +2337,21 @@ public class TreeReaderFactory {
     protected final TreeReader valueReader;
     protected IntegerReader lengths = null;
 
-    MapTreeReader(int columnId,
-        List<OrcProto.Type> types,
+    protected MapTreeReader(int columnId,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
       int keyColumn = type.getSubtypes(0);
       int valueColumn = type.getSubtypes(1);
       if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
+        keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
       } else {
         keyReader = null;
       }
       if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
+        valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
       } else {
         valueReader = null;
       }
@@ -2317,11 +2431,11 @@ public class TreeReaderFactory {
   }
 
   public static TreeReader createTreeReader(int columnId,
-      List<OrcProto.Type> types,
+      TreeReaderSchema treeReaderSchema,
       boolean[] included,
       boolean skipCorrupt
   ) throws IOException {
-    OrcProto.Type type = types.get(columnId);
+    OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
     switch (type.getKind()) {
       case BOOLEAN:
         return new BooleanTreeReader(columnId);
@@ -2361,13 +2475,13 @@ public class TreeReaderFactory {
         int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
         return new DecimalTreeReader(columnId, precision, scale);
       case STRUCT:
-        return new StructTreeReader(columnId, types, included, skipCorrupt);
+        return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(columnId, types, included, skipCorrupt);
+        return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(columnId, types, included, skipCorrupt);
+        return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(columnId, types, included, skipCorrupt);
+        return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
             type.getKind());
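
A minimal usage sketch (not from the patch itself) of how the new TreeReaderSchema builder feeds createTreeReader; fileTypes, schemaTypes, included, skipCorrupt and rowSubtype are placeholders assumed to be supplied by the caller (e.g. the record reader), and the root column id 0 is illustrative:

    TreeReaderFactory.TreeReaderSchema treeReaderSchema =
        new TreeReaderFactory.TreeReaderSchema()
            .fileTypes(fileTypes)            // types physically written in the ORC file
            .schemaTypes(schemaTypes)        // types the reader should present ("read as")
            .innerStructSubtype(rowSubtype); // subtype of the row STRUCT; non-zero for ACID

    TreeReaderFactory.TreeReader rootReader =
        TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);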

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
new file mode 100644
index 0000000..3c0d590
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is the description of the types in an ORC file.
+ */
+public class TypeDescription {
+  private static final int MAX_PRECISION = 38;
+  private static final int MAX_SCALE = 38;
+  private static final int DEFAULT_PRECISION = 38;
+  private static final int DEFAULT_SCALE = 10;
+  private static final int DEFAULT_LENGTH = 256;
+  public enum Category {
+    BOOLEAN("boolean", true),
+    BYTE("tinyint", true),
+    SHORT("smallint", true),
+    INT("int", true),
+    LONG("bigint", true),
+    FLOAT("float", true),
+    DOUBLE("double", true),
+    STRING("string", true),
+    DATE("date", true),
+    TIMESTAMP("timestamp", true),
+    BINARY("binary", true),
+    DECIMAL("decimal", true),
+    VARCHAR("varchar", true),
+    CHAR("char", true),
+    LIST("array", false),
+    MAP("map", false),
+    STRUCT("struct", false),
+    UNION("union", false);
+
+    Category(String name, boolean isPrimitive) {
+      this.name = name;
+      this.isPrimitive = isPrimitive;
+    }
+
+    final boolean isPrimitive;
+    final String name;
+
+    public boolean isPrimitive() {
+      return isPrimitive;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  public static TypeDescription createBoolean() {
+    return new TypeDescription(Category.BOOLEAN);
+  }
+
+  public static TypeDescription createByte() {
+    return new TypeDescription(Category.BYTE);
+  }
+
+  public static TypeDescription createShort() {
+    return new TypeDescription(Category.SHORT);
+  }
+
+  public static TypeDescription createInt() {
+    return new TypeDescription(Category.INT);
+  }
+
+  public static TypeDescription createLong() {
+    return new TypeDescription(Category.LONG);
+  }
+
+  public static TypeDescription createFloat() {
+    return new TypeDescription(Category.FLOAT);
+  }
+
+  public static TypeDescription createDouble() {
+    return new TypeDescription(Category.DOUBLE);
+  }
+
+  public static TypeDescription createString() {
+    return new TypeDescription(Category.STRING);
+  }
+
+  public static TypeDescription createDate() {
+    return new TypeDescription(Category.DATE);
+  }
+
+  public static TypeDescription createTimestamp() {
+    return new TypeDescription(Category.TIMESTAMP);
+  }
+
+  public static TypeDescription createBinary() {
+    return new TypeDescription(Category.BINARY);
+  }
+
+  public static TypeDescription createDecimal() {
+    return new TypeDescription(Category.DECIMAL);
+  }
+
+  /**
+   * For decimal types, set the precision.
+   * @param precision the new precision
+   * @return this
+   */
+  public TypeDescription withPrecision(int precision) {
+    if (category != Category.DECIMAL) {
+      throw new IllegalArgumentException("precision is only allowed on decimal"+
+         " and not " + category.name);
+    } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){
+      throw new IllegalArgumentException("precision " + precision +
+          " is out of range 1 .. " + scale);
+    }
+    this.precision = precision;
+    return this;
+  }
+
+  /**
+   * For decimal types, set the scale.
+   * @param scale the new scale
+   * @return this
+   */
+  public TypeDescription withScale(int scale) {
+    if (category != Category.DECIMAL) {
+      throw new IllegalArgumentException("scale is only allowed on decimal"+
+          " and not " + category.name);
+    } else if (scale < 0 || scale > MAX_SCALE || scale > precision) {
+      throw new IllegalArgumentException("scale is out of range at " + scale);
+    }
+    this.scale = scale;
+    return this;
+  }
+
+  public static TypeDescription createVarchar() {
+    return new TypeDescription(Category.VARCHAR);
+  }
+
+  public static TypeDescription createChar() {
+    return new TypeDescription(Category.CHAR);
+  }
+
+  /**
+   * Set the maximum length for char and varchar types.
+   * @param maxLength the maximum value
+   * @return this
+   */
+  public TypeDescription withMaxLength(int maxLength) {
+    if (category != Category.VARCHAR && category != Category.CHAR) {
+      throw new IllegalArgumentException("maxLength is only allowed on char" +
+                   " and varchar and not " + category.name);
+    }
+    this.maxLength = maxLength;
+    return this;
+  }
+
+  public static TypeDescription createList(TypeDescription childType) {
+    TypeDescription result = new TypeDescription(Category.LIST);
+    result.children.add(childType);
+    childType.parent = result;
+    return result;
+  }
+
+  public static TypeDescription createMap(TypeDescription keyType,
+                                          TypeDescription valueType) {
+    TypeDescription result = new TypeDescription(Category.MAP);
+    result.children.add(keyType);
+    result.children.add(valueType);
+    keyType.parent = result;
+    valueType.parent = result;
+    return result;
+  }
+
+  public static TypeDescription createUnion() {
+    return new TypeDescription(Category.UNION);
+  }
+
+  public static TypeDescription createStruct() {
+    return new TypeDescription(Category.STRUCT);
+  }
+
+  /**
+   * Add a child to a union type.
+   * @param child a new child type to add
+   * @return the union type.
+   */
+  public TypeDescription addUnionChild(TypeDescription child) {
+    if (category != Category.UNION) {
+      throw new IllegalArgumentException("Can only add types to union type" +
+          " and not " + category);
+    }
+    children.add(child);
+    child.parent = this;
+    return this;
+  }
+
+  /**
+   * Add a field to a struct type as it is built.
+   * @param field the field name
+   * @param fieldType the type of the field
+   * @return the struct type
+   */
+  public TypeDescription addField(String field, TypeDescription fieldType) {
+    if (category != Category.STRUCT) {
+      throw new IllegalArgumentException("Can only add fields to struct type" +
+          " and not " + category);
+    }
+    fieldNames.add(field);
+    children.add(fieldType);
+    fieldType.parent = this;
+    return this;
+  }
+
+  /**
+   * Get the id for this type.
+   * The first call will cause all of the ids in the tree to be assigned, so
+   * it should not be called before the type is completely built.
+   * @return the sequential id
+   */
+  public int getId() {
+    // if the id hasn't been assigned, assign all of the ids from the root
+    if (id == -1) {
+      TypeDescription root = this;
+      while (root.parent != null) {
+        root = root.parent;
+      }
+      root.assignIds(0);
+    }
+    return id;
+  }
+
+  /**
+   * Get the maximum id assigned to this type or its children.
+   * The first call will cause all of the ids in the tree to be assigned, so
+   * it should not be called before the type is completely built.
+   * @return the maximum id assigned under this type
+   */
+  public int getMaximumId() {
+    // if the id hasn't been assigned, assign all of the ids from the root
+    if (maxId == -1) {
+      TypeDescription root = this;
+      while (root.parent != null) {
+        root = root.parent;
+      }
+      root.assignIds(0);
+    }
+    return maxId;
+  }
+
+  private ColumnVector createColumn() {
+    switch (category) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case TIMESTAMP:
+      case DATE:
+        return new LongColumnVector();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnVector();
+      case DECIMAL:
+        return new DecimalColumnVector(precision, scale);
+      case STRING:
+      case BINARY:
+      case CHAR:
+      case VARCHAR:
+        return new BytesColumnVector();
+      default:
+        throw new IllegalArgumentException("Unknown type " + category);
+    }
+  }
+
+  public VectorizedRowBatch createRowBatch() {
+    VectorizedRowBatch result;
+    if (category == Category.STRUCT) {
+      result = new VectorizedRowBatch(children.size(),
+          VectorizedRowBatch.DEFAULT_SIZE);
+      for(int i=0; i < result.cols.length; ++i) {
+        result.cols[i] = children.get(i).createColumn();
+      }
+    } else {
+      result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
+      result.cols[0] = createColumn();
+    }
+    result.reset();
+    return result;
+  }
+
+  /**
+   * Get the kind of this type.
+   * @return get the category for this type.
+   */
+  public Category getCategory() {
+    return category;
+  }
+
+  /**
+   * Get the maximum length of the type. Only used for char and varchar types.
+   * @return the maximum length of the string type
+   */
+  public int getMaxLength() {
+    return maxLength;
+  }
+
+  /**
+   * Get the precision of the decimal type.
+   * @return the number of digits for the precision.
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * Get the scale of the decimal type.
+   * @return the number of digits for the scale.
+   */
+  public int getScale() {
+    return scale;
+  }
+
+  /**
+   * For struct types, get the list of field names.
+   * @return the list of field names.
+   */
+  public List<String> getFieldNames() {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Get the subtypes of this type.
+   * @return the list of children types
+   */
+  public List<TypeDescription> getChildren() {
+    return children == null ? null : Collections.unmodifiableList(children);
+  }
+
+  /**
+   * Assign ids to all of the nodes under this one.
+   * @param startId the lowest id to assign
+   * @return the next available id
+   */
+  private int assignIds(int startId) {
+    id = startId++;
+    if (children != null) {
+      for (TypeDescription child : children) {
+        startId = child.assignIds(startId);
+      }
+    }
+    maxId = startId - 1;
+    return startId;
+  }
+
+  private TypeDescription(Category category) {
+    this.category = category;
+    if (category.isPrimitive) {
+      children = null;
+    } else {
+      children = new ArrayList<>();
+    }
+    if (category == Category.STRUCT) {
+      fieldNames = new ArrayList<>();
+    } else {
+      fieldNames = null;
+    }
+  }
+
+  private int id = -1;
+  private int maxId = -1;
+  private TypeDescription parent;
+  private final Category category;
+  private final List<TypeDescription> children;
+  private final List<String> fieldNames;
+  private int maxLength = DEFAULT_LENGTH;
+  private int precision = DEFAULT_PRECISION;
+  private int scale = DEFAULT_SCALE;
+
+  public void printToBuffer(StringBuilder buffer) {
+    buffer.append(category.name);
+    switch (category) {
+      case DECIMAL:
+        buffer.append('(');
+        buffer.append(precision);
+        buffer.append(',');
+        buffer.append(scale);
+        buffer.append(')');
+        break;
+      case CHAR:
+      case VARCHAR:
+        buffer.append('(');
+        buffer.append(maxLength);
+        buffer.append(')');
+        break;
+      case LIST:
+      case MAP:
+      case UNION:
+        buffer.append('<');
+        for(int i=0; i < children.size(); ++i) {
+          if (i != 0) {
+            buffer.append(',');
+          }
+          children.get(i).printToBuffer(buffer);
+        }
+        buffer.append('>');
+        break;
+      case STRUCT:
+        buffer.append('<');
+        for(int i=0; i < children.size(); ++i) {
+          if (i != 0) {
+            buffer.append(',');
+          }
+          buffer.append(fieldNames.get(i));
+          buffer.append(':');
+          children.get(i).printToBuffer(buffer);
+        }
+        buffer.append('>');
+        break;
+      default:
+        break;
+    }
+  }
+
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    printToBuffer(buffer);
+    return buffer.toString();
+  }
+
+  private void printJsonToBuffer(String prefix, StringBuilder buffer,
+                                 int indent) {
+    for(int i=0; i < indent; ++i) {
+      buffer.append(' ');
+    }
+    buffer.append(prefix);
+    buffer.append("{\"category\": \"");
+    buffer.append(category.name);
+    buffer.append("\", \"id\": ");
+    buffer.append(getId());
+    buffer.append(", \"max\": ");
+    buffer.append(maxId);
+    switch (category) {
+      case DECIMAL:
+        buffer.append(", \"precision\": ");
+        buffer.append(precision);
+        buffer.append(", \"scale\": ");
+        buffer.append(scale);
+        break;
+      case CHAR:
+      case VARCHAR:
+        buffer.append(", \"length\": ");
+        buffer.append(maxLength);
+        break;
+      case LIST:
+      case MAP:
+      case UNION:
+        buffer.append(", \"children\": [");
+        for(int i=0; i < children.size(); ++i) {
+          buffer.append('\n');
+          children.get(i).printJsonToBuffer("", buffer, indent + 2);
+          if (i != children.size() - 1) {
+            buffer.append(',');
+          }
+        }
+        buffer.append("]");
+        break;
+      case STRUCT:
+        buffer.append(", \"fields\": [");
+        for(int i=0; i < children.size(); ++i) {
+          buffer.append('\n');
+          children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ",
+              buffer, indent + 2);
+          if (i != children.size() - 1) {
+            buffer.append(',');
+          }
+        }
+        buffer.append(']');
+        break;
+      default:
+        break;
+    }
+    buffer.append('}');
+  }
+
+  public String toJson() {
+    StringBuilder buffer = new StringBuilder();
+    printJsonToBuffer("", buffer, 0);
+    return buffer.toString();
+  }
+}
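
An illustrative example (not part of the patch) of the TypeDescription builder API added above; the field names and types here are made up:

    TypeDescription schema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createVarchar().withMaxLength(64))
        .addField("price", TypeDescription.createDecimal().withPrecision(10).withScale(2));

    // schema.toString() -> "struct<id:bigint,name:varchar(64),price:decimal(10,2)>"
    // createRowBatch() allocates one column vector per top-level field:
    // a LongColumnVector, a BytesColumnVector, and a DecimalColumnVector(10, 2).
    VectorizedRowBatch batch = schema.createRowBatch();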

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
index a8e5c2e..a2725b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader
   private final AcidInputFormat.RowReader<OrcStruct> innerReader;
   private final RecordIdentifier key;
   private final OrcStruct value;
-  private final VectorizedRowBatchCtx rowBatchCtx;
+  private VectorizedRowBatchCtx rbCtx;
+  private Object[] partitionValues;
   private final ObjectInspector objectInspector;
   private final DataOutputBuffer buffer = new DataOutputBuffer();
 
   VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
                              Configuration conf,
+                             VectorizedRowBatchCtx vectorizedRowBatchCtx,
                              FileSplit split) throws IOException {
     this.innerReader = inner;
     this.key = inner.createKey();
-    this.rowBatchCtx = new VectorizedRowBatchCtx();
+    rbCtx = vectorizedRowBatchCtx;
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
+    }
     this.value = inner.createValue();
     this.objectInspector = inner.getObjectInspector();
-    try {
-      rowBatchCtx.init(conf, split);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (SerDeException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (HiveException e) {
-      throw new IOException("Failed to initialize context", e);
-    }
   }
 
   @Override
@@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader
     if (!innerReader.next(key, value)) {
       return false;
     }
-    try {
-      rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
-    } catch (HiveException e) {
-      throw new IOException("Problem adding partition column", e);
+    if (partitionValues != null) {
+      rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
     }
     try {
       VectorizedBatchUtil.acidAddRowToBatch(value,
           (StructObjectInspector) objectInspector,
-          vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+          vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
       while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
           innerReader.next(key, value)) {
         VectorizedBatchUtil.acidAddRowToBatch(value,
             (StructObjectInspector) objectInspector,
-            vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+            vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
       }
-    } catch (HiveException he) {
-      throw new IOException("error iterating", he);
+    } catch (Exception e) {
+      throw new IOException("error iterating", e);
     }
     return true;
   }
@@ -110,11 +101,7 @@ class VectorizedOrcAcidRowReader
 
   @Override
   public VectorizedRowBatch createValue() {
-    try {
-      return rowBatchCtx.createVectorizedRowBatch();
-    } catch (HiveException e) {
-      throw new RuntimeException("Error creating a batch", e);
-    }
+    return rbCtx.createVectorizedRowBatch();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index bf09001..d90425a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter;
  * A MapReduce/Hive input format for ORC files.
  */
 public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-    implements InputFormatChecker, VectorizedInputFormatInterface {
+    implements InputFormatChecker, VectorizedInputFormatInterface,
+    SelfDescribingInputFormatInterface {
 
   static class VectorizedOrcRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -56,12 +54,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     private final long length;
     private float progress = 0.0f;
     private VectorizedRowBatchCtx rbCtx;
+    private final boolean[] columnsToIncludeTruncated;
+    private final Object[] partitionValues;
     private boolean addPartitionCols = true;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
+
+      // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
+      // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this).
+      //
+      // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader?
+      // OrcInputFormat.getRecordReader will use this reader for original files that have no deltas.
+      //
+      boolean isAcid = (fileSplit instanceof OrcSplit);
+
+      /**
+       * Do we have schema on read in the configuration variables?
+       */
+      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid);
+
       List<OrcProto.Type> types = file.getTypes();
       Reader.Options options = new Reader.Options();
+      options.schema(schema);
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       options.range(offset, length);
@@ -69,11 +84,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
       OrcInputFormat.setSearchArgument(options, types, conf, true);
 
       this.reader = file.rowsOptions(options);
-      try {
-        rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(conf, fileSplit);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+      columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
+
+      int partitionColumnCount = rbCtx.getPartitionColumnCount();
+      if (partitionColumnCount > 0) {
+        partitionValues = new Object[partitionColumnCount];
+        rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+      } else {
+        partitionValues = null;
       }
     }
 
@@ -90,7 +111,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
         // as this does not call CreateValue for each new RecordReader it creates, this check is
         // required in next()
         if (addPartitionCols) {
-          rbCtx.addPartitionColsToBatch(value);
+          if (partitionValues != null) {
+            rbCtx.addPartitionColsToBatch(value, partitionValues);
+          }
           addPartitionCols = false;
         }
         reader.nextBatch(value);
@@ -108,11 +131,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
 
     @Override
     public VectorizedRowBatch createValue() {
-      try {
-        return rbCtx.createVectorizedRowBatch();
-      } catch (HiveException e) {
-        throw new RuntimeException("Error creating a batch", e);
-      }
+      return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
     }
 
     @Override

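A condensed, hedged restatement of the schema-on-read wiring the constructor above introduces for the non-ACID vectorized path; variable names are illustrative, and only calls that appear in the diff (OrcUtils.getDesiredRowTypeDescr, Reader.Options.schema/range, Reader.rowsOptions) are used:

    // FileSplits handed out by CombineHiveInputFormat are never ACID; an OrcSplit may be.
    boolean isAcid = (fileSplit instanceof OrcSplit);

    // Desired row type resolved from the job configuration (schema on read);
    // the diff passes it straight to the reader options.
    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid);

    Reader.Options options = new Reader.Options();
    options.schema(schema);
    options.range(fileSplit.getStart(), fileSplit.getLength());

    // Batches are then pulled with reader.nextBatch(batch), as before.
    RecordReader reader = file.rowsOptions(options);
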
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index ed99615..c6070c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,8 +14,11 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -23,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -32,7 +36,6 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
 /**
@@ -52,6 +55,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
 
     private final ParquetRecordReaderWrapper internalReader;
       private VectorizedRowBatchCtx rbCtx;
+      private Object[] partitionValues;
       private ArrayWritable internalValues;
       private NullWritable internalKey;
       private VectorColumnAssign[] assigners;
@@ -65,11 +69,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
         split,
         conf,
         reporter);
-      try {
-        rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(conf, split);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      int partitionColumnCount = rbCtx.getPartitionColumnCount();
+      if (partitionColumnCount > 0) {
+        partitionValues = new Object[partitionColumnCount];
+        rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
       }
     }
 
@@ -81,13 +85,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
 
       @Override
       public VectorizedRowBatch createValue() {
-        VectorizedRowBatch outputBatch = null;
-        try {
-          outputBatch = rbCtx.createVectorizedRowBatch();
-          internalValues = internalReader.createValue();
-        } catch (HiveException e) {
-          throw new RuntimeException("Error creating a batch", e);
-        }
+        VectorizedRowBatch outputBatch;
+        outputBatch = rbCtx.createVectorizedRowBatch();
+        internalValues = internalReader.createValue();
         return outputBatch;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5708cb8..40d0e34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -109,6 +109,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 
@@ -700,6 +702,11 @@ public final class GenMapRedUtils {
       parseCtx.getGlobalLimitCtx().disableOpt();
     }
 
+    if (topOp instanceof TableScanOperator) {
+      Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),
+          (TableScanOperator) topOp);
+    }
+
     Iterator<Path> iterPath = partDir.iterator();
     Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();
 
@@ -761,6 +768,7 @@ public final class GenMapRedUtils {
    *          whether you need to add to map-reduce or local work
    * @param tt_desc
    *          table descriptor
+   * @throws SerDeException
    */
   public static void setTaskPlan(String path, String alias,
       Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
@@ -770,6 +778,16 @@ public final class GenMapRedUtils {
       return;
     }
 
+    if (topOp instanceof TableScanOperator) {
+      try {
+      Utilities.addSchemaEvolutionToTableScanOperator(
+          (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(),
+          (TableScanOperator) topOp);
+      } catch (Exception e) {
+        throw new SemanticException(e);
+      }
+    }
+
     if (!local) {
       if (plan.getPathToAliases().get(path) == null) {
         plan.getPathToAliases().put(path, new ArrayList<String>());

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 2af6f9a..20e1ee6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform {
 
     private FetchWork convertToWork() throws HiveException {
       inputs.clear();
+      Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp);
       TableDesc tableDesc = Utilities.getTableDesc(table);
       if (!table.isPartitioned()) {
         inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null));

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 20f9400..5d010cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
@@ -33,6 +34,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.*;
@@ -63,6 +65,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -88,6 +95,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
@@ -100,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -149,6 +158,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import com.google.common.base.Joiner;
+
 public class Vectorizer implements PhysicalPlanResolver {
 
   protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
@@ -311,17 +322,51 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedAggregationUdfs.add("stddev_samp");
   }
 
+  private class VectorTaskColumnInfo {
+    List<String> columnNames;
+    List<TypeInfo> typeInfos;
+    int partitionColumnCount;
+
+    String[] scratchTypeNameArray;
+
+    VectorTaskColumnInfo() {
+      partitionColumnCount = 0;
+    }
+
+    public void setColumnNames(List<String> columnNames) {
+      this.columnNames = columnNames;
+    }
+    public void setTypeInfos(List<TypeInfo> typeInfos) {
+      this.typeInfos = typeInfos;
+    }
+    public void setPartitionColumnCount(int partitionColumnCount) {
+      this.partitionColumnCount = partitionColumnCount;
+    }
+    public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
+      this.scratchTypeNameArray = scratchTypeNameArray;
+    }
+
+    public void transferToBaseWork(BaseWork baseWork) {
+
+      String[] columnNameArray = columnNames.toArray(new String[0]);
+      TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]);
+
+      VectorizedRowBatchCtx vectorizedRowBatchCtx =
+          new VectorizedRowBatchCtx(
+            columnNameArray,
+            typeInfoArray,
+            partitionColumnCount,
+            scratchTypeNameArray);
+      baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+    }
+  }
+
   class VectorizationDispatcher implements Dispatcher {
 
     private final PhysicalContext physicalContext;
 
-    private List<String> reduceColumnNames;
-    private List<TypeInfo> reduceTypeInfos;
-
     public VectorizationDispatcher(PhysicalContext physicalContext) {
       this.physicalContext = physicalContext;
-      reduceColumnNames = null;
-      reduceTypeInfos = null;
     }
 
     @Override
@@ -359,9 +404,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
-      boolean ret = validateMapWork(mapWork, isTez);
+      VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
       if (ret) {
-        vectorizeMapWork(mapWork, isTez);
+        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
       }
     }
 
@@ -372,40 +418,262 @@ public class Vectorizer implements PhysicalPlanResolver {
           + ReduceSinkOperator.getOperatorName()), np);
     }
 
-    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
-      LOG.info("Validating MapWork...");
+    private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) {
 
       // Eliminate MR plans with more than one TableScanOperator.
+
       LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
       if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
-        return false;
+        return null;
       }
       int tableScanCount = 0;
-      for (Operator<?> op : aliasToWork.values()) {
+      String alias = "";
+      TableScanOperator tableScanOperator = null;
+      for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+        Operator<?> op = entry.getValue();
         if (op == null) {
           LOG.warn("Map work has invalid aliases to work with. Fail validation!");
-          return false;
+          return null;
         }
         if (op instanceof TableScanOperator) {
           tableScanCount++;
+          alias = entry.getKey();
+          tableScanOperator = (TableScanOperator) op;
         }
       }
       if (tableScanCount > 1) {
-        LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
-        return false;
+        LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+        return null;
+      }
+      return new ImmutablePair(alias, tableScanOperator);
+    }
+
+    private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator,
+        List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) {
+
+      TableScanDesc tableScanDesc = tableScanOperator.getConf();
+
+      // Add all non-virtual columns to make a vectorization context for
+      // the TableScan operator.
+      RowSchema rowSchema = tableScanOperator.getSchema();
+      for (ColumnInfo c : rowSchema.getSignature()) {
+        // Validation will later exclude vectorization of virtual columns usage (HIVE-5560).
+        if (!isVirtualColumn(c)) {
+          String columnName = c.getInternalName();
+          String typeName = c.getTypeName();
+          TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+          logicalColumnNameList.add(columnName);
+          logicalTypeInfoList.add(typeInfo);
+        }
+      }
+    }
+
+     private String getColumns(List<String> columnNames, int start, int length,
+        Character separator) {
+      return Joiner.on(separator).join(columnNames.subList(start, start + length));
+    }
+
+    private String getTypes(List<TypeInfo> typeInfos, int start, int length) {
+      return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length));
+    }
+
+    private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) {
+
+      // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
+      // and reads VectorizedRowBatch as a "row".
+
+      if (Utilities.isInputFileFormatVectorized(pd)) {
+
+        pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat());
+
+        return true;
       }
 
+      LOG.info("Input format: " + pd.getInputFileFormatClassName()
+          + ", doesn't provide vectorized input");
+
+      return false;
+    }
+
+    private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+        TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+      // These names/types are the data columns plus partition columns.
+      final List<String> allColumnNameList = new ArrayList<String>();
+      final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
+
+      getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList);
+      final int allColumnCount = allColumnNameList.size();
+
+      // Validate input format and schema evolution capability.
+
+      // For the table, enter a null value in the multi-key map indicating no conversion necessary
+      // if the schema matches the table.
+
+      HashMap<ImmutablePair, boolean[]> conversionMap = new HashMap<ImmutablePair, boolean[]>();
+
+      boolean isFirst = true;
+      int dataColumnCount = 0;
+      int partitionColumnCount = 0;
+
+      List<String> dataColumnList = null;
+      String dataColumnsString = "";
+      List<TypeInfo> dataTypeInfoList = null;
+
       // Validate the input format
-      for (String path : mapWork.getPathToPartitionInfo().keySet()) {
-        PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
-        List<Class<?>> interfaceList =
-            Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
-        if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
-          LOG.info("Input format: " + pd.getInputFileFormatClassName()
-              + ", doesn't provide vectorized input");
+      VectorPartitionConversion partitionConversion = new VectorPartitionConversion();
+      LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
+      LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+      for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) {
+        String path = entry.getKey();
+        List<String> aliases = entry.getValue();
+        boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
+        if (!isPresent) {
+          LOG.info("Alias " + alias + " not present in aliases " + aliases);
+          return false;
+        }
+        PartitionDesc partDesc = pathToPartitionInfo.get(path);
+        if (partDesc.getVectorPartitionDesc() != null) {
+          // We have seen this path already.
+          continue;
+        }
+        if (!verifyAndSetVectorPartDesc(partDesc)) {
           return false;
         }
+        VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+        LOG.info("Vectorizer path: " + path + ", read type " +
+            vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases);
+
+        Properties partProps = partDesc.getProperties();
+
+        String nextDataColumnsString =
+            partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+        String[] nextDataColumns = nextDataColumnsString.split(",");
+
+        String nextDataTypesString =
+            partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+
+        // We convert to a list of TypeInfo using a library routine since it parses the information
+        // and can handle different separators, etc.  We cannot use the raw type string
+        // for comparison in the map because of the different separators used.
+        List<TypeInfo> nextDataTypeInfoList =
+            TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString);
+
+        if (isFirst) {
+
+          // We establish with the first one whether the table is partitioned or not.
+
+          LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+          if (partSpec != null && partSpec.size() > 0) {
+            partitionColumnCount = partSpec.size();
+            dataColumnCount = allColumnCount - partitionColumnCount;
+          } else {
+            partitionColumnCount = 0;
+            dataColumnCount = allColumnCount;
+          }
+
+          dataColumnList = allColumnNameList.subList(0, dataColumnCount);
+          dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ',');
+          dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
+
+          // Add the table (non-partitioned) columns and types into the map as not needing
+          // conversion (i.e. null).
+          conversionMap.put(
+              new ImmutablePair(dataColumnsString, dataTypeInfoList), null);
+
+          isFirst = false;
+        }
+
+        ImmutablePair columnNamesAndTypesCombination =
+            new ImmutablePair(nextDataColumnsString, nextDataTypeInfoList);
+
+        boolean[] conversionFlags;
+        if (conversionMap.containsKey(columnNamesAndTypesCombination)) {
+
+          conversionFlags = conversionMap.get(columnNamesAndTypesCombination);
+
+        } else {
+
+          List<String> nextDataColumnList = Arrays.asList(nextDataColumns);
+
+          // Validate that the column names that are present are the same.  Missing columns will be
+          // implicitly defaulted to null.
+
+          if (nextDataColumnList.size() > dataColumnList.size()) {
+            LOG.info(
+                String.format("Could not vectorize partition %s.  The partition column names %d is greater than the number of table columns %d",
+                    path, nextDataColumnList.size(), dataColumnList.size()));
+            return false;
+          }
+          for (int i = 0; i < nextDataColumnList.size(); i++) {
+            String nextColumnName = nextDataColumnList.get(i);
+            String tableColumnName = dataColumnList.get(i);
+            if (!nextColumnName.equals(tableColumnName)) {
+              LOG.info(
+                  String.format("Could not vectorize partition %s.  The partition column name %s is does not match table column name %s",
+                      path, nextColumnName, tableColumnName));
+              return false;
+            }
+          }
+
+          // The table column types might have been changed with ALTER.  There are restrictions
+          // here for vectorization.
+
+          // Some readers / deserializers take responsibility for conversion themselves.
+
+          // If we need to check for conversion, the conversion flags may come back null,
+          // indicating that from a vectorization point of view the conversion is implicit.
+          // That is, all of the type changes are implicit integer upgrades.
+
+          if (vectorPartDesc.getNeedsDataTypeConversionCheck() &&
+              !nextDataTypeInfoList.equals(dataTypeInfoList)) {
+
+            // The results will be in 2 members: validConversion and conversionFlags.
+            partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList);
+            if (!partitionConversion.getValidConversion()) {
+              return false;
+            }
+            conversionFlags = partitionConversion.getResultConversionFlags();
+          } else {
+            conversionFlags = null;
+          }
+
+          // We enter this in our map so we don't have to check again for subsequent partitions.
+
+          conversionMap.put(columnNamesAndTypesCombination, conversionFlags);
+        }
+
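+        // Record the conversion decision and the partition's data types on the partition
+        // descriptor.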
+        vectorPartDesc.setConversionFlags(conversionFlags);
+
+        vectorPartDesc.setTypeInfos(nextDataTypeInfoList);
+      }
+
+      vectorTaskColumnInfo.setColumnNames(allColumnNameList);
+      vectorTaskColumnInfo.setTypeInfos(allTypeInfoList);
+      vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
+
+      return true;
+    }
+
+    private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+            throws SemanticException {
+
+      LOG.info("Validating MapWork...");
+
+      ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
+      if (pair == null) {
+        return false;
+      }
+      String alias = pair.left;
+      TableScanOperator tableScanOperator = pair.right;
+
+      // This call fills in the column names, types, and partition column count in
+      // vectorTaskColumnInfo.
+      if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+        return false;
       }
+
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
       addMapWorkRules(opRules, vnp);
@@ -427,11 +695,14 @@ public class Vectorizer implements PhysicalPlanResolver {
       return true;
     }
 
-    private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+    private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
+            boolean isTez) throws SemanticException {
+
       LOG.info("Vectorizing MapWork...");
       mapWork.setVectorMode(true);
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
+      MapWorkVectorizationNodeProcessor vnp =
+          new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
@@ -441,9 +712,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
       ogw.startWalking(topNodes, nodeOutput);
 
-      mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
-      mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-      mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+      vectorTaskColumnInfo.transferToBaseWork(mapWork);
 
       if (LOG.isDebugEnabled()) {
         debugDisplayAllMaps(mapWork);
@@ -453,13 +724,19 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
-      boolean ret = validateReduceWork(reduceWork);
+      VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
       if (ret) {
-        vectorizeReduceWork(reduceWork, isTez);
+        vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
       }
     }
 
-    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
+            VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
+
+      ArrayList<String> reduceColumnNames = new ArrayList<String>();
+      ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
+
       try {
         // Check key ObjectInspector.
         ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
@@ -483,9 +760,6 @@ public class Vectorizer implements PhysicalPlanResolver {
         StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
         List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
 
-        reduceColumnNames = new ArrayList<String>();
-        reduceTypeInfos = new ArrayList<TypeInfo>();
-
         for (StructField field: keyFields) {
           reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
           reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
@@ -497,6 +771,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       } catch (Exception e) {
         throw new SemanticException(e);
       }
+
+      vectorTaskColumnInfo.setColumnNames(reduceColumnNames);
+      vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos);
+
       return true;
     }
 
@@ -505,11 +783,13 @@ public class Vectorizer implements PhysicalPlanResolver {
       opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
     }
 
-    private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+    private boolean validateReduceWork(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
       LOG.info("Validating ReduceWork...");
 
       // Validate input to ReduceWork.
-      if (!getOnlyStructObjectInspectors(reduceWork)) {
+      if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
         return false;
       }
       // Now check the reduce operator tree.
@@ -533,7 +813,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       return true;
     }
 
-    private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+    private void vectorizeReduceWork(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
       LOG.info("Vectorizing ReduceWork...");
       reduceWork.setVectorMode(true);
 
@@ -542,7 +824,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       ReduceWorkVectorizationNodeProcessor vnp =
-              new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
+              new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
       addReduceWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
@@ -557,9 +839,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       // Necessary since we are vectorizing the root operator in reduce.
       reduceWork.setReducer(vnp.getRootVectorOp());
 
-      reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
-      reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-      reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+      vectorTaskColumnInfo.transferToBaseWork(reduceWork);
 
       if (LOG.isDebugEnabled()) {
         debugDisplayAllMaps(reduceWork);
@@ -627,23 +909,11 @@ public class Vectorizer implements PhysicalPlanResolver {
     // The vectorization context for the Map or Reduce task.
     protected VectorizationContext taskVectorizationContext;
 
-    // The input projection column type name map for the Map or Reduce task.
-    protected Map<Integer, String> taskColumnTypeNameMap;
-
     VectorizationNodeProcessor() {
-      taskColumnTypeNameMap = new HashMap<Integer, String>();
-    }
-
-    public Map<String, Integer> getVectorColumnNameMap() {
-      return taskVectorizationContext.getProjectionColumnMap();
     }
 
-    public Map<Integer, String> getVectorColumnTypeMap() {
-      return taskColumnTypeNameMap;
-    }
-
-    public Map<Integer, String> getVectorScratchColumnTypeMap() {
-      return taskVectorizationContext.getScratchColumnTypeMap();
+    public String[] getVectorScratchColumnTypeNames() {
+      return taskVectorizationContext.getScratchColumnTypeNames();
     }
 
     protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -713,11 +983,14 @@ public class Vectorizer implements PhysicalPlanResolver {
   class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
 
     private final MapWork mWork;
+    private VectorTaskColumnInfo vectorTaskColumnInfo;
     private final boolean isTez;
 
-    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+        VectorTaskColumnInfo vectorTaskColumnInfo) {
       super();
       this.mWork = mWork;
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       this.isTez = isTez;
     }
 
@@ -731,8 +1004,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
-          taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
-                  taskColumnTypeNameMap);
+          taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
         }
         vContext = taskVectorizationContext;
       } else {
@@ -773,8 +1045,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
 
-    private final List<String> reduceColumnNames;
-    private final List<TypeInfo> reduceTypeInfos;
+    private VectorTaskColumnInfo vectorTaskColumnInfo;
 
     private boolean isTez;
 
@@ -784,11 +1055,11 @@ public class Vectorizer implements PhysicalPlanResolver {
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
-            List<TypeInfo> reduceTypeInfos, boolean isTez) {
+    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+            boolean isTez) {
+
       super();
-      this.reduceColumnNames =  reduceColumnNames;
-      this.reduceTypeInfos = reduceTypeInfos;
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       rootVectorOp = null;
       this.isTez = isTez;
     }
@@ -804,15 +1075,11 @@ public class Vectorizer implements PhysicalPlanResolver {
       boolean saveRootVectorOp = false;
 
       if (op.getParentOperators().size() == 0) {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString());
 
-        vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
+        vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames);
         taskVectorizationContext = vContext;
-        int i = 0;
-        for (TypeInfo typeInfo : reduceTypeInfos) {
-          taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
-          i++;
-        }
+
         saveRootVectorOp = true;
 
         if (LOG.isDebugEnabled()) {
@@ -881,7 +1148,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   @Override
   public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
-    this.physicalContext  = physicalContext;
+
     hiveConf = physicalContext.getConf();
 
     boolean vectorPath = HiveConf.getBoolVar(hiveConf,
@@ -1022,65 +1289,6 @@ public class Vectorizer implements PhysicalPlanResolver {
       return false;
     }
 
-    String columns = "";
-    String types = "";
-    String partitionColumns = "";
-    String partitionTypes = "";
-    boolean haveInfo = false;
-
-    // This over-reaches slightly, since we can have > 1 table-scan  per map-work.
-    // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate.
-    // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later.
-    LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
-
-    // For vectorization, compare each partition information for against the others.
-    // We assume the table information will be from one of the partitions, so it will
-    // work to focus on the partition information and not compare against the TableScanOperator
-    // columns (in the VectorizationContext)....
-    for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
-      PartitionDesc partDesc = entry.getValue();
-      if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
-        // No partition information -- we match because we would default to using the table description.
-        continue;
-      }
-      Properties partProps = partDesc.getProperties();
-      if (!haveInfo) {
-        columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
-        types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
-        partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-        partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-        haveInfo = true;
-      } else {
-        String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
-        String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
-        String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-        String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-        if (!columns.equalsIgnoreCase(nextColumns)) {
-          LOG.info(
-                  String.format("Could not vectorize partition %s.  Its column names %s do not match the other column names %s",
-                            entry.getKey(), nextColumns, columns));
-          return false;
-        }
-        if (!types.equalsIgnoreCase(nextTypes)) {
-          LOG.info(
-                  String.format("Could not vectorize partition %s.  Its column types %s do not match the other column types %s",
-                              entry.getKey(), nextTypes, types));
-          return false;
-        }
-        if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
-          LOG.info(
-                 String.format("Could not vectorize partition %s.  Its partition column names %s do not match the other partition column names %s",
-                              entry.getKey(), nextPartitionColumns, partitionColumns));
-          return false;
-        }
-        if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
-          LOG.info(
-                 String.format("Could not vectorize partition %s.  Its partition column types %s do not match the other partition column types %s",
-                                entry.getKey(), nextPartitionTypes, partitionTypes));
-          return false;
-        }
-      }
-    }
     return true;
   }
 
@@ -1412,23 +1620,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     return result;
   }
 
-  private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
-    Map<Integer, String> typeNameMap) {
+  private VectorizationContext getVectorizationContext(String contextName,
+      VectorTaskColumnInfo vectorTaskColumnInfo) {
 
-    VectorizationContext vContext = new VectorizationContext(contextName);
-
-    // Add all non-virtual columns to make a vectorization context for
-    // the TableScan operator.
-    int i = 0;
-    for (ColumnInfo c : rowSchema.getSignature()) {
-      // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
-      if (!isVirtualColumn(c)) {
-        vContext.addInitialColumn(c.getInternalName());
-        typeNameMap.put(i, c.getTypeName());
-        i++;
-      }
-    }
-    vContext.finishedAddingInitialColumns();
+    VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames);
 
     return vContext;
   }
@@ -1785,12 +1980,16 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   public void debugDisplayAllMaps(BaseWork work) {
 
-    Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
-    Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
-    Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
+    VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx();
+
+    String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames();
+    Object columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos();
+    int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount();
+    String[] scratchColumnTypeNames = vectorizedRowBatchCtx.getScratchColumnTypeNames();
 
-    LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
-    LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
-    LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
+    LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames));
+    LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos));
+    LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount);
+    LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames));
   }
 }