Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:04 UTC

[22/34] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index b654b64..aabea0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,11 +17,29 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+import com.google.common.collect.Lists;
 
 public class OrcUtils {
   private static final Logger LOG = LoggerFactory.getLogger(OrcUtils.class);
@@ -79,4 +97,533 @@ public class OrcUtils {
     }
     return null;
   }
+
+  /**
+   * Convert a Hive type property string that contains a list of type names into a list of
+   * TypeDescription objects.
+   * @return the list of TypeDescription objects.
+   */
+  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+      String hiveTypeProperty) {
+
+    // CONSIDER: We need a type name parser for TypeDescription.
+
+    ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+    ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
+    for (TypeInfo typeInfo : typeInfoList) {
+      typeDescrList.add(convertTypeInfo(typeInfo));
+    }
+    return typeDescrList;
+  }
+
+  public static TypeDescription convertTypeInfo(TypeInfo info) {
+    switch (info.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+        switch (pinfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+            return TypeDescription.createBoolean();
+          case BYTE:
+            return TypeDescription.createByte();
+          case SHORT:
+            return TypeDescription.createShort();
+          case INT:
+            return TypeDescription.createInt();
+          case LONG:
+            return TypeDescription.createLong();
+          case FLOAT:
+            return TypeDescription.createFloat();
+          case DOUBLE:
+            return TypeDescription.createDouble();
+          case STRING:
+            return TypeDescription.createString();
+          case DATE:
+            return TypeDescription.createDate();
+          case TIMESTAMP:
+            return TypeDescription.createTimestamp();
+          case BINARY:
+            return TypeDescription.createBinary();
+          case DECIMAL: {
+            DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+            return TypeDescription.createDecimal()
+                .withScale(dinfo.getScale())
+                .withPrecision(dinfo.getPrecision());
+          }
+          case VARCHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createVarchar()
+                .withMaxLength(cinfo.getLength());
+          }
+          case CHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createChar()
+                .withMaxLength(cinfo.getLength());
+          }
+          default:
+            throw new IllegalArgumentException("ORC doesn't handle primitive" +
+                " category " + pinfo.getPrimitiveCategory());
+        }
+      }
+      case LIST: {
+        ListTypeInfo linfo = (ListTypeInfo) info;
+        return TypeDescription.createList
+            (convertTypeInfo(linfo.getListElementTypeInfo()));
+      }
+      case MAP: {
+        MapTypeInfo minfo = (MapTypeInfo) info;
+        return TypeDescription.createMap
+            (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+                convertTypeInfo(minfo.getMapValueTypeInfo()));
+      }
+      case UNION: {
+        UnionTypeInfo minfo = (UnionTypeInfo) info;
+        TypeDescription result = TypeDescription.createUnion();
+        for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+          result.addUnionChild(convertTypeInfo(child));
+        }
+        return result;
+      }
+      case STRUCT: {
+        StructTypeInfo sinfo = (StructTypeInfo) info;
+        TypeDescription result = TypeDescription.createStruct();
+        for(String fieldName: sinfo.getAllStructFieldNames()) {
+          result.addField(fieldName,
+              convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+        }
+        return result;
+      }
+      default:
+        throw new IllegalArgumentException("ORC doesn't handle " +
+            info.getCategory());
+    }
+  }
+
+  public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) {
+    List<OrcProto.Type> result = Lists.newArrayList();
+    appendOrcTypes(result, typeDescr);
+    return result;
+  }
+
+  private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) {
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    List<TypeDescription> children = typeDescr.getChildren();
+    switch (typeDescr.getCategory()) {
+    case BOOLEAN:
+      type.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      type.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      type.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      type.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      type.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      type.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      type.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      type.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      type.setKind(OrcProto.Type.Kind.CHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case VARCHAR:
+      type.setKind(Type.Kind.VARCHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case BINARY:
+      type.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      type.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      type.setKind(OrcProto.Type.Kind.DECIMAL);
+      type.setPrecision(typeDescr.getPrecision());
+      type.setScale(typeDescr.getScale());
+      break;
+    case LIST:
+      type.setKind(OrcProto.Type.Kind.LIST);
+      type.addSubtypes(children.get(0).getId());
+      break;
+    case MAP:
+      type.setKind(OrcProto.Type.Kind.MAP);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      break;
+    case STRUCT:
+      type.setKind(OrcProto.Type.Kind.STRUCT);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      for(String field: typeDescr.getFieldNames()) {
+        type.addFieldNames(field);
+      }
+      break;
+    case UNION:
+      type.setKind(OrcProto.Type.Kind.UNION);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " +
+          typeDescr.getCategory());
+    }
+    result.add(type.build());
+    if (children != null) {
+      for(TypeDescription child: children) {
+        appendOrcTypes(result, child);
+      }
+    }
+  }
+
+  /**
+   * NOTE: This method ignores the subtype numbers in the TypeDescription and rebuilds the
+   * subtype numbers based on the current length of the result list being appended to.
+   *
+   * @param result the list that the flattened types are appended to
+   * @param typeDescr the type description to append
+   */
+  public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+      TypeDescription typeDescr) {
+
+    int subtype = result.size();
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    boolean needsAdd = true;
+    List<TypeDescription> children = typeDescr.getChildren();
+    switch (typeDescr.getCategory()) {
+    case BOOLEAN:
+      type.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      type.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      type.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      type.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      type.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      type.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      type.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      type.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      type.setKind(OrcProto.Type.Kind.CHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case VARCHAR:
+      type.setKind(Type.Kind.VARCHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case BINARY:
+      type.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      type.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      type.setKind(OrcProto.Type.Kind.DECIMAL);
+      type.setPrecision(typeDescr.getPrecision());
+      type.setScale(typeDescr.getScale());
+      break;
+    case LIST:
+      type.setKind(OrcProto.Type.Kind.LIST);
+      type.addSubtypes(++subtype);
+      result.add(type.build());
+      needsAdd = false;
+      appendOrcTypesRebuildSubtypes(result, children.get(0));
+      break;
+    case MAP:
+      {
+        // Make room for MAP type.
+        result.add(null);
+  
+        // Add MAP type pair in order to determine their subtype values.
+        appendOrcTypesRebuildSubtypes(result, children.get(0));
+        int subtype2 = result.size();
+        appendOrcTypesRebuildSubtypes(result, children.get(1));
+        type.setKind(OrcProto.Type.Kind.MAP);
+        type.addSubtypes(subtype + 1);
+        type.addSubtypes(subtype2);
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    case STRUCT:
+      {
+        List<String> fieldNames = typeDescr.getFieldNames();
+
+        // Make room for STRUCT type.
+        result.add(null);
+
+        List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+        for(TypeDescription child: children) {
+          int fieldSubtype = result.size();
+          fieldSubtypes.add(fieldSubtype);
+          appendOrcTypesRebuildSubtypes(result, child);
+        }
+
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+
+        for (int i = 0 ; i < fieldNames.size(); i++) {
+          type.addSubtypes(fieldSubtypes.get(i));
+          type.addFieldNames(fieldNames.get(i));
+        }
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    case UNION:
+      {
+        // Make room for UNION type.
+        result.add(null);
+
+        List<Integer> unionSubtypes = new ArrayList<Integer>(children.size());
+        for(TypeDescription child: children) {
+          int unionSubtype = result.size();
+          unionSubtypes.add(unionSubtype);
+          appendOrcTypesRebuildSubtypes(result, child);
+        }
+
+        type.setKind(OrcProto.Type.Kind.UNION);
+        for (int i = 0 ; i < children.size(); i++) {
+          type.addSubtypes(unionSubtypes.get(i));
+        }
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory());
+    }
+    if (needsAdd) {
+      result.add(type.build());
+    }
+  }
+
+  /**
+   * NOTE: This method ignores the subtype numbers in the OrcProto.Type list and rebuilds the
+   * subtype numbers based on the current length of the result list being appended to.
+   *
+   * @param result the list that the flattened types are appended to
+   * @param types the source flattened type list
+   * @param columnId the position in types to start copying from
+   * @return the position after the consumed subtree
+   */
+  public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+      List<OrcProto.Type> types, int columnId) {
+
+    OrcProto.Type oldType = types.get(columnId++);
+
+    int subtype = result.size();
+    OrcProto.Type.Builder builder = OrcProto.Type.newBuilder();
+    boolean needsAdd = true;
+    switch (oldType.getKind()) {
+    case BOOLEAN:
+      builder.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      builder.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      builder.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      builder.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      builder.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      builder.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      builder.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      builder.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      builder.setKind(OrcProto.Type.Kind.CHAR);
+      builder.setMaximumLength(oldType.getMaximumLength());
+      break;
+    case VARCHAR:
+      builder.setKind(Type.Kind.VARCHAR);
+      builder.setMaximumLength(oldType.getMaximumLength());
+      break;
+    case BINARY:
+      builder.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      builder.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      builder.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      builder.setKind(OrcProto.Type.Kind.DECIMAL);
+      builder.setPrecision(oldType.getPrecision());
+      builder.setScale(oldType.getScale());
+      break;
+    case LIST:
+      builder.setKind(OrcProto.Type.Kind.LIST);
+      builder.addSubtypes(++subtype);
+      result.add(builder.build());
+      needsAdd = false;
+      columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+      break;
+    case MAP:
+      {
+        // Make room for MAP type.
+        result.add(null);
+  
+        // Add MAP type pair in order to determine their subtype values.
+        columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        int subtype2 = result.size();
+        columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        builder.setKind(OrcProto.Type.Kind.MAP);
+        builder.addSubtypes(subtype + 1);
+        builder.addSubtypes(subtype2);
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    case STRUCT:
+      {
+        List<String> fieldNames = oldType.getFieldNamesList();
+
+        // Make room for STRUCT type.
+        result.add(null);
+
+        List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+        for(int i = 0 ; i < fieldNames.size(); i++) {
+          int fieldSubtype = result.size();
+          fieldSubtypes.add(fieldSubtype);
+          columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        }
+
+        builder.setKind(OrcProto.Type.Kind.STRUCT);
+
+        for (int i = 0 ; i < fieldNames.size(); i++) {
+          builder.addSubtypes(fieldSubtypes.get(i));
+          builder.addFieldNames(fieldNames.get(i));
+        }
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    case UNION:
+      {
+        int subtypeCount = oldType.getSubtypesCount();
+
+        // Make room for UNION type.
+        result.add(null);
+
+        List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount);
+        for(int i = 0 ; i < subtypeCount; i++) {
+          int unionSubtype = result.size();
+          unionSubtypes.add(unionSubtype);
+          columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        }
+
+        builder.setKind(OrcProto.Type.Kind.UNION);
+        for (int i = 0 ; i < subtypeCount; i++) {
+          builder.addSubtypes(unionSubtypes.get(i));
+        }
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " + oldType.getKind());
+    }
+    if (needsAdd) {
+      result.add(builder.build());
+    }
+    return columnId;
+  }
+
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf) {
+
+    String columnNameProperty = null;
+    String columnTypeProperty = null;
+
+    ArrayList<String> schemaEvolutionColumnNames = null;
+    ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+    boolean haveSchemaEvolutionProperties = false;
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+      columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+      columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+      haveSchemaEvolutionProperties =
+          (columnNameProperty != null && columnTypeProperty != null);
+
+      if (haveSchemaEvolutionProperties) {
+        schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+        if (schemaEvolutionColumnNames.size() == 0) {
+          haveSchemaEvolutionProperties = false;
+        } else {
+          schemaEvolutionTypeDescrs =
+              OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+            haveSchemaEvolutionProperties = false;
+          }
+        }
+      }
+    }
+
+    if (!haveSchemaEvolutionProperties) {
+
+      // Try regular properties.
+      columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+      columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+      if (columnTypeProperty == null || columnNameProperty == null) {
+        return null;
+      }
+
+      schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+      if (schemaEvolutionColumnNames.size() == 0) {
+        return null;
+      }
+      schemaEvolutionTypeDescrs =
+          OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+        return null;
+      }
+    }
+
+    // Desired schema does not include virtual columns or partition columns.
+    TypeDescription result = TypeDescription.createStruct();
+    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+      result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+    }
+
+    return result;
+  }
 }
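
For illustration, a hedged sketch (not part of the patch; the property value and the generated column names below are invented) of how the new OrcUtils helpers might be driven end to end, from a Hive type property string to a flattened OrcProto.Type list:

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.hive.ql.io.orc.OrcProto;
  import org.apache.hadoop.hive.ql.io.orc.OrcUtils;
  import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

  public class OrcUtilsSketch {
    public static void main(String[] args) {
      // Hypothetical columns.types-style property value.
      String hiveTypeProperty = "int:string:struct<a:int,b:decimal(10,2)>";

      // One TypeDescription per top-level column.
      ArrayList<TypeDescription> cols =
          OrcUtils.typeDescriptionsFromHiveTypeProperty(hiveTypeProperty);

      // Wrap the columns in a row STRUCT, mirroring getDesiredRowTypeDescr().
      TypeDescription row = TypeDescription.createStruct();
      for (int i = 0; i < cols.size(); i++) {
        row.addField("col" + i, cols.get(i));
      }

      // Flatten to OrcProto.Type entries; the row STRUCT comes first and
      // children follow depth-first, matching the file footer layout.
      List<OrcProto.Type> protoTypes = OrcUtils.getOrcTypes(row);
      System.out.println(protoTypes.size() + " flattened types");
    }
  }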

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index cf81782..750cf8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -145,6 +146,7 @@ public interface Reader {
     private boolean[] include;
     private long offset = 0;
     private long length = Long.MAX_VALUE;
+    private TypeDescription schema;
     private SearchArgument sarg = null;
     private String[] columnNames = null;
     private Boolean useZeroCopy = null;
@@ -173,6 +175,14 @@ public interface Reader {
     }
 
     /**
+     * Set the schema-on-read type description.
+     */
+    public Options schema(TypeDescription schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    /**
      * Set search argument for predicate push down.
      * @param sarg the search argument
      * @param columnNames the column names for
@@ -216,6 +226,10 @@ public interface Reader {
       return length;
     }
 
+    public TypeDescription getSchema() {
+      return schema;
+    }
+
     public SearchArgument getSearchArgument() {
       return sarg;
     }
@@ -245,6 +259,7 @@ public interface Reader {
       result.include = include;
       result.offset = offset;
       result.length = length;
+      result.schema = schema;
       result.sarg = sarg;
       result.columnNames = columnNames;
       result.useZeroCopy = useZeroCopy;
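
The new Options.schema(...) setter follows the existing builder pattern. A minimal sketch of how a caller might request a schema on read (the file path is a placeholder, and getDesiredRowTypeDescr comes from the OrcUtils changes above):

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  import org.apache.hadoop.hive.ql.io.orc.OrcUtils;
  import org.apache.hadoop.hive.ql.io.orc.Reader;
  import org.apache.hadoop.hive.ql.io.orc.RecordReader;
  import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

  public class SchemaOnReadSketch {
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
          OrcFile.readerOptions(conf));

      // May be null when the schema properties are not set; in that case
      // the record reader falls back to the file schema.
      TypeDescription readSchema = OrcUtils.getDesiredRowTypeDescr(conf);

      Reader.Options options = new Reader.Options();
      if (readSchema != null) {
        options.schema(readSchema);   // new setter added by this patch
      }
      RecordReader rows = reader.rowsOptions(options);
      rows.close();
    }
  }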

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
deleted file mode 100644
index 5e7d636..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-
-import com.google.common.collect.Lists;
-
-/**
- * Factory to create ORC tree readers. It also compares file schema with schema specified on read
- * to see if type promotions are possible.
- */
-public class RecordReaderFactory {
-  static final Logger LOG = LoggerFactory.getLogger(RecordReaderFactory.class);
-  private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
-
-  public static TreeReaderFactory.TreeReader createTreeReader(int colId,
-      Configuration conf,
-      List<OrcProto.Type> fileSchema,
-      boolean[] included,
-      boolean skipCorrupt) throws IOException {
-    final boolean isAcid = checkAcidSchema(fileSchema);
-    final List<OrcProto.Type> originalFileSchema;
-    if (isAcid) {
-      originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(),
-          fileSchema.size());
-    } else {
-      originalFileSchema = fileSchema;
-    }
-    final int numCols = originalFileSchema.get(0).getSubtypesCount();
-    List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf);
-    List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead);
-    if (schemaUsed == null) {
-      return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt);
-    } else {
-      return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt);
-    }
-  }
-
-  static List<String> getAcidEventFields() {
-    return Lists.newArrayList("operation", "originalTransaction", "bucket",
-        "rowId", "currentTransaction", "row");
-  }
-
-  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
-    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
-      List<String> acidFields = getAcidEventFields();
-      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
-      if (acidFields.equals(rootFields)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema,
-      List<OrcProto.Type> schemaOnRead) {
-    if (schemaOnRead == null) {
-      if (isLogInfoEnabled) {
-        LOG.info("Schema is not specified on read. Using file schema.");
-      }
-      return null;
-    }
-
-    if (fileSchema.size() != schemaOnRead.size()) {
-      if (isLogInfoEnabled) {
-        LOG.info("Schema on read column count does not match file schema's column count." +
-            " Falling back to using file schema.");
-      }
-      return null;
-    } else {
-      List<OrcProto.Type> result = Lists.newArrayList(fileSchema);
-      // check type promotion. ORC can only support type promotions for integer types
-      // short -> int -> bigint as same integer readers are used for the above types.
-      boolean canPromoteType = false;
-      for (int i = 0; i < fileSchema.size(); i++) {
-        OrcProto.Type fColType = fileSchema.get(i);
-        OrcProto.Type rColType = schemaOnRead.get(i);
-        if (!fColType.getKind().equals(rColType.getKind())) {
-
-          if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
-
-            if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
-                rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-              // type promotion possible, converting SHORT to INT/LONG requested type
-              result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
-              canPromoteType = true;
-            } else {
-              canPromoteType = false;
-            }
-
-          } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
-
-            if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-              // type promotion possible, converting INT to LONG requested type
-              result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
-              canPromoteType = true;
-            } else {
-              canPromoteType = false;
-            }
-
-          } else {
-            canPromoteType = false;
-          }
-        }
-      }
-
-      if (canPromoteType) {
-        if (isLogInfoEnabled) {
-          LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema.");
-        }
-        return result;
-      }
-    }
-
-    return null;
-  }
-
-  private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) {
-    String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-    final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
-    if (columnTypeProperty == null || columnNameProperty == null) {
-      return null;
-    }
-
-    ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(","));
-    ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    StructTypeInfo structTypeInfo = new StructTypeInfo();
-    // Column types from conf includes virtual and partition columns at the end. We consider only
-    // the actual columns in the file.
-    structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols)));
-    structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols)));
-    ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo);
-    return getOrcTypes(oi);
-  }
-
-  private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) {
-    List<OrcProto.Type> result = Lists.newArrayList();
-    getOrcTypesImpl(result, inspector);
-    return result;
-  }
-
-  private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) {
-    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-            type.setKind(OrcProto.Type.Kind.BOOLEAN);
-            break;
-          case BYTE:
-            type.setKind(OrcProto.Type.Kind.BYTE);
-            break;
-          case SHORT:
-            type.setKind(OrcProto.Type.Kind.SHORT);
-            break;
-          case INT:
-            type.setKind(OrcProto.Type.Kind.INT);
-            break;
-          case LONG:
-            type.setKind(OrcProto.Type.Kind.LONG);
-            break;
-          case FLOAT:
-            type.setKind(OrcProto.Type.Kind.FLOAT);
-            break;
-          case DOUBLE:
-            type.setKind(OrcProto.Type.Kind.DOUBLE);
-            break;
-          case STRING:
-            type.setKind(OrcProto.Type.Kind.STRING);
-            break;
-          case CHAR:
-            // The char length needs to be written to file and should be available
-            // from the object inspector
-            CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.CHAR);
-            type.setMaximumLength(charTypeInfo.getLength());
-            break;
-          case VARCHAR:
-            // The varchar length needs to be written to file and should be available
-            // from the object inspector
-            VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.VARCHAR);
-            type.setMaximumLength(typeInfo.getLength());
-            break;
-          case BINARY:
-            type.setKind(OrcProto.Type.Kind.BINARY);
-            break;
-          case TIMESTAMP:
-            type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-            break;
-          case DATE:
-            type.setKind(OrcProto.Type.Kind.DATE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.DECIMAL);
-            type.setPrecision(decTypeInfo.precision());
-            type.setScale(decTypeInfo.scale());
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown primitive category: " +
-                ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
-        }
-        result.add(type.build());
-        break;
-      case LIST:
-        type.setKind(OrcProto.Type.Kind.LIST);
-        result.add(type.build());
-        getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector());
-        break;
-      case MAP:
-        type.setKind(OrcProto.Type.Kind.MAP);
-        result.add(type.build());
-        getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector());
-        getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector());
-        break;
-      case STRUCT:
-        type.setKind(OrcProto.Type.Kind.STRUCT);
-        result.add(type.build());
-        for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) {
-          getOrcTypesImpl(result, field.getFieldObjectInspector());
-        }
-        break;
-      case UNION:
-        type.setKind(OrcProto.Type.Kind.UNION);
-        result.add(type.build());
-        for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) {
-          getOrcTypesImpl(result, oi);
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown category: " + inspector.getCategory());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 84d627a..7f550a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -153,6 +155,18 @@ public class RecordReaderImpl implements RecordReader {
                              long strideRate,
                              Configuration conf
                              ) throws IOException {
+
+    TreeReaderSchema treeReaderSchema;
+    if (options.getSchema() == null) {
+      treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types);
+    } else {
+
+      // Now that we are creating a record reader for a file, validate that the schema to read
+      // is compatible with the file schema.
+      //
+      List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+      treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
+    }
     this.path = path;
     this.codec = codec;
     this.types = types;
@@ -197,7 +211,7 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
     }
 
-    reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt);
+    reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -1101,6 +1115,7 @@ public class RecordReaderImpl implements RecordReader {
       } else {
         result = (VectorizedRowBatch) previous;
         result.selectedInUse = false;
+        reader.setVectorColumnCount(result.getDataColumnCount());
         reader.nextVector(result.cols, (int) batchSize);
       }
 

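The constructor change above defers compatibility checking to SchemaEvolution.validateAndCreate (new file below). As a hedged distillation, the integer promotion rule it enforces amounts to the following predicate:

  import org.apache.hadoop.hive.ql.io.orc.OrcProto;

  public class PromotionRuleSketch {
    // Only the "implicit" integer widenings are accepted:
    // SHORT -> INT or LONG, and INT -> LONG; anything else is rejected
    // (validateAndCreate throws an IOException).
    static boolean promotable(OrcProto.Type.Kind file, OrcProto.Type.Kind read) {
      if (file.equals(read)) {
        return true;
      }
      if (file.equals(OrcProto.Type.Kind.SHORT)) {
        return read.equals(OrcProto.Type.Kind.INT)
            || read.equals(OrcProto.Type.Kind.LONG);
      }
      return file.equals(OrcProto.Type.Kind.INT)
          && read.equals(OrcProto.Type.Kind.LONG);
    }
  }
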
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
new file mode 100644
index 0000000..9d00eb2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+
+  private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
+
+  public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+      List<OrcProto.Type> schemaTypes) throws IOException {
+
+    // For ACID, the row is the ROW field in the outer STRUCT.
+    final boolean isAcid = checkAcidSchema(fileTypes);
+    final List<OrcProto.Type> rowSchema;
+    int rowSubtype;
+    if (isAcid) {
+      rowSubtype = OrcRecordUpdater.ROW + 1;
+      rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+    } else {
+      rowSubtype = 0;
+      rowSchema = fileTypes;
+    }
+
+    // Check the overlapping columns.  Additional columns will be defaulted to NULL.
+
+    int numFileColumns = rowSchema.get(0).getSubtypesCount();
+    int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
+
+    int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+    /**
+     * Check type promotion.
+     *
+     * Currently, we only support integer type promotions that can be done "implicitly".
+     * That is, we know that using a bigger integer tree reader on the original smaller integer
+     * column will "just work".
+     *
+     * In the future, other type promotions might require type conversion.
+     */
+    // short -> int -> bigint as same integer readers are used for the above types.
+
+    for (int i = 0; i < numReadColumns; i++) {
+      OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+      OrcProto.Type rColType = schemaTypes.get(i);
+      if (!fColType.getKind().equals(rColType.getKind())) {
+
+        boolean ok = false;
+        if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+              rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting SHORT to INT/LONG requested type
+            ok = true;
+          }
+        } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting INT to LONG requested type
+            ok = true;
+          }
+        }
+
+        if (!ok) {
+          throw new IOException("ORC does not support type conversion from " +
+              fColType.getKind().name() + " to " + rColType.getKind().name());
+        }
+      }
+    }
+
+    List<Type> fullSchemaTypes;
+
+    if (isAcid) {
+      fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+      // This copies the ACID struct type which is subtype = 0.
+      // It has field names "operation" through "row".
+      // And we copy the types for all fields EXCEPT ROW (which must be last!).
+
+      for (int i = 0; i < rowSubtype; i++) {
+        fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+      }
+
+      // Add the row struct type.
+      OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+    } else {
+      fullSchemaTypes = schemaTypes;
+    }
+
+    int innerStructSubtype = rowSubtype;
+
+    // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
+    //     " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
+
+    return new TreeReaderSchema().
+        fileTypes(fileTypes).
+        schemaTypes(fullSchemaTypes).
+        innerStructSubtype(innerStructSubtype);
+  }
+
+  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+      if (acidEventFieldNames.equals(rootFields)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param typeDescr the row's type description
+   * @return ORC types for the ACID event based on the row's type description
+   */
+  public static List<Type> createEventSchema(TypeDescription typeDescr) {
+
+    List<Type> result = new ArrayList<Type>();
+
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    type.setKind(OrcProto.Type.Kind.STRUCT);
+    type.addAllFieldNames(acidEventFieldNames);
+    for (int i = 0; i < acidEventFieldNames.size(); i++) {
+      type.addSubtypes(i + 1);
+    }
+    result.add(type.build());
+
+    // Automatically add all fields except the last (ROW).
+    for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
+      type.clear();
+      type.setKind(acidEventOrcTypeKinds.get(i));
+      result.add(type.build());
+    }
+
+    OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+    return result;
+  }
+
+  public static final List<String> acidEventFieldNames = new ArrayList<String>();
+  static {
+    acidEventFieldNames.add("operation");
+    acidEventFieldNames.add("originalTransaction");
+    acidEventFieldNames.add("bucket");
+    acidEventFieldNames.add("rowId");
+    acidEventFieldNames.add("currentTransaction");
+    acidEventFieldNames.add("row");
+  }
+  public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+      new ArrayList<OrcProto.Type.Kind>();
+  static {
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+  }
+}
\ No newline at end of file
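
For reference, a hedged sketch of the flattened list createEventSchema produces for a one-column row struct<x:int>; subtype ids are simply positions in the list:

  import java.util.List;

  import org.apache.hadoop.hive.ql.io.orc.OrcProto;
  import org.apache.hadoop.hive.ql.io.orc.SchemaEvolution;
  import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

  public class AcidEventSchemaSketch {
    public static void main(String[] args) {
      TypeDescription row = TypeDescription.createStruct()
          .addField("x", TypeDescription.createInt());
      // Expected layout (ids are list positions):
      //   0: STRUCT (operation, originalTransaction, bucket,
      //              rowId, currentTransaction, row) -> subtypes 1..6
      //   1: INT    operation            4: LONG currentTransaction... rather:
      //   1: INT    operation            4: LONG rowId
      //   2: LONG   originalTransaction  5: LONG currentTransaction
      //   3: INT    bucket               6: STRUCT row -> subtype 7
      //   7: INT    x
      List<OrcProto.Type> event = SchemaEvolution.createEventSchema(row);
      for (int i = 0; i < event.size(); i++) {
        System.out.println(i + ": " + event.get(i).getKind());
      }
    }
  }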

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 4bcc621..8c13571 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -47,24 +47,80 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Factory for creating ORC tree readers.
  */
 public class TreeReaderFactory {
 
+  public static final Logger LOG = LoggerFactory.getLogger(TreeReaderFactory.class);
+
+  public static class TreeReaderSchema {
+
+    /**
+     * The types in the ORC file.
+     */
+    List<OrcProto.Type> fileTypes;
+
+    /**
+     * The schema that the reader should present the rows as.
+     */
+    List<OrcProto.Type> schemaTypes;
+
+    /**
+     * The subtype of the row STRUCT.  Different from 0 for ACID.
+     */
+    int innerStructSubtype;
+
+    public TreeReaderSchema() {
+      fileTypes = null;
+      schemaTypes = null;
+      innerStructSubtype = -1;
+    }
+
+    public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
+      this.fileTypes = fileTypes;
+      return this;
+    }
+
+    public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
+      this.schemaTypes = schemaTypes;
+      return this;
+    }
+
+    public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
+      this.innerStructSubtype = innerStructSubtype;
+      return this;
+    }
+
+    public List<OrcProto.Type> getFileTypes() {
+      return fileTypes;
+    }
+
+    public List<OrcProto.Type> getSchemaTypes() {
+      return schemaTypes;
+    }
+
+    public int getInnerStructSubtype() {
+      return innerStructSubtype;
+    }
+  }
+
   public abstract static class TreeReader {
     protected final int columnId;
     protected BitFieldReader present = null;
     protected boolean valuePresent = false;
+    protected int vectorColumnCount;
 
     TreeReader(int columnId) throws IOException {
       this(columnId, null);
@@ -78,6 +134,11 @@ public class TreeReaderFactory {
       } else {
         present = new BitFieldReader(in, 1);
       }
+      vectorColumnCount = -1;
+    }
+
+    void setVectorColumnCount(int vectorColumnCount) {
+      this.vectorColumnCount = vectorColumnCount;
     }
 
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
@@ -1962,25 +2023,57 @@ public class TreeReaderFactory {
     }
   }
 
-  public static class StructTreeReader extends TreeReader {
+  protected static class StructTreeReader extends TreeReader {
+    private final int fileColumnCount;
+    private final int resultColumnCount;
     protected final TreeReader[] fields;
     private final String[] fieldNames;
 
-    protected StructTreeReader(int columnId,
-        List<OrcProto.Type> types,
+    protected StructTreeReader(
+        int columnId,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
-      int fieldCount = type.getFieldNamesCount();
-      this.fields = new TreeReader[fieldCount];
-      this.fieldNames = new String[fieldCount];
-      for (int i = 0; i < fieldCount; ++i) {
-        int subtype = type.getSubtypes(i);
-        if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+
+      OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
+      fileColumnCount = fileStructType.getFieldNamesCount();
+
+      OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+
+      if (columnId == treeReaderSchema.getInnerStructSubtype()) {
+        // If there are more result columns than file columns, we will default those additional
+        // columns to NULL.
+        resultColumnCount = schemaStructType.getFieldNamesCount();
+      } else {
+        resultColumnCount = fileColumnCount;
+      }
+
+      this.fields = new TreeReader[fileColumnCount];
+      this.fieldNames = new String[fileColumnCount];
+
+      if (included == null) {
+        for (int i = 0; i < fileColumnCount; ++i) {
+          int subtype = schemaStructType.getSubtypes(i);
+          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+          // Use the schema evolution name since file/reader types may not have the real column name.
+          this.fieldNames[i] = schemaStructType.getFieldNames(i);
+        }
+      } else {
+        for (int i = 0; i < fileColumnCount; ++i) {
+          int subtype = schemaStructType.getSubtypes(i);
+          if (subtype >= included.length) {
+            throw new IOException("subtype " + subtype + " exceeds the included array size " +
+                included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
+                " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
+                " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
+          }
+          if (included[subtype]) {
+            this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+          }
+          // Use the schema evolution name since file/reader types may not have the real column name.
+          this.fieldNames[i] = schemaStructType.getFieldNames(i);
         }
-        this.fieldNames[i] = type.getFieldNames(i);
       }
     }
 
@@ -2000,22 +2093,28 @@ public class TreeReaderFactory {
       OrcStruct result = null;
       if (valuePresent) {
         if (previous == null) {
-          result = new OrcStruct(fields.length);
+          result = new OrcStruct(resultColumnCount);
         } else {
           result = (OrcStruct) previous;
 
           // If the input format was initialized with a file with a
           // different number of fields, the number of fields needs to
           // be updated to the correct number
-          if (result.getNumFields() != fields.length) {
-            result.setNumFields(fields.length);
+          if (result.getNumFields() != resultColumnCount) {
+            result.setNumFields(resultColumnCount);
           }
         }
-        for (int i = 0; i < fields.length; ++i) {
+        for (int i = 0; i < fileColumnCount; ++i) {
           if (fields[i] != null) {
             result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
           }
         }
+        if (resultColumnCount > fileColumnCount) {
+          for (int i = fileColumnCount; i < resultColumnCount; ++i) {
+            // Default new schema evolution fields to NULL.
+            result.setFieldValue(i, null);
+          }
+        }
       }
       return result;
     }
@@ -2024,13 +2123,13 @@ public class TreeReaderFactory {
     public Object nextVector(Object previousVector, long batchSize) throws IOException {
       final ColumnVector[] result;
       if (previousVector == null) {
-        result = new ColumnVector[fields.length];
+        result = new ColumnVector[fileColumnCount];
       } else {
         result = (ColumnVector[]) previousVector;
       }
 
       // Read all the members of struct as column vectors
-      for (int i = 0; i < fields.length; i++) {
+      for (int i = 0; i < fileColumnCount; i++) {
         if (fields[i] != null) {
           if (result[i] == null) {
             result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
@@ -2039,6 +2138,19 @@ public class TreeReaderFactory {
           }
         }
       }
+
+      // Default additional schema evolution fields to NULL.
+      if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) {
+        for (int i = fileColumnCount; i < vectorColumnCount; ++i) {
+          ColumnVector colVector = result[i];
+          if (colVector != null) {
+            colVector.isRepeating = true;
+            colVector.noNulls = false;
+            colVector.isNull[0] = true;
+          }
+        }
+      }
+
       return result;
     }
 
@@ -2070,17 +2182,17 @@ public class TreeReaderFactory {
     protected RunLengthByteReader tags;
 
     protected UnionTreeReader(int columnId,
-        List<OrcProto.Type> types,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
       int fieldCount = type.getSubtypesCount();
       this.fields = new TreeReader[fieldCount];
       for (int i = 0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
         }
       }
     }
@@ -2150,12 +2262,12 @@ public class TreeReaderFactory {
     protected IntegerReader lengths = null;
 
     protected ListTreeReader(int columnId,
-        List<OrcProto.Type> types,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
-      elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
+      elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
     }
 
     @Override
@@ -2240,20 +2352,20 @@ public class TreeReaderFactory {
     protected IntegerReader lengths = null;
 
     protected MapTreeReader(int columnId,
-        List<OrcProto.Type> types,
+        TreeReaderSchema treeReaderSchema,
         boolean[] included,
         boolean skipCorrupt) throws IOException {
       super(columnId);
-      OrcProto.Type type = types.get(columnId);
+      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
       int keyColumn = type.getSubtypes(0);
       int valueColumn = type.getSubtypes(1);
       if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
+        keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
       } else {
         keyReader = null;
       }
       if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
+        valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
       } else {
         valueReader = null;
       }
@@ -2333,11 +2445,11 @@ public class TreeReaderFactory {
   }
 
   public static TreeReader createTreeReader(int columnId,
-      List<OrcProto.Type> types,
+      TreeReaderSchema treeReaderSchema,
       boolean[] included,
       boolean skipCorrupt
   ) throws IOException {
-    OrcProto.Type type = types.get(columnId);
+    OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
     switch (type.getKind()) {
       case BOOLEAN:
         return new BooleanTreeReader(columnId);
@@ -2377,13 +2489,13 @@ public class TreeReaderFactory {
         int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
         return new DecimalTreeReader(columnId, precision, scale);
       case STRUCT:
-        return new StructTreeReader(columnId, types, included, skipCorrupt);
+        return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(columnId, types, included, skipCorrupt);
+        return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(columnId, types, included, skipCorrupt);
+        return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(columnId, types, included, skipCorrupt);
+        return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
             type.getKind());

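The hunks above complete a mechanical refactor: the tree-reader constructors and the createTreeReader factory stop passing a raw List<OrcProto.Type> and instead thread a TreeReaderSchema through, resolving a column's type via treeReaderSchema.getSchemaTypes().get(columnId). A minimal, self-contained sketch of that dispatch shape, with invented stand-in names (SchemaHolder, Kind) rather than the Hive classes:

import java.util.List;

class TreeReaderDispatchSketch {
  enum Kind { BOOLEAN, STRUCT, LIST }

  // Stand-in for TreeReaderSchema: one object that can later carry both the
  // file's physical types and the table's logical types.
  static class SchemaHolder {
    private final List<Kind> schemaTypes;
    SchemaHolder(List<Kind> schemaTypes) { this.schemaTypes = schemaTypes; }
    List<Kind> getSchemaTypes() { return schemaTypes; }
  }

  static String createTreeReader(int columnId, SchemaHolder schema) {
    Kind kind = schema.getSchemaTypes().get(columnId);   // was: types.get(columnId)
    switch (kind) {
      case BOOLEAN: return "BooleanTreeReader(" + columnId + ")";
      case STRUCT:  return "StructTreeReader(" + columnId + ")";
      case LIST:    return "ListTreeReader(" + columnId + ")";
      default: throw new IllegalArgumentException("Unsupported type " + kind);
    }
  }

  public static void main(String[] args) {
    SchemaHolder schema = new SchemaHolder(List.of(Kind.STRUCT, Kind.BOOLEAN));
    System.out.println(createTreeReader(1, schema));     // BooleanTreeReader(1)
  }
}

The value of the wrapper is indirection: callers keep asking for "the type of column N", while the holder decides which schema answers, which is what the schema-evolution work needs.
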
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
index a8e5c2e..a2725b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader
   private final AcidInputFormat.RowReader<OrcStruct> innerReader;
   private final RecordIdentifier key;
   private final OrcStruct value;
-  private final VectorizedRowBatchCtx rowBatchCtx;
+  private final VectorizedRowBatchCtx rbCtx;
+  private Object[] partitionValues;
   private final ObjectInspector objectInspector;
   private final DataOutputBuffer buffer = new DataOutputBuffer();
 
   VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
                              Configuration conf,
+                             VectorizedRowBatchCtx vectorizedRowBatchCtx,
                              FileSplit split) throws IOException {
     this.innerReader = inner;
     this.key = inner.createKey();
-    this.rowBatchCtx = new VectorizedRowBatchCtx();
+    this.rbCtx = vectorizedRowBatchCtx;
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
+    }
     this.value = inner.createValue();
     this.objectInspector = inner.getObjectInspector();
-    try {
-      rowBatchCtx.init(conf, split);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (SerDeException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Failed to initialize context", e);
-    } catch (HiveException e) {
-      throw new IOException("Failed to initialize context", e);
-    }
   }
 
   @Override
@@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader
     if (!innerReader.next(key, value)) {
       return false;
     }
-    try {
-      rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
-    } catch (HiveException e) {
-      throw new IOException("Problem adding partition column", e);
+    if (partitionValues != null) {
+      rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
     }
     try {
       VectorizedBatchUtil.acidAddRowToBatch(value,
           (StructObjectInspector) objectInspector,
-          vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+          vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
       while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
           innerReader.next(key, value)) {
         VectorizedBatchUtil.acidAddRowToBatch(value,
             (StructObjectInspector) objectInspector,
-            vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+            vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
       }
-    } catch (HiveException he) {
-      throw new IOException("error iterating", he);
+    } catch (Exception e) {
+      throw new IOException("Error iterating over ACID rows", e);
     }
     return true;
   }
@@ -110,11 +101,7 @@ class VectorizedOrcAcidRowReader
 
   @Override
   public VectorizedRowBatch createValue() {
-    try {
-      return rowBatchCtx.createVectorizedRowBatch();
-    } catch (HiveException e) {
-      throw new RuntimeException("Error creating a batch", e);
-    }
+    return rbCtx.createVectorizedRowBatch();
   }
 
   @Override

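Two changes above are worth calling out: the VectorizedRowBatchCtx is now injected by the caller instead of being built and init()-ed inside the reader (dropping the five-way catch), and partition values are materialized once in the constructor, then stamped onto each batch only when the table actually has partition columns. A small, self-contained sketch of that second pattern; PartitionCtx and Batch are invented stand-ins, not the Hive types:

class PartitionColsSketch {
  static class Batch { Object[] cols = new Object[4]; }

  static class PartitionCtx {
    private final Object[] partitionValues;   // null when the table is unpartitioned
    PartitionCtx(Object[] partitionValues) { this.partitionValues = partitionValues; }

    void addPartitionColsToBatch(Batch batch) {
      if (partitionValues == null) {
        return;                               // unpartitioned: nothing to stamp
      }
      int base = batch.cols.length - partitionValues.length;  // partition cols sit last
      for (int i = 0; i < partitionValues.length; i++) {
        batch.cols[base + i] = partitionValues[i];
      }
    }
  }

  public static void main(String[] args) {
    PartitionCtx ctx = new PartitionCtx(new Object[]{"2015-11-18"});
    Batch batch = new Batch();
    ctx.addPartitionColsToBatch(batch);       // values resolved once, reused per batch
    System.out.println(batch.cols[3]);        // 2015-11-18
  }
}

Resolving partition values once per split, rather than per batch, also removes the need for the addPartitionColsToBatch call to throw HiveException at all.
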
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index bf09001..6d280c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter;
  * A MapReduce/Hive input format for ORC files.
  */
 public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-    implements InputFormatChecker, VectorizedInputFormatInterface {
+    implements InputFormatChecker, VectorizedInputFormatInterface,
+    SelfDescribingInputFormatInterface {
 
   static class VectorizedOrcRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -56,12 +54,21 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     private final long length;
     private float progress = 0.0f;
     private VectorizedRowBatchCtx rbCtx;
+    private final boolean[] columnsToIncludeTruncated;
+    private final Object[] partitionValues;
     private boolean addPartitionCols = true;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
+
+      // Schema on read: check whether the configuration supplies a desired
+      // row type that this reader should project the file's physical schema
+      // to (see OrcUtils.getDesiredRowTypeDescr below).
+      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
       List<OrcProto.Type> types = file.getTypes();
       Reader.Options options = new Reader.Options();
+      options.schema(schema);
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       options.range(offset, length);
@@ -69,11 +76,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
       OrcInputFormat.setSearchArgument(options, types, conf, true);
 
       this.reader = file.rowsOptions(options);
-      try {
-        rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(conf, fileSplit);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+      columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
+
+      int partitionColumnCount = rbCtx.getPartitionColumnCount();
+      if (partitionColumnCount > 0) {
+        partitionValues = new Object[partitionColumnCount];
+        VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+      } else {
+        partitionValues = null;
       }
     }
 
@@ -90,7 +103,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
         // as this does not call CreateValue for each new RecordReader it creates, this check is
         // required in next()
         if (addPartitionCols) {
-          rbCtx.addPartitionColsToBatch(value);
+          if (partitionValues != null) {
+            rbCtx.addPartitionColsToBatch(value, partitionValues);
+          }
           addPartitionCols = false;
         }
         reader.nextBatch(value);
@@ -108,11 +123,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
 
     @Override
     public VectorizedRowBatch createValue() {
-      try {
-        return rbCtx.createVectorizedRowBatch();
-      } catch (HiveException e) {
-        throw new RuntimeException("Error creating a batch", e);
-      }
+      return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
     }
 
     @Override

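The ORC vectorized reader now participates in schema evolution: it asks the configuration for a desired row type (schema on read), hands it to Reader.Options, and creates batches from a truncated include list plus the precomputed partition values. A hedged sketch of null-tolerant option building, using invented helpers and an assumed property key; note the hunk above passes the schema to options.schema() unconditionally, while the sketch makes the "no schema configured" branch explicit:

import java.util.Map;

class SchemaOnReadSketch {
  // Stand-in for OrcUtils.getDesiredRowTypeDescr(conf); returns null when no
  // schema on read was configured. "columns.types" is just an assumed key here.
  static String desiredSchema(Map<String, String> conf) {
    return conf.get("columns.types");
  }

  static String describeOptions(Map<String, String> conf, long start, long len) {
    StringBuilder opts = new StringBuilder("range=" + start + "+" + len);
    String schema = desiredSchema(conf);
    if (schema != null) {   // project/convert only when a schema was supplied
      opts.append(", schema=").append(schema);
    }
    return opts.toString();
  }

  public static void main(String[] args) {
    System.out.println(describeOptions(Map.of("columns.types", "int:string"), 0L, 100L));
    System.out.println(describeOptions(Map.of(), 0L, 100L));  // no schema on read
  }
}
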
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index b28d870..2072533 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,8 +14,10 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -23,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -32,7 +35,6 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
 /**
@@ -52,6 +54,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
 
     private final ParquetRecordReaderWrapper internalReader;
       private VectorizedRowBatchCtx rbCtx;
+      private Object[] partitionValues;
       private ArrayWritable internalValues;
       private NullWritable internalKey;
       private VectorColumnAssign[] assigners;
@@ -65,11 +68,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
         split,
         conf,
         reporter);
-      try {
-        rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(conf, split);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      int partitionColumnCount = rbCtx.getPartitionColumnCount();
+      if (partitionColumnCount > 0) {
+        partitionValues = new Object[partitionColumnCount];
+        VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
       }
     }
 
@@ -81,13 +84,8 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
 
       @Override
       public VectorizedRowBatch createValue() {
-        VectorizedRowBatch outputBatch = null;
-        try {
-          outputBatch = rbCtx.createVectorizedRowBatch();
-          internalValues = internalReader.createValue();
-        } catch (HiveException e) {
-          throw new RuntimeException("Error creating a batch", e);
-        }
+        VectorizedRowBatch outputBatch = rbCtx.createVectorizedRowBatch();
+        internalValues = internalReader.createValue();
         return outputBatch;
       }
 

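The Parquet path gets the same treatment as ORC: Utilities.getVectorizedRowBatchCtx(conf) hands back a context assembled once at plan time, so the per-reader init() and its checked exceptions disappear (the MapWork import added above hints at where the real context lives). A self-contained sketch of that build-once, look-up-everywhere shape; PLAN_CACHE and BatchCtx are invented names, not the Hive mechanism:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SharedBatchCtxSketch {
  static class BatchCtx {
    final int partitionColumnCount;
    BatchCtx(int partitionColumnCount) { this.partitionColumnCount = partitionColumnCount; }
  }

  // Stand-in for Utilities.getVectorizedRowBatchCtx(conf): one context per plan,
  // built by the planner and merely looked up by every record reader.
  static final Map<String, BatchCtx> PLAN_CACHE = new ConcurrentHashMap<>();

  static BatchCtx getBatchCtx(String planId) {
    return PLAN_CACHE.computeIfAbsent(planId, id -> new BatchCtx(1));
  }

  public static void main(String[] args) {
    BatchCtx a = getBatchCtx("plan-1");
    BatchCtx b = getBatchCtx("plan-1");
    System.out.println(a == b);   // true: readers share the plan-time context
  }
}
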
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 05dfc4b..82514d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -111,6 +111,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 
@@ -693,6 +695,11 @@ public final class GenMapRedUtils {
       parseCtx.getGlobalLimitCtx().disableOpt();
     }
 
+    if (topOp instanceof TableScanOperator) {
+      Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),
+          (TableScanOperator) topOp);
+    }
+
     Iterator<Path> iterPath = partDir.iterator();
     Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();
 
@@ -754,6 +761,7 @@ public final class GenMapRedUtils {
    *          whether you need to add to map-reduce or local work
    * @param tt_desc
    *          table descriptor
+   * @throws SerDeException
    */
   public static void setTaskPlan(String path, String alias,
       Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
@@ -763,6 +771,16 @@ public final class GenMapRedUtils {
       return;
     }
 
+    if (topOp instanceof TableScanOperator) {
+      try {
+        Utilities.addSchemaEvolutionToTableScanOperator(
+            (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(),
+            (TableScanOperator) topOp);
+      } catch (Exception e) {
+        throw new SemanticException(e);
+      }
+    }
+
     if (!local) {
       if (plan.getPathToAliases().get(path) == null) {
         plan.getPathToAliases().put(path, new ArrayList<String>());

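Both planner hooks above attach schema-evolution metadata to the operator tree, and both guard on the top operator actually being a table scan. In miniature, with invented stand-ins (Operator, TableScan, evolve()) rather than the Hive operator classes:

class SchemaEvolutionGuardSketch {
  interface Operator { }
  static class TableScan implements Operator {
    String evolvedWith;
    void evolve(String sourceTable) { evolvedWith = sourceTable; }
  }
  static class Filter implements Operator { }

  static void maybeAddSchemaEvolution(Operator topOp, String sourceTable) {
    if (topOp instanceof TableScan) {            // mirrors: topOp instanceof TableScanOperator
      ((TableScan) topOp).evolve(sourceTable);
    }                                            // non-scan tops are left untouched
  }

  public static void main(String[] args) {
    TableScan scan = new TableScan();
    maybeAddSchemaEvolution(scan, "t1");
    maybeAddSchemaEvolution(new Filter(), "t1"); // no-op
    System.out.println(scan.evolvedWith);        // t1
  }
}
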
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 588f407..332e53b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform {
 
     private FetchWork convertToWork() throws HiveException {
       inputs.clear();
+      Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp);
       TableDesc tableDesc = Utilities.getTableDesc(table);
       if (!table.isPartitioned()) {
         inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null));