You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:05 UTC

[23/34] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
deleted file mode 100644
index 5ce7553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyDate;
-import org.apache.hadoop.hive.serde2.lazy.LazyLong;
-import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * VectorizedColumnarSerDe is used by Vectorized query execution engine
- * for columnar based storage supported by RCFile.
- */
-public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
-
-  public VectorizedColumnarSerDe() throws SerDeException {
-  }
-
-  private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
-  private final ObjectWritable ow = new ObjectWritable();
-  private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
-
-  /**
-   * Serialize a vectorized row batch
-   *
-   * @param vrg
-   *          Vectorized row batch to serialize
-   * @param objInspector
-   *          The ObjectInspector for the row object
-   * @return The serialized Writable object
-   * @throws SerDeException
-   * @see SerDe#serialize(Object, ObjectInspector)
-   */
-  @Override
-  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
-      throws SerDeException {
-    try {
-      // Validate that the OI is of struct type
-      if (objInspector.getCategory() != Category.STRUCT) {
-        throw new UnsupportedOperationException(getClass().toString()
-            + " can only serialize struct types, but we got: "
-            + objInspector.getTypeName());
-      }
-
-      VectorizedRowBatch batch = (VectorizedRowBatch) vrg;
-      StructObjectInspector soi = (StructObjectInspector) objInspector;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-
-      // Reset the byte buffer
-      serializeVectorStream.reset();
-      int count = 0;
-      int rowIndex = 0;
-      for (int i = 0; i < batch.size; i++) {
-
-        // If selectedInUse is true then we need to serialize only
-        // the selected indexes
-        if (batch.selectedInUse) {
-          rowIndex = batch.selected[i];
-        } else {
-          rowIndex = i;
-        }
-
-        BytesRefArrayWritable byteRow = byteRefArray[i];
-        int numCols = fields.size();
-
-        if (byteRow == null) {
-          byteRow = new BytesRefArrayWritable(numCols);
-          byteRefArray[i] = byteRow;
-        }
-
-        byteRow.resetValid(numCols);
-
-        for (int p = 0; p < batch.projectionSize; p++) {
-          int k = batch.projectedColumns[p];
-          ObjectInspector foi = fields.get(k).getFieldObjectInspector();
-          ColumnVector currentColVector = batch.cols[k];
-
-          switch (foi.getCategory()) {
-          case PRIMITIVE: {
-            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-            if (!currentColVector.noNulls
-                && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
-              // The column is null hence write null value
-              serializeVectorStream.write(new byte[0], 0, 0);
-            } else {
-              // If here then the vector value is not null.
-              if (currentColVector.isRepeating) {
-                // If the vector has repeating values then set rowindex to zero
-                rowIndex = 0;
-              }
-
-              switch (poi.getPrimitiveCategory()) {
-              case BOOLEAN: {
-                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
-                // In vectorization true is stored as 1 and false as 0
-                boolean b = lcv.vector[rowIndex] == 1 ? true : false;
-                if (b) {
-                  serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
-                } else {
-                  serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
-                }
-              }
-                break;
-              case BYTE:
-              case SHORT:
-              case INT:
-              case LONG:
-                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
-                LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]);
-                break;
-              case FLOAT:
-              case DOUBLE:
-                DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
-                ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex]));
-                serializeVectorStream.write(b.array(), 0, b.limit());
-                break;
-              case BINARY: {
-                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
-                byte[] bytes = bcv.vector[rowIndex];
-                serializeVectorStream.write(bytes, 0, bytes.length);
-              }
-                break;
-              case STRING:
-              case CHAR:
-              case VARCHAR: {
-                // Is it correct to escape CHAR and VARCHAR?
-                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
-                LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex],
-                    bcv.start[rowIndex],
-                    bcv.length[rowIndex],
-                    serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
-                        .getNeedsEscape());
-              }
-                break;
-              case TIMESTAMP:
-                LongColumnVector tcv = (LongColumnVector) batch.cols[k];
-                long timeInNanoSec = tcv.vector[rowIndex];
-                Timestamp t = new Timestamp(0);
-                TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t);
-                TimestampWritable tw = new TimestampWritable();
-                tw.set(t);
-                LazyTimestamp.writeUTF8(serializeVectorStream, tw);
-                break;
-              case DATE:
-                LongColumnVector dacv = (LongColumnVector) batch.cols[k];
-                DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]);
-                LazyDate.writeUTF8(serializeVectorStream, daw);
-                break;
-              default:
-                throw new UnsupportedOperationException(
-                    "Vectorizaton is not supported for datatype:"
-                        + poi.getPrimitiveCategory());
-              }
-            }
-            break;
-          }
-          case LIST:
-          case MAP:
-          case STRUCT:
-          case UNION:
-            throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:"
-                + foi.getCategory());
-          default:
-            throw new SerDeException("Unknown ObjectInspector category!");
-
-          }
-
-          byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
-              .getLength() - count);
-          count = serializeVectorStream.getLength();
-        }
-
-      }
-      ow.set(byteRefArray);
-    } catch (Exception e) {
-      throw new SerDeException(e);
-    }
-    return ow;
-  }
-
-  @Override
-  public SerDeStats getSerDeStats() {
-    return null;
-  }
-
-  @Override
-  public Class<? extends Writable> getSerializedClass() {
-    return BytesRefArrayWritable.class;
-  }
-
-  @Override
-  public Object deserialize(Writable blob) throws SerDeException {
-
-    // Ideally this should throw  UnsupportedOperationException as the serde is
-    // vectorized serde. But since RC file reader does not support vectorized reading this
-    // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch
-    // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized
-    // reading this serde and be standalone serde with no dependency on ColumnarSerDe.
-    return super.deserialize(blob);
-  }
-
-  @Override
-  public ObjectInspector getObjectInspector() throws SerDeException {
-    return cachedObjectInspector;
-  }
-
-  @Override
-  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Deserializes the rowBlob into Vectorized row batch
-   * @param rowBlob
-   *          rowBlob row batch to deserialize
-   * @param rowsInBlob
-   *          Total number of rows in rowBlob to deserialize
-   * @param reuseBatch
-   *          VectorizedRowBatch to which the rows should be serialized   *
-   * @throws SerDeException
-   */
-  @Override
-  public void deserializeVector(Object rowBlob, int rowsInBlob,
-      VectorizedRowBatch reuseBatch) throws SerDeException {
-
-    BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    for (int i = 0; i < rowsInBlob; i++) {
-      Object row = deserialize(refArray[i]);
-      try {
-        VectorizedBatchUtil.addRowToBatch(row,
-            (StructObjectInspector) cachedObjectInspector, i,
-            reuseBatch, buffer);
-      } catch (HiveException e) {
-        throw new SerDeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 6557002..a904a50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector;
 import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,337 +31,266 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hive.common.util.DateUtils;
 
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. this class does eager deserialization of row data using serde
  * in the RecordReader layer.
  * It has supports partitions in this layer so that the vectorized batch is populated correctly
  * with the partition column.
  */
 public class VectorizedRowBatchCtx {
 
+  private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = LoggerFactory.getLogger(VectorizedRowBatchCtx.class.getName());
 
-  // OI for raw row data (EG without partition cols)
-  private StructObjectInspector rawRowOI;
+  // The following information is for creating VectorizedRowBatch and for helping with
+  // knowing how the table is partitioned.
+  //
+  // It will be stored in MapWork and ReduceWork.
+  private String[] rowColumnNames;
+  private TypeInfo[] rowColumnTypeInfos;
+  private int dataColumnCount;
+  private int partitionColumnCount;
 
-  // OI for the row (Raw row OI + partition OI)
-  private StructObjectInspector rowOI;
+  private String[] scratchColumnTypeNames;
 
-  // Deserializer for the row data
-  private Deserializer deserializer;
+  /**
+   * Constructor for VectorizedRowBatchCtx
+   */
+  public VectorizedRowBatchCtx() {
+  }
 
-  // Hash map of partition values. Key=TblColName value=PartitionValue
-  private Map<String, Object> partitionValues;
-  
-  //partition types
-  private Map<String, PrimitiveCategory> partitionTypes;
+  public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos,
+      int partitionColumnCount, String[] scratchColumnTypeNames) {
+    this.rowColumnNames = rowColumnNames;
+    this.rowColumnTypeInfos = rowColumnTypeInfos;
+    this.partitionColumnCount = partitionColumnCount;
+    this.scratchColumnTypeNames = scratchColumnTypeNames;
 
-  // partition column positions, for use by classes that need to know whether a given column is a
-  // partition column
-  private Set<Integer> partitionCols;
-  
-  // Column projection list - List of column indexes to include. This
-  // list does not contain partition columns
-  private List<Integer> colsToInclude;
+    dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount;
+  }
 
-  private Map<Integer, String> scratchColumnTypeMap = null;
+  public String[] getRowColumnNames() {
+    return rowColumnNames;
+  }
 
-  /**
-   * Constructor for VectorizedRowBatchCtx
-   *
-   * @param rawRowOI
-   *          OI for raw row data (EG without partition cols)
-   * @param rowOI
-   *          OI for the row (Raw row OI + partition OI)
-   * @param deserializer
-   *          Deserializer for the row data
-   * @param partitionValues
-   *          Hash map of partition values. Key=TblColName value=PartitionValue
-   */
-  public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI,
-      Deserializer deserializer, Map<String, Object> partitionValues, 
-      Map<String, PrimitiveCategory> partitionTypes) {
-    this.rowOI = rowOI;
-    this.rawRowOI = rawRowOI;
-    this.deserializer = deserializer;
-    this.partitionValues = partitionValues;
-    this.partitionTypes = partitionTypes;
+  public TypeInfo[] getRowColumnTypeInfos() {
+    return rowColumnTypeInfos;
   }
 
-  /**
-   * Constructor for VectorizedRowBatchCtx
-   */
-  public VectorizedRowBatchCtx() {
+  public int getDataColumnCount() {
+    return dataColumnCount;
+  }
 
+  public int getPartitionColumnCount() {
+    return partitionColumnCount;
+  }
+
+  public String[] getScratchColumnTypeNames() {
+    return scratchColumnTypeNames;
   }
 
   /**
-   * Initializes the VectorizedRowBatch context based on an scratch column type map and
+   * Initializes the VectorizedRowBatch context based on an scratch column type names and
    * object inspector.
-   * @param scratchColumnTypeMap
-   * @param rowOI
+   * @param structObjectInspector
+   * @param scratchColumnTypeNames
    *          Object inspector that shapes the column types
+   * @throws HiveException
    */
-  public void init(Map<Integer, String> scratchColumnTypeMap,
-      StructObjectInspector rowOI) {
-    this.scratchColumnTypeMap = scratchColumnTypeMap;
-    this.rowOI= rowOI;
-    this.rawRowOI = rowOI;
+  public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames)
+          throws HiveException {
+
+    // Row column information.
+    rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector);
+    rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector);
+    partitionColumnCount = 0;
+    dataColumnCount = rowColumnTypeInfos.length;
+
+    // Scratch column information.
+    this.scratchColumnTypeNames = scratchColumnTypeNames;
   }
 
-  /**
-   * Initializes VectorizedRowBatch context based on the
-   * split and Hive configuration (Job conf with hive Plan).
-   *
-   * @param hiveConf
-   *          Hive configuration using Hive plan is extracted
-   * @param split
-   *          File split of the file being read
-   * @throws ClassNotFoundException
-   * @throws IOException
-   * @throws SerDeException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   * @throws HiveException
-   */
-  public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
-      IOException,
-      SerDeException,
-      InstantiationException,
-      IllegalAccessException,
-      HiveException {
+  public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf,
+      FileSplit split, Object[] partitionValues) throws IOException {
 
     Map<String, PartitionDesc> pathToPartitionInfo = Utilities
         .getMapWork(hiveConf).getPathToPartitionInfo();
 
-    PartitionDesc part = HiveFileFormatUtils
+    PartitionDesc partDesc = HiveFileFormatUtils
         .getPartitionDescFromPathRecursively(pathToPartitionInfo,
             split.getPath(), IOPrepareCache.get().getPartitionDescMap());
 
-    String partitionPath = split.getPath().getParent().toString();
-    scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf);
-    // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString());
-
-    Properties partProps =
-        (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ?
-            part.getTableDesc().getProperties() : part.getProperties();
-
-    Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
-    Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); 
-    SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(),
-                               partProps);
-    StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
-        .getObjectInspector();
-
-    deserializer = partDeserializer;
-
-    // Check to see if this split is part of a partition of a table
-    String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
-    String[] partKeys = null;
-    if (pcols != null && pcols.length() > 0) {
-
-      // Partitions exist for this table. Get the partition object inspector and
-      // raw row object inspector (row with out partition col)
-      LinkedHashMap<String, String> partSpec = part.getPartSpec();
-      partKeys = pcols.trim().split("/");
-      String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);      
-      String[] partKeyTypes = pcolTypes.trim().split(":");      
-      
-      if (partKeys.length  > partKeyTypes.length) {
-        throw new HiveException("Internal error : partKeys length, " +partKeys.length +
-                " greater than partKeyTypes length, " + partKeyTypes.length);
-      }
-      
-      List<String> partNames = new ArrayList<String>(partKeys.length);
-      List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-      partitionValues = new LinkedHashMap<String, Object>();
-      partitionTypes = new LinkedHashMap<String, PrimitiveCategory>();
-      for (int i = 0; i < partKeys.length; i++) {
-        String key = partKeys[i];
-        partNames.add(key);
-        ObjectInspector objectInspector = null;
-        Object objectVal; 
-        if (partSpec == null) {
-          // for partitionless table, initialize partValue to empty string.
-          // We can have partitionless table even if we have partition keys
-          // when there is only only partition selected and the partition key is not
-          // part of the projection/include list.
-          objectVal = null;
-          objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
-          partitionTypes.put(key, PrimitiveCategory.STRING);       
-        } else {
-          // Create a Standard java object Inspector
-          objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
-              TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-          objectVal = 
-              ObjectInspectorConverters.
-              getConverter(PrimitiveObjectInspectorFactory.
-                  javaStringObjectInspector, objectInspector).
-                  convert(partSpec.get(key));              
-          partitionTypes.put(key, TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]).getPrimitiveCategory());
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key));
-        }
-        partitionValues.put(key, objectVal);
-        partObjectInspectors.add(objectInspector);
-      }
-
-      // Create partition OI
-      StructObjectInspector partObjectInspector = ObjectInspectorFactory
-          .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
-      // Get row OI from partition OI and raw row OI
-      StructObjectInspector rowObjectInspector = ObjectInspectorFactory
-          .getUnionStructObjectInspector(Arrays
-              .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
-      rowOI = rowObjectInspector;
-      rawRowOI = partRawRowObjectInspector;
-
-      // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it
-      partitionCols = new HashSet<Integer>();
-      if (pcols != null && pcols.length() > 0) {
-        for (int i = 0; i < partKeys.length; i++) {
-          partitionCols.add(getColIndexBasedOnColName(partKeys[i]));
-        }
-      }
+    getPartitionValues(vrbCtx, partDesc, partitionValues);
 
-    } else {
+  }
 
-      // No partitions for this table, hence row OI equals raw row OI
-      rowOI = partRawRowObjectInspector;
-      rawRowOI = partRawRowObjectInspector;
+  public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
+      Object[] partitionValues) {
+
+    LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+
+    for (int i = 0; i < vrbCtx.partitionColumnCount; i++) {
+      Object objectValue;
+      if (partSpec == null) {
+        // For partition-less table, initialize partValue to empty string.
+        // We can have partition-less table even if we have partition keys
+        // when there is only only partition selected and the partition key is not
+        // part of the projection/include list.
+        objectValue = null;
+      } else {
+        String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i];
+
+        // Create a Standard java object Inspector
+        ObjectInspector objectInspector =
+            TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+                vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]);
+        objectValue =
+            ObjectInspectorConverters.
+                getConverter(PrimitiveObjectInspectorFactory.
+                    javaStringObjectInspector, objectInspector).
+                        convert(partSpec.get(key));
+      }
+      partitionValues[i] = objectValue;
     }
-
-    colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf);
   }
-  
+
   /**
    * Creates a Vectorized row batch and the column vectors.
    *
    * @return VectorizedRowBatch
    * @throws HiveException
    */
-  public VectorizedRowBatch createVectorizedRowBatch() throws HiveException {
-    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
-    VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
-    for (int j = 0; j < fieldRefs.size(); j++) {
-      // If the column is included in the include list or if the column is a
-      // partition column then create the column vector. Also note that partition columns are not
-      // in the included list.
-      if ((colsToInclude == null) || colsToInclude.contains(j)
-          || ((partitionValues != null) &&
-              partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) {
-        ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
-        result.cols[j] = VectorizedBatchUtil.createColumnVector(foi);
-      }
+  public VectorizedRowBatch createVectorizedRowBatch()
+  {
+    int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+    VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+    LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE");
+    for (int i = 0; i < rowColumnTypeInfos.length; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+    }
+
+    for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+      String typeName = scratchColumnTypeNames[i];
+      result.cols[rowColumnTypeInfos.length + i] =
+          VectorizedBatchUtil.createColumnVector(typeName);
     }
-    result.numCols = fieldRefs.size();
-    this.addScratchColumnsToBatch(result);
+
+    result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
     result.reset();
     return result;
   }
 
-  /**
-   * Adds the row to the batch after deserializing the row
-   *
-   * @param rowIndex
-   *          Row index in the batch to which the row is added
-   * @param rowBlob
-   *          Row blob (serialized version of row)
-   * @param batch
-   *          Vectorized batch to which the row is added
-   * @param buffer a buffer to copy strings into
-   * @throws HiveException
-   * @throws SerDeException
-   */
-  public void addRowToBatch(int rowIndex, Writable rowBlob,
-                            VectorizedRowBatch batch,
-                            DataOutputBuffer buffer
-                            ) throws HiveException, SerDeException
+  public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated)
   {
-    Object row = this.deserializer.deserialize(rowBlob);
-    VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer);
-  }
+    if (columnsToIncludeTruncated == null) {
+      return createVectorizedRowBatch();
+    }
 
-  /**
-   * Deserialized set of rows and populates the batch
-   *
-   * @param rowBlob
-   *          to deserialize
-   * @param batch
-   *          Vectorized row batch which contains deserialized data
-   * @throws SerDeException
-   */
-  public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
-      VectorizedRowBatch batch)
-      throws SerDeException {
-
-    if (deserializer instanceof VectorizedSerde) {
-      ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
-    } else {
-      throw new SerDeException(
-          "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+    LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
+    int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+    VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+    for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+      if (columnsToIncludeTruncated[i]) {
+        TypeInfo typeInfo = rowColumnTypeInfos[i];
+        result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+      }
+    }
+
+    for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
     }
+
+    for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+      String typeName = scratchColumnTypeNames[i];
+      result.cols[rowColumnTypeInfos.length + i] =
+          VectorizedBatchUtil.createColumnVector(typeName);
+    }
+
+    result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
+    result.reset();
+    return result;
   }
 
-  private int getColIndexBasedOnColName(String colName) throws HiveException
-  {
-    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      if (fieldRefs.get(i).getFieldName().equals(colName)) {
-        return i;
+  public boolean[] getColumnsToIncludeTruncated(Configuration conf) {
+    boolean[] columnsToIncludeTruncated = null;
+
+    List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf);
+    if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0 ) {
+
+      // Partitioned columns will not be in the include list.
+
+      boolean[] columnsToInclude = new boolean[dataColumnCount];
+      Arrays.fill(columnsToInclude, false);
+      for (int columnNum : columnsToIncludeTruncatedList) {
+        if (columnNum < dataColumnCount) {
+          columnsToInclude[columnNum] = true;
+        }
+      }
+
+      // Work backwards to find the highest wanted column.
+
+      int highestWantedColumnNum = -1;
+      for (int i = dataColumnCount - 1; i >= 0; i--) {
+        if (columnsToInclude[i]) {
+          highestWantedColumnNum = i;
+          break;
+        }
+      }
+      if (highestWantedColumnNum == -1) {
+        throw new RuntimeException("No columns to include?");
+      }
+      int newColumnCount = highestWantedColumnNum + 1;
+      if (newColumnCount == dataColumnCount) {
+        // Didn't trim any columns off the end.  Use the original.
+        columnsToIncludeTruncated = columnsToInclude;
+      } else {
+        columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
       }
     }
-    throw new HiveException("Not able to find column name in row object inspector");
+    return columnsToIncludeTruncated;
   }
-  
+
   /**
    * Add the partition values to the batch
    *
    * @param batch
+   * @param partitionValues
    * @throws HiveException
    */
-  public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException {
-    int colIndex;
-    Object value;
-    PrimitiveCategory pCategory;
+  public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues)
+  {
     if (partitionValues != null) {
-      for (String key : partitionValues.keySet()) {
-        colIndex = getColIndexBasedOnColName(key);
-        value = partitionValues.get(key);
-        pCategory = partitionTypes.get(key);
-        
-        switch (pCategory) {
+      for (int i = 0; i < partitionColumnCount; i++) {
+        Object value = partitionValues[i];
+
+        int colIndex = dataColumnCount + i;
+        String partitionColumnName = rowColumnNames[colIndex];
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex];
+        switch (primitiveTypeInfo.getPrimitiveCategory()) {
         case BOOLEAN: {
           LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
           if (value == null) {
@@ -519,7 +442,7 @@ public class VectorizedRowBatchCtx {
             HiveDecimal hd = (HiveDecimal) value;
             dv.set(0, hd);
             dv.isRepeating = true;
-            dv.isNull[0] = false;      
+            dv.isNull[0] = false;
           }
         }
         break;
@@ -548,15 +471,15 @@ public class VectorizedRowBatchCtx {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
           } else {
-            bcv.fill(sVal.getBytes()); 
+            bcv.fill(sVal.getBytes());
             bcv.isNull[0] = false;
           }
         }
         break;
-        
+
         default:
-          throw new HiveException("Unable to recognize the partition type " + pCategory + 
-              " for column " + key);
+          throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() +
+              " for column " + partitionColumnName);
         }
       }
     }
@@ -564,64 +487,12 @@ public class VectorizedRowBatchCtx {
 
   /**
    * Determine whether a given column is a partition column
-   * @param colnum column number in
+   * @param colNum column number in
    * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context.
    * @return true if it is a partition column, false otherwise
    */
-  public final boolean isPartitionCol(int colnum) {
-    return (partitionCols == null) ? false : partitionCols.contains(colnum);
-  }
-
-  private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
-    if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) {
-      int origNumCols = vrb.numCols;
-      int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size();
-      vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
-      for (int i = origNumCols; i < newNumCols; i++) {
-       String typeName = scratchColumnTypeMap.get(i);
-       if (typeName == null) {
-         throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString());
-       }
-        vrb.cols[i] = allocateColumnVector(typeName,
-            VectorizedRowBatch.DEFAULT_SIZE);
-      }
-      vrb.numCols = vrb.cols.length;
-    }
+  public final boolean isPartitionCol(int colNum) {
+    return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length;
   }
 
-  /**
-   * Get the scale and precision for the given decimal type string. The decimal type is assumed to be
-   * of the format decimal(precision,scale) e.g. decimal(20,10).
-   * @param decimalType The given decimal type string.
-   * @return An integer array of size 2 with first element set to precision and second set to scale.
-   */
-  private static int[] getScalePrecisionFromDecimalType(String decimalType) {
-    Pattern p = Pattern.compile("\\d+");
-    Matcher m = p.matcher(decimalType);
-    m.find();
-    int precision = Integer.parseInt(m.group());
-    m.find();
-    int scale = Integer.parseInt(m.group());
-    int [] precScale = { precision, scale };
-    return precScale;
-  }
-
-  public static ColumnVector allocateColumnVector(String type, int defaultSize) {
-    if (type.equalsIgnoreCase("double")) {
-      return new DoubleColumnVector(defaultSize);
-    } else if (VectorizationContext.isStringFamily(type)) {
-      return new BytesColumnVector(defaultSize);
-    } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){
-      int [] precisionScale = getScalePrecisionFromDecimalType(type);
-      return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]);
-    } else if (type.equalsIgnoreCase("long") ||
-               type.equalsIgnoreCase("date") ||
-               type.equalsIgnoreCase("timestamp") ||
-               type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) ||
-               type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
-      return new LongColumnVector(defaultSize);
-    } else {
-      throw new RuntimeException("Cannot allocate vector column for " + type);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 1d5a9de..6ecfaf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -20,12 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,12 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.HashTableLoader;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
@@ -51,15 +42,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOptimizedCreateHashTable;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -69,10 +57,8 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
@@ -576,7 +562,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
       smallTableVectorDeserializeRow =
           new VectorDeserializeRow<LazyBinaryDeserializeRead>(
               new LazyBinaryDeserializeRead(
-                  VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
+                  VectorizedBatchUtil.typeInfosFromTypeNames(
                       smallTableMapping.getTypeNames())));
       smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
     }
@@ -649,22 +635,12 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
    * build join output results in.
    */
   protected VectorizedRowBatch setupOverflowBatch() throws HiveException {
+
+    int initialColumnCount = vContext.firstOutputColumnIndex();
     VectorizedRowBatch overflowBatch;
 
-    Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap();
-    int maxColumn = 0;
-    for (int i = 0; i < outputProjection.length; i++) {
-      int outputColumn = outputProjection[i];
-      if (maxColumn < outputColumn) {
-        maxColumn = outputColumn;
-      }
-    }
-    for (int outputColumn : scratchColumnTypeMap.keySet()) {
-      if (maxColumn < outputColumn) {
-        maxColumn = outputColumn;
-      }
-    }
-    overflowBatch = new VectorizedRowBatch(maxColumn + 1);
+    int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length;
+    overflowBatch = new VectorizedRowBatch(totalNumColumns);
 
     // First, just allocate just the projection columns we will be using.
     for (int i = 0; i < outputProjection.length; i++) {
@@ -674,9 +650,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     }
 
     // Now, add any scratch columns needed for children operators.
-    for (int outputColumn : scratchColumnTypeMap.keySet()) {
-      String typeName = scratchColumnTypeMap.get(outputColumn);
-      allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName);
+    int outputColumn = initialColumnCount;
+    for (String typeName : vOutContext.getScratchColumnTypeNames()) {
+      allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName);
     }
 
     overflowBatch.projectedColumns = outputProjection;
@@ -696,33 +672,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     if (overflowBatch.cols[outputColumn] == null) {
       typeName = VectorizationContext.mapTypeNameSynonyms(typeName);
 
-      String columnVectorTypeName;
-
       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
-      Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
-
-      switch (columnVectorType) {
-      case LONG:
-        columnVectorTypeName = "long";
-        break;
-
-      case DOUBLE:
-        columnVectorTypeName = "double";
-        break;
-
-      case BYTES:
-        columnVectorTypeName = "string";
-        break;
-
-      case DECIMAL:
-        columnVectorTypeName = typeName;  // Keep precision and scale.
-        break;
-
-      default:
-        throw new HiveException("Unexpected column vector type " + columnVectorType);
-      }
 
-      overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(columnVectorTypeName, VectorizedRowBatch.DEFAULT_SIZE);
+      overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo);
 
       if (isLogDebugEnabled) {
         LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index b20cca4..2d9da84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 /**
@@ -413,8 +414,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
   private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException {
 
-    PrimitiveTypeInfo[] inputObjInspectorsTypeInfos =
-        VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+    TypeInfo[] inputObjInspectorsTypeInfos =
+        VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                (StructObjectInspector) inputObjInspectors[posBigTable]);
 
     List<Integer> projectedColumns = vContext.getProjectedColumns();

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index a883124..b63deb2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -510,11 +510,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     // ensure filters are not set from previous pushFilters
     jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
     jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+    Utilities.unsetSchemaEvolution(jobConf);
+
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;
     }
 
+    Utilities.addTableSchemaToConf(jobConf, tableScan);
+
     // construct column name list and types for reference by filter push down
     Utilities.setColumnNameList(jobConf, tableScan);
     Utilities.setColumnTypeList(jobConf, tableScan);

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
index 9879dfe..8d94da8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
@@ -36,6 +36,17 @@ public final class IOConstants {
   public static final String AVRO = "AVRO";
   public static final String AVROFILE = "AVROFILE";
 
+  /**
+   * The desired TABLE column names and types for input format schema evolution.
+   * This is different than COLUMNS and COLUMNS_TYPES, which are based on individual partition
+   * metadata.
+   *
+   * Virtual columns and partition columns are not included
+   *
+   */
+  public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
+  public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
+
   @VisibleForTesting
   public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
new file mode 100644
index 0000000..6c455bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * Marker interface to indicate a given input format is self-describing and
+ * can perform schema evolution itself.
+ */
+public interface SelfDescribingInputFormatInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
deleted file mode 100644
index e9e1d5a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A MapReduce/Hive Vectorized input format for RC files.
- */
-public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-  implements InputFormatChecker {
-
-  public VectorizedRCFileInputFormat() {
-    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
-      Reporter reporter) throws IOException {
-
-    reporter.setStatus(split.toString());
-
-    return new VectorizedRCFileRecordReader(job, (FileSplit) split);
-  }
-
-  @Override
-  public boolean validateInput(FileSystem fs, HiveConf conf,
-      List<FileStatus> files) throws IOException {
-    if (files.size() <= 0) {
-      return false;
-    }
-    for (int fileId = 0; fileId < files.size(); fileId++) {
-      RCFile.Reader reader = null;
-      try {
-        reader = new RCFile.Reader(fs, files.get(fileId)
-            .getPath(), conf);
-        reader.close();
-        reader = null;
-      } catch (IOException e) {
-        return false;
-      } finally {
-        if (null != reader) {
-          reader.close();
-        }
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
deleted file mode 100644
index 4cc1c2f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * RCFileRecordReader.
- */
-public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
-
-  private final Reader in;
-  private final long start;
-  private final long end;
-  private boolean more = true;
-  protected Configuration conf;
-  private final FileSplit split;
-  private final boolean useCache;
-  private VectorizedRowBatchCtx rbCtx;
-  private final LongWritable keyCache = new LongWritable();
-  private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
-  private boolean addPartitionCols = true;
-  private final DataOutputBuffer buffer = new DataOutputBuffer();
-
-  private static RCFileSyncCache syncCache = new RCFileSyncCache();
-
-  private static final class RCFileSyncEntry {
-    long end;
-    long endSync;
-  }
-
-  private static final class RCFileSyncCache {
-
-    private final Map<String, RCFileSyncEntry> cache;
-
-    public RCFileSyncCache() {
-      cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
-    }
-
-    public void put(FileSplit split, long endSync) {
-      Path path = split.getPath();
-      long end = split.getStart() + split.getLength();
-      String key = path.toString() + "+" + String.format("%d", end);
-
-      RCFileSyncEntry entry = new RCFileSyncEntry();
-      entry.end = end;
-      entry.endSync = endSync;
-      if (entry.endSync >= entry.end) {
-        cache.put(key, entry);
-      }
-    }
-
-    public long get(FileSplit split) {
-      Path path = split.getPath();
-      long start = split.getStart();
-      String key = path.toString() + "+" + String.format("%d", start);
-      RCFileSyncEntry entry = cache.get(key);
-      if (entry != null) {
-        return entry.endSync;
-      }
-      return -1;
-    }
-  }
-
-  public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
-      throws IOException {
-
-    Path path = split.getPath();
-    FileSystem fs = path.getFileSystem(conf);
-    this.in = new RCFile.Reader(fs, path, conf);
-    this.end = split.getStart() + split.getLength();
-    this.conf = conf;
-    this.split = split;
-
-    useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
-
-    if (split.getStart() > in.getPosition()) {
-      long oldSync = useCache ? syncCache.get(split) : -1;
-      if (oldSync == -1) {
-        in.sync(split.getStart()); // sync to start
-      } else {
-        in.seek(oldSync);
-      }
-    }
-
-    this.start = in.getPosition();
-
-    more = start < end;
-    try {
-      rbCtx = new VectorizedRowBatchCtx();
-      rbCtx.init(conf, split);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public Class<?> getKeyClass() {
-    return LongWritable.class;
-  }
-
-  public Class<?> getValueClass() {
-    return BytesRefArrayWritable.class;
-  }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public VectorizedRowBatch createValue() {
-    VectorizedRowBatch result;
-    try {
-      result = rbCtx.createVectorizedRowBatch();
-    } catch (HiveException e) {
-      throw new RuntimeException("Error creating a batch", e);
-    }
-    return result;
-  }
-
-  public boolean nextBlock() throws IOException {
-    return in.nextBlock();
-  }
-
-  @Override
-  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-
-    // Reset column fields noNull values to true
-    VectorizedBatchUtil.setNoNullFields(value);
-    buffer.reset();
-    value.selectedInUse = false;
-    for (int i = 0; i < value.numCols; i++) {
-      value.cols[i].isRepeating = false;
-    }
-
-    int i = 0;
-    try {
-
-      for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
-        more = next(keyCache);
-        if (more) {
-          // Check and update partition cols if necessary. Ideally this should be done
-          // in CreateValue() as the partition is constant per split. But since Hive uses
-          // CombineHiveRecordReader and as this does not call CreateValue() for
-          // each new RecordReader it creates, this check is required in next()
-          if (addPartitionCols) {
-            rbCtx.addPartitionColsToBatch(value);
-            addPartitionCols = false;
-          }
-          in.getCurrentRow(colsCache);
-          // Currently RCFile reader does not support reading vectorized
-          // data. Populating the batch by adding one row at a time.
-          rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer);
-        } else {
-          break;
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Error while getting next row", e);
-    }
-    value.size = i;
-    return more;
-  }
-
-  protected boolean next(LongWritable key) throws IOException {
-    if (!more) {
-      return false;
-    }
-
-    more = in.next(key);
-
-    long lastSeenSyncPos = in.lastSeenSyncPos();
-
-    if (lastSeenSyncPos >= end) {
-      if (useCache) {
-        syncCache.put(split, lastSeenSyncPos);
-      }
-      more = false;
-      return more;
-    }
-    return more;
-  }
-
-  /**
-   * Return the progress within the input split.
-   *
-   * @return 0.0 to 1.0 of the input byte range
-   */
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
-    }
-  }
-
-  public long getPos() throws IOException {
-    return in.getPosition();
-  }
-
-  public KeyBuffer getKeyBuffer() {
-    return in.getCurrentKeyBufferObj();
-  }
-
-  protected void seek(long pos) throws IOException {
-    in.seek(pos);
-  }
-
-  public void sync(long pos) throws IOException {
-    in.sync(pos);
-  }
-
-  public void resetBuffer() {
-    in.resetBuffer();
-  }
-
-  public long getStart() {
-    return start;
-  }
-
-  public void close() throws IOException {
-    in.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
deleted file mode 100644
index aaf4eb4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Factory for creating ORC tree readers. These tree readers can handle type promotions and type
- * conversions.
- */
-public class ConversionTreeReaderFactory extends TreeReaderFactory {
-
-  // TODO: This is currently only a place holder for type conversions.
-
-  public static TreeReader createTreeReader(int columnId,
-      List<OrcProto.Type> types,
-      boolean[] included,
-      boolean skipCorrupt
-  ) throws IOException {
-    return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index b4dd4ab..56ac40b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -402,7 +402,7 @@ public final class OrcFile {
     public WriterOptions inspector(ObjectInspector value) {
       this.inspector = value;
       if (!explicitSchema) {
-        schema = OrcOutputFormat.convertTypeInfo(
+        schema = OrcUtils.convertTypeInfo(
             TypeInfoUtils.getTypeInfoFromObjectInspector(value));
       }
       return this;

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 46862da..714af23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -61,8 +61,10 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -73,7 +75,6 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -116,7 +117,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
+  SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>,
+  CombineHiveInputFormat.AvoidSplitCombination {
 
   static enum SplitStrategyKind {
     HYBRID,
@@ -232,7 +234,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                                   Configuration conf,
                                                   long offset, long length
                                                   ) throws IOException {
+
+    /**
+     * Do we have schema on read in the configuration variables?
+     */
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
     Reader.Options options = new Reader.Options().range(offset, length);
+    options.schema(schema);
     boolean isOriginal = isOriginal(file);
     List<OrcProto.Type> types = file.getTypes();
     options.include(genIncludedColumns(types, conf, isOriginal));
@@ -1415,7 +1424,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
-          new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+          new VectorizedOrcAcidRowReader(inner, conf,
+              Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
     }
     return new NullKeyRecordReader(inner, conf);
   }
@@ -1467,10 +1477,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  // The schema type description does not include the ACID fields (i.e. it is the
+  // non-ACID original schema).
+  private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
 
   @Override
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
-                                        Options options) throws IOException {
+                                        Options options)
+                                            throws IOException {
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
     Path root;
@@ -1485,36 +1499,30 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
     final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
     final Configuration conf = options.getConfiguration();
+
+
+    /**
+     * Do we have schema on read in the configuration variables?
+     */
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
     final Reader reader;
     final int bucket;
-    Reader.Options readOptions = new Reader.Options();
+    Reader.Options readOptions = new Reader.Options().schema(schema);
     readOptions.range(split.getStart(), split.getLength());
+
+    // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+    final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+    readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+    setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+
     if (split.hasBase()) {
       bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
           .getBucket();
       reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-      final List<OrcProto.Type> types = reader.getTypes();
-      readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
-      setSearchArgument(readOptions, types, conf, split.isOriginal());
     } else {
       bucket = (int) split.getStart();
       reader = null;
-      if(deltas != null && deltas.length > 0) {
-        Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
-        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
-        FileSystem fs = readerOptions.getFilesystem();
-        if(fs == null) {
-          fs = path.getFileSystem(options.getConfiguration());
-        }
-        if(fs.exists(bucketPath)) {
-        /* w/o schema evolution (which ACID doesn't support yet) all delta
-        files have the same schema, so choosing the 1st one*/
-          final List<OrcProto.Type> types =
-            OrcFile.createReader(bucketPath, readerOptions).getTypes();
-          readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
-          setSearchArgument(readOptions, types, conf, split.isOriginal());
-        }
-      }
     }
     String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
                                 Long.MAX_VALUE + ":");
@@ -1527,9 +1535,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
       @Override
       public ObjectInspector getObjectInspector() {
-        return ((StructObjectInspector) records.getObjectInspector())
-            .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
-            .getFieldObjectInspector();
+        return OrcStruct.createObjectInspector(0, schemaTypes);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 8a5de7f..2d0eaaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -68,88 +68,6 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
 
   private static final Logger LOG = LoggerFactory.getLogger(OrcOutputFormat.class);
 
-  static TypeDescription convertTypeInfo(TypeInfo info) {
-    switch (info.getCategory()) {
-      case PRIMITIVE: {
-        PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
-        switch (pinfo.getPrimitiveCategory()) {
-          case BOOLEAN:
-            return TypeDescription.createBoolean();
-          case BYTE:
-            return TypeDescription.createByte();
-          case SHORT:
-            return TypeDescription.createShort();
-          case INT:
-            return TypeDescription.createInt();
-          case LONG:
-            return TypeDescription.createLong();
-          case FLOAT:
-            return TypeDescription.createFloat();
-          case DOUBLE:
-            return TypeDescription.createDouble();
-          case STRING:
-            return TypeDescription.createString();
-          case DATE:
-            return TypeDescription.createDate();
-          case TIMESTAMP:
-            return TypeDescription.createTimestamp();
-          case BINARY:
-            return TypeDescription.createBinary();
-          case DECIMAL: {
-            DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
-            return TypeDescription.createDecimal()
-                .withScale(dinfo.getScale())
-                .withPrecision(dinfo.getPrecision());
-          }
-          case VARCHAR: {
-            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
-            return TypeDescription.createVarchar()
-                .withMaxLength(cinfo.getLength());
-          }
-          case CHAR: {
-            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
-            return TypeDescription.createChar()
-                .withMaxLength(cinfo.getLength());
-          }
-          default:
-            throw new IllegalArgumentException("ORC doesn't handle primitive" +
-                " category " + pinfo.getPrimitiveCategory());
-        }
-      }
-      case LIST: {
-        ListTypeInfo linfo = (ListTypeInfo) info;
-        return TypeDescription.createList
-            (convertTypeInfo(linfo.getListElementTypeInfo()));
-      }
-      case MAP: {
-        MapTypeInfo minfo = (MapTypeInfo) info;
-        return TypeDescription.createMap
-            (convertTypeInfo(minfo.getMapKeyTypeInfo()),
-                convertTypeInfo(minfo.getMapValueTypeInfo()));
-      }
-      case UNION: {
-        UnionTypeInfo minfo = (UnionTypeInfo) info;
-        TypeDescription result = TypeDescription.createUnion();
-        for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
-          result.addUnionChild(convertTypeInfo(child));
-        }
-        return result;
-      }
-      case STRUCT: {
-        StructTypeInfo sinfo = (StructTypeInfo) info;
-        TypeDescription result = TypeDescription.createStruct();
-        for(String fieldName: sinfo.getAllStructFieldNames()) {
-          result.addField(fieldName,
-              convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
-        }
-        return result;
-      }
-      default:
-        throw new IllegalArgumentException("ORC doesn't handle " +
-            info.getCategory());
-    }
-  }
-
   private static class OrcRecordWriter
       implements RecordWriter<NullWritable, OrcSerdeRow>,
                  StatsProvidingRecordWriter {
@@ -242,7 +160,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
         TypeDescription schema = TypeDescription.createStruct();
         for (int i = 0; i < columnNames.size(); ++i) {
           schema.addField(columnNames.get(i),
-              convertTypeInfo(columnTypes.get(i)));
+              OrcUtils.convertTypeInfo(columnTypes.get(i)));
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("ORC schema = " + schema);

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ebe1afd..bc4d2ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -25,23 +26,16 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -57,6 +51,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final Configuration conf;
   private final boolean collapse;
   private final RecordReader baseReader;
+  private final ObjectInspector objectInspector;
   private final long offset;
   private final long length;
   private final ValidTxnList validTxnList;
@@ -443,6 +438,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.offset = options.getOffset();
     this.length = options.getLength();
     this.validTxnList = validTxnList;
+
+    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf);
+    if (typeDescr == null) {
+      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+    }
+
+    objectInspector = OrcRecordUpdater.createEventSchema
+        (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
     if (reader == null) {
@@ -675,46 +679,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
   @Override
   public ObjectInspector getObjectInspector() {
-    // Read the configuration parameters
-    String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
-    // NOTE: if "columns.types" is missing, all columns will be of String type
-    String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-
-    // Parse the configuration parameters
-    ArrayList<String> columnNames = new ArrayList<String>();
-    Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
-    if (columnNameProperty != null && columnNameProperty.length() > 0) {
-      String[] colNames = columnNameProperty.split(",");
-      for (int i = 0; i < colNames.length; i++) {
-        if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
-          virtualColumns.addLast(i);
-        } else {
-          columnNames.add(colNames[i]);
-        }
-      }
-    }
-    if (columnTypeProperty == null) {
-      // Default type: all string
-      StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < columnNames.size(); i++) {
-        if (i > 0) {
-          sb.append(":");
-        }
-        sb.append("string");
-      }
-      columnTypeProperty = sb.toString();
-    }
-
-    ArrayList<TypeInfo> fieldTypes =
-        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    while (virtualColumns.size() > 0) {
-      fieldTypes.remove(virtualColumns.removeLast());
-    }
-    StructTypeInfo rowType = new StructTypeInfo();
-    rowType.setAllStructFieldNames(columnNames);
-    rowType.setAllStructFieldTypeInfos(fieldTypes);
-    return OrcRecordUpdater.createEventSchema
-        (OrcStruct.createObjectInspector(rowType));
+    return objectInspector;
   }
 
   @Override