Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:48 UTC

[17/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 525b3c5..41dc3e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,12 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
 /**
  * This class represents a nullable double precision floating point column vector.
  * This class will be used for operations on all floating point types (float, double)
@@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable;
  */
 public class DoubleColumnVector extends ColumnVector {
   public double[] vector;
-  private final DoubleWritable writableObj = new DoubleWritable();
   public static final double NULL_VALUE = Double.NaN;
 
   /**
@@ -57,19 +53,6 @@ public class DoubleColumnVector extends ColumnVector {
     vector = new double[len];
   }
 
-  @Override
-  public Writable getWritableObject(int index) {
-    if (this.isRepeating) {
-      index = 0;
-    }
-    if (!noNulls && isNull[index]) {
-      return NullWritable.get();
-    } else {
-      writableObj.set(vector[index]);
-      return writableObj;
-    }
-  }
-
   // Copy the current object contents into the output. Only copy selected entries,
   // as indicated by selectedInUse and the sel array.
   public void copySelected(
@@ -121,6 +104,14 @@ public class DoubleColumnVector extends ColumnVector {
     vector[0] = value;
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = NULL_VALUE;
+    isNull[0] = true;
+  }
+
   // Simplify vector by brute-force flattening noNulls and isRepeating
   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
   // with many arguments.
@@ -144,6 +135,44 @@ public class DoubleColumnVector extends ColumnVector {
 
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
-    vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
+    if (inputVector.isRepeating) {
+      inputElementNum = 0;
+    }
+    if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+      isNull[outElementNum] = false;
+      vector[outElementNum] =
+          ((DoubleColumnVector) inputVector).vector[inputElementNum];
+    } else {
+      isNull[outElementNum] = true;
+      noNulls = false;
+    }
+  }
+
+  @Override
+  public void stringifyValue(StringBuilder buffer, int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      buffer.append(vector[row]);
+    } else {
+      buffer.append("null");
+    }
+  }
+
+  @Override
+  public void ensureSize(int size, boolean preserveData) {
+    if (size > vector.length) {
+      super.ensureSize(size, preserveData);
+      double[] oldArray = vector;
+      vector = new double[size];
+      if (preserveData) {
+        if (isRepeating) {
+          vector[0] = oldArray[0];
+        } else {
+          System.arraycopy(oldArray, 0, vector, 0, oldArray.length);
+        }
+      }
+    }
   }
 }
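
For illustration, a minimal, hypothetical sketch of how the reworked null handling behaves (the class name and literal values are invented; it assumes the branch-1 hive-exec classes above are on the classpath): setElement() now carries the source vector's null flag across, and stringifyValue() and fillWithNulls() are new here.

    import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;

    public class DoubleColumnVectorNullSketch {
      public static void main(String[] args) {
        DoubleColumnVector src = new DoubleColumnVector(4);
        DoubleColumnVector dst = new DoubleColumnVector(4);

        // Row 0 holds a value, row 1 is null in the source vector.
        src.vector[0] = 3.14;
        src.isNull[1] = true;
        src.noNulls = false;

        // setElement() now propagates the null flag instead of blindly copying a value.
        dst.setElement(0, 0, src);
        dst.setElement(1, 1, src);

        StringBuilder sb = new StringBuilder();
        dst.stringifyValue(sb, 0);   // expected "3.14"
        sb.append(' ');
        dst.stringifyValue(sb, 1);   // expected "null"
        System.out.println(sb);

        // fillWithNulls() collapses the column into a single repeated null entry.
        dst.fillWithNulls();
      }
    }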

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index f0545fe..0afe5db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -17,12 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
 /**
  * This class represents a nullable int column vector.
  * This class will be used for operations on all integer types (tinyint, smallint, int, bigint)
@@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable;
  */
 public class LongColumnVector extends ColumnVector {
   public long[] vector;
-  private final LongWritable writableObj = new LongWritable();
   public static final long NULL_VALUE = 1;
 
   /**
@@ -50,26 +46,13 @@ public class LongColumnVector extends ColumnVector {
   /**
    * Don't use this except for testing purposes.
    *
-   * @param len
+   * @param len the number of rows
    */
   public LongColumnVector(int len) {
     super(len);
     vector = new long[len];
   }
 
-  @Override
-  public Writable getWritableObject(int index) {
-    if (this.isRepeating) {
-      index = 0;
-    }
-    if (!noNulls && isNull[index]) {
-      return NullWritable.get();
-    } else {
-      writableObj.set(vector[index]);
-      return writableObj;
-    }
-  }
-
   // Copy the current object contents into the output. Only copy selected entries,
   // as indicated by selectedInUse and the sel array.
   public void copySelected(
@@ -141,7 +124,9 @@ public class LongColumnVector extends ColumnVector {
       }
     }
     else {
-      System.arraycopy(vector, 0, output.vector, 0, size);
+      for(int i = 0; i < size; ++i) {
+        output.vector[i] = vector[i];
+      }
     }
 
     // Copy nulls over if needed
@@ -165,6 +150,14 @@ public class LongColumnVector extends ColumnVector {
     vector[0] = value;
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = NULL_VALUE;
+    isNull[0] = true;
+  }
+
   // Simplify vector by brute-force flattening noNulls and isRepeating
   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
   // with many arguments.
@@ -188,6 +181,44 @@ public class LongColumnVector extends ColumnVector {
 
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
-    vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum];
+    if (inputVector.isRepeating) {
+      inputElementNum = 0;
+    }
+    if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+      isNull[outElementNum] = false;
+      vector[outElementNum] =
+          ((LongColumnVector) inputVector).vector[inputElementNum];
+    } else {
+      isNull[outElementNum] = true;
+      noNulls = false;
+    }
+  }
+
+  @Override
+  public void stringifyValue(StringBuilder buffer, int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      buffer.append(vector[row]);
+    } else {
+      buffer.append("null");
+    }
+  }
+
+  @Override
+  public void ensureSize(int size, boolean preserveData) {
+    if (size > vector.length) {
+      super.ensureSize(size, preserveData);
+      long[] oldArray = vector;
+      vector = new long[size];
+      if (preserveData) {
+        if (isRepeating) {
+          vector[0] = oldArray[0];
+        } else {
+          System.arraycopy(oldArray, 0, vector, 0, oldArray.length);
+        }
+      }
+    }
   }
 }
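
Likewise, a small hypothetical sketch of the new ensureSize() behavior on LongColumnVector: it grows the array only when needed, keeps existing values when preserveData is true, and copies just slot 0 for a repeating vector. The expected outputs in the comments are assumptions.

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class LongColumnVectorResizeSketch {
      public static void main(String[] args) {
        LongColumnVector lcv = new LongColumnVector(2);
        lcv.vector[0] = 7L;
        lcv.vector[1] = 42L;

        // Growing the vector keeps existing values when preserveData is true.
        lcv.ensureSize(1024, true);
        System.out.println(lcv.vector.length);   // expected 1024
        System.out.println(lcv.vector[1]);       // expected 42

        // A repeating vector only needs slot 0 carried across the resize.
        LongColumnVector rep = new LongColumnVector(2);
        rep.isRepeating = true;
        rep.vector[0] = 5L;
        rep.ensureSize(1024, true);
        System.out.println(rep.vector[0]);       // expected 5
      }
    }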

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 8452abd..56cf9ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -31,15 +31,10 @@ import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hive.common.util.DateUtils;
 
@@ -61,12 +56,12 @@ public class VectorDeserializeRow {
 
   private Reader[] readersByValue;
   private Reader[] readersByReference;
-  private PrimitiveTypeInfo[] primitiveTypeInfos;
+  private TypeInfo[] typeInfos;
 
   public VectorDeserializeRow(DeserializeRead deserializeRead) {
     this();
     this.deserializeRead = deserializeRead;
-    primitiveTypeInfos = deserializeRead.primitiveTypeInfos();
+    typeInfos = deserializeRead.typeInfos();
     
   }
 
@@ -564,7 +559,7 @@ public class VectorDeserializeRow {
     Reader readerByValue = null;
     Reader readerByReference = null;
 
-    PrimitiveTypeInfo primitiveTypeInfo = primitiveTypeInfos[index];
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfos[index];
     PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
     switch (primitiveCategory) {
     // case VOID:
@@ -642,10 +637,10 @@ public class VectorDeserializeRow {
 
   public void init(int[] outputColumns) throws HiveException {
 
-    readersByValue = new Reader[primitiveTypeInfos.length];
-    readersByReference = new Reader[primitiveTypeInfos.length];
+    readersByValue = new Reader[typeInfos.length];
+    readersByReference = new Reader[typeInfos.length];
 
-    for (int i = 0; i < primitiveTypeInfos.length; i++) {
+    for (int i = 0; i < typeInfos.length; i++) {
       int outputColumn = outputColumns[i];
       addReader(i, outputColumn);
     }
@@ -653,10 +648,10 @@ public class VectorDeserializeRow {
 
   public void init(List<Integer> outputColumns) throws HiveException {
 
-    readersByValue = new Reader[primitiveTypeInfos.length];
-    readersByReference = new Reader[primitiveTypeInfos.length];
+    readersByValue = new Reader[typeInfos.length];
+    readersByReference = new Reader[typeInfos.length];
 
-    for (int i = 0; i < primitiveTypeInfos.length; i++) {
+    for (int i = 0; i < typeInfos.length; i++) {
       int outputColumn = outputColumns.get(i);
       addReader(i, outputColumn);
     }
@@ -664,10 +659,10 @@ public class VectorDeserializeRow {
 
   public void init(int startColumn) throws HiveException {
 
-    readersByValue = new Reader[primitiveTypeInfos.length];
-    readersByReference = new Reader[primitiveTypeInfos.length];
+    readersByValue = new Reader[typeInfos.length];
+    readersByReference = new Reader[typeInfos.length];
 
-    for (int i = 0; i < primitiveTypeInfos.length; i++) {
+    for (int i = 0; i < typeInfos.length; i++) {
       int outputColumn = startColumn + i;
       addReader(i, outputColumn);
     }
@@ -709,14 +704,14 @@ public class VectorDeserializeRow {
 
   private void throwMoreDetailedException(IOException e, int index) throws EOFException {
     StringBuilder sb = new StringBuilder();
-    sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " +  primitiveTypeInfos.length + " fields (");
-    for (int i = 0; i < primitiveTypeInfos.length; i++) {
+    sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " +  typeInfos.length + " fields (");
+    for (int i = 0; i < typeInfos.length; i++) {
       if (i > 0) {
         sb.append(", ");
       }
-      sb.append(primitiveTypeInfos[i].getPrimitiveCategory().name());
+      sb.append(((PrimitiveTypeInfo) typeInfos[i]).getPrimitiveCategory().name());
     }
     sb.append(")");
     throw new EOFException(sb.toString());
   }
-}
\ No newline at end of file
+}
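
The readers are now driven by a generic TypeInfo[] obtained from DeserializeRead.typeInfos(), with primitive details recovered by a cast as addReader() does above. A hypothetical sketch of that cast pattern (not code from this patch), using TypeInfoUtils:

    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class TypeInfoCastSketch {
      public static void main(String[] args) {
        String[] typeNames = {"int", "string", "double"};
        TypeInfo[] typeInfos = new TypeInfo[typeNames.length];
        for (int i = 0; i < typeNames.length; i++) {
          typeInfos[i] = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
        }
        // Primitive-specific information is recovered per field with a cast.
        for (TypeInfo typeInfo : typeInfos) {
          PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
          System.out.println(primitiveTypeInfo.getPrimitiveCategory());  // INT, STRING, DOUBLE
        }
      }
    }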

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index ee6939d..9774f0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -468,6 +468,9 @@ public abstract class VectorExtractRow {
         int start = colVector.start[adjustedIndex];
         int length = colVector.length[adjustedIndex];
 
+        if (value == null) {
+          LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex);
+        }
         // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
         text.set(value, start, length);
 
@@ -727,9 +730,9 @@ public abstract class VectorExtractRow {
   }
 
   public void extractRow(int batchIndex, Object[] objects) {
-    int i = 0;
-    for (Extractor extracter : extracters) {
-      objects[i++] = extracter.extract(batchIndex);
+    for (int i = 0; i < extracters.length; i++) {
+      Extractor extracter = extracters[i];
+      objects[i] = extracter.extract(batchIndex);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 39a83e3..fa66964 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -814,7 +814,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
           outputFieldNames, objectInspectors);
       if (isVectorOutput) {
         vrbCtx = new VectorizedRowBatchCtx();
-        vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
+        vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
         outputBatch = vrbCtx.createVectorizedRowBatch();
         vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
         vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
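
This init() call (and the same change in the map-join operators below) switches from a scratch column type map to an object inspector plus scratch column type names. A hypothetical sketch of the new calling convention; the struct OI is invented and the resulting column count is an assumption:

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class VrbCtxInitSketch {
      public static void main(String[] args) throws HiveException {
        // A two-column struct OI standing in for the operator's output OI.
        StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("key", "value"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.writableLongObjectInspector,
                PrimitiveObjectInspectorFactory.writableStringObjectInspector));

        // New convention: object inspector first, then scratch column type names.
        VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
        vrbCtx.init(soi, new String[] {"bigint"});

        VectorizedRowBatch batch = vrbCtx.createVectorizedRowBatch();
        System.out.println(batch.numCols);  // presumably 3: two row columns plus one scratch column
      }
    }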

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index 0baec2c..9920e9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -91,7 +91,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
     Collection<Future<?>> result = super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
@@ -182,4 +182,4 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 804ba17..66190ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
     Collection<Future<?>> result = super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
index 342bf67..5586944 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.List;
 
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -604,13 +602,13 @@ public class VectorSerializeRow {
     }
   }
 
-  public void init(PrimitiveTypeInfo[] primitiveTypeInfos, List<Integer> columnMap)
+  public void init(TypeInfo[] typeInfos, int[] columnMap)
       throws HiveException {
 
-    writers = new Writer[primitiveTypeInfos.length];
-    for (int i = 0; i < primitiveTypeInfos.length; i++) {
-      int columnIndex = columnMap.get(i);
-      Writer writer = createWriter(primitiveTypeInfos[i], columnIndex);
+    writers = new Writer[typeInfos.length];
+    for (int i = 0; i < typeInfos.length; i++) {
+      int columnIndex = columnMap[i];
+      Writer writer = createWriter(typeInfos[i], columnIndex);
       writers[i] = writer;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index da89e38..ea03099 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -145,6 +145,8 @@ public class VectorizationContext {
 
   VectorExpressionDescriptor vMap;
 
+  private List<String> initialColumnNames;
+
   private List<Integer> projectedColumns;
   private List<String> projectionColumnNames;
   private Map<String, Integer> projectionColumnMap;
@@ -158,7 +160,11 @@ public class VectorizationContext {
   public VectorizationContext(String contextName, List<String> initialColumnNames) {
     this.contextName = contextName;
     level = 0;
-    LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
+          + level + " initialColumnNames " + initialColumnNames);
+    }
+    this.initialColumnNames = initialColumnNames;
     this.projectionColumnNames = initialColumnNames;
 
     projectedColumns = new ArrayList<Integer>();
@@ -178,8 +184,11 @@ public class VectorizationContext {
   public VectorizationContext(String contextName) {
     this.contextName = contextName;
     level = 0;
-    LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level);
-      projectedColumns = new ArrayList<Integer>();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
+    }
+    initialColumnNames = new ArrayList<String>();
+    projectedColumns = new ArrayList<Integer>();
     projectionColumnNames = new ArrayList<String>();
     projectionColumnMap = new HashMap<String, Integer>();
     this.ocm = new OutputColumnManager(0);
@@ -194,6 +203,7 @@ public class VectorizationContext {
     this.contextName = contextName;
     level = vContext.level + 1;
     LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
+    this.initialColumnNames = vContext.initialColumnNames;
     this.projectedColumns = new ArrayList<Integer>();
     this.projectionColumnNames = new ArrayList<String>();
     this.projectionColumnMap = new HashMap<String, Integer>();
@@ -206,6 +216,7 @@ public class VectorizationContext {
   // Add an initial column to a vectorization context when
   // a vectorized row batch is being created.
   public void addInitialColumn(String columnName) {
+    initialColumnNames.add(columnName);
     int index = projectedColumns.size();
     projectedColumns.add(index);
     projectionColumnNames.add(columnName);
@@ -234,6 +245,10 @@ public class VectorizationContext {
     projectionColumnMap.put(columnName, vectorBatchColIndex);
   }
 
+  public List<String> getInitialColumnNames() {
+    return initialColumnNames;
+  }
+
   public List<Integer> getProjectedColumns() {
     return projectedColumns;
   }
@@ -2303,36 +2318,51 @@ public class VectorizationContext {
   }
 
   public static ColumnVector.Type getColumnVectorTypeFromTypeInfo(TypeInfo typeInfo) throws HiveException {
-    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
-    PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
-
-    switch (primitiveCategory) {
-    case BOOLEAN:
-    case BYTE:
-    case SHORT:
-    case INT:
-    case LONG:
-    case DATE:
-    case TIMESTAMP:
-    case INTERVAL_YEAR_MONTH:
-    case INTERVAL_DAY_TIME:
-      return ColumnVector.Type.LONG;
-
-    case FLOAT:
-    case DOUBLE:
-      return ColumnVector.Type.DOUBLE;
-
-    case STRING:
-    case CHAR:
-    case VARCHAR:
-    case BINARY:
-      return ColumnVector.Type.BYTES;
-
-    case DECIMAL:
-      return ColumnVector.Type.DECIMAL;
-
-    default:
-      throw new HiveException("Unexpected primitive type category " + primitiveCategory);
+    switch (typeInfo.getCategory()) {
+      case STRUCT:
+        return Type.STRUCT;
+      case UNION:
+        return Type.UNION;
+      case LIST:
+        return Type.LIST;
+      case MAP:
+        return Type.MAP;
+      case PRIMITIVE: {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
+
+        switch (primitiveCategory) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+          case DATE:
+          case TIMESTAMP:
+          case INTERVAL_YEAR_MONTH:
+          case INTERVAL_DAY_TIME:
+            return ColumnVector.Type.LONG;
+
+          case FLOAT:
+          case DOUBLE:
+            return ColumnVector.Type.DOUBLE;
+
+          case STRING:
+          case CHAR:
+          case VARCHAR:
+          case BINARY:
+            return ColumnVector.Type.BYTES;
+
+          case DECIMAL:
+            return ColumnVector.Type.DECIMAL;
+
+          default:
+            throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+        }
+      }
+      default:
+        throw new RuntimeException("Unexpected type category " +
+            typeInfo.getCategory());
     }
   }
 
@@ -2442,13 +2472,16 @@ public class VectorizationContext {
     return firstOutputColumnIndex;
   }
 
-  public Map<Integer, String> getScratchColumnTypeMap() {
-    Map<Integer, String> map = new HashMap<Integer, String>();
+  public String[] getScratchColumnTypeNames() {
+    String[] result = new String[ocm.outputColCount];
     for (int i = 0; i < ocm.outputColCount; i++) {
-      String type = ocm.outputColumnsTypes[i];
-      map.put(i+this.firstOutputColumnIndex, type);
+      String typeName = ocm.outputColumnsTypes[i];
+      if (typeName.equalsIgnoreCase("long")) {
+        typeName = "bigint";   // Convert our synonym to a real Hive type name.
+      }
+      result[i] =  typeName;
     }
-    return map;
+    return result;
   }
 
   @Override
@@ -2468,9 +2501,7 @@ public class VectorizationContext {
     }
     sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", ");
 
-    Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
-    sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
-    sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+    sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString());
 
     return sb.toString();
   }
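
As a hypothetical illustration of the widened getColumnVectorTypeFromTypeInfo() above (the expected outputs are assumptions, and the complex Type members must exist in this branch as the switch implies):

    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class ColumnVectorTypeSketch {
      public static void main(String[] args) throws HiveException {
        // Primitive categories still map to the scalar vector types.
        System.out.println(VectorizationContext.getColumnVectorTypeFromTypeInfo(
            TypeInfoUtils.getTypeInfoFromTypeString("bigint")));       // expected LONG
        System.out.println(VectorizationContext.getColumnVectorTypeFromTypeInfo(
            TypeInfoUtils.getTypeInfoFromTypeString("varchar(10)")));  // expected BYTES

        // Complex categories now return their own vector types instead of
        // failing on the PrimitiveTypeInfo cast.
        System.out.println(VectorizationContext.getColumnVectorTypeFromTypeInfo(
            TypeInfoUtils.getTypeInfoFromTypeString("array<int>")));   // expected LIST
      }
    }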

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d7e4f0..b7e13dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -53,9 +53,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -111,137 +115,55 @@ public class VectorizedBatchUtil {
     batch.size = size;
   }
 
-  /**
-   * Walk through the object inspector and add column vectors
-   *
-   * @param oi
-   * @param cvList
-   *          ColumnVectors are populated in this list
-   */
-  private static void allocateColumnVector(StructObjectInspector oi,
-      List<ColumnVector> cvList) throws HiveException {
-    if (cvList == null) {
-      throw new HiveException("Null columnvector list");
-    }
-    if (oi == null) {
-      return;
-    }
-    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
-    for(StructField field : fields) {
-      ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
-      switch(fieldObjectInspector.getCategory()) {
-      case PRIMITIVE:
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
-        switch(poi.getPrimitiveCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case TIMESTAMP:
-        case DATE:
-        case INTERVAL_YEAR_MONTH:
-        case INTERVAL_DAY_TIME:
-          cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case FLOAT:
-        case DOUBLE:
-          cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case BINARY:
-        case STRING:
-        case CHAR:
-        case VARCHAR:
-          cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case DECIMAL:
-          DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
-          cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-              tInfo.precision(), tInfo.scale()));
-          break;
-        default:
-          throw new HiveException("Vectorizaton is not supported for datatype:"
-              + poi.getPrimitiveCategory());
-        }
-        break;
-      case STRUCT:
-        throw new HiveException("Struct not supported");
-      default:
-        throw new HiveException("Flattening is not supported for datatype:"
-            + fieldObjectInspector.getCategory());
-      }
-    }
-  }
-
-
-  /**
-   * Create VectorizedRowBatch from ObjectInspector
-   *
-   * @param oi
-   * @return
-   * @throws HiveException
-   */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector oi) throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(oi, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    int i = 0;
-    for(ColumnVector cv : cvList) {
-      result.cols[i++] = cv;
-    }
-    return result;
-  }
+  public static ColumnVector createColumnVector(String typeName) {
+    typeName = typeName.toLowerCase();
 
-  /**
-   * Create VectorizedRowBatch from key and value object inspectors
-   * The row object inspector used by ReduceWork needs to be a **standard**
-   * struct object inspector, not just any struct object inspector.
-   * @param keyInspector
-   * @param valueInspector
-   * @param vectorScratchColumnTypeMap
-   * @return VectorizedRowBatch, OI
-   * @throws HiveException
-   */
-  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
-          throws HiveException {
-
-    ArrayList<String> colNames = new ArrayList<String>();
-    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
+    // Allow undecorated CHAR and VARCHAR to support scratch column type names.
+    if (typeName.equals("char") || typeName.equals("varchar")) {
+      return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
     }
-    fields = valueInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
-    }
-    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
 
-    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
-    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
+    TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+    return createColumnVector(typeInfo);
   }
 
-  /**
-   * Iterates through all columns in a given row and populates the batch
-   *
-   * @param row
-   * @param oi
-   * @param rowIndex
-   * @param batch
-   * @param buffer
-   * @throws HiveException
-   */
-  public static void addRowToBatch(Object row, StructObjectInspector oi,
-          int rowIndex,
-          VectorizedRowBatch batch,
-          DataOutputBuffer buffer
-          ) throws HiveException {
-    addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
+  public static ColumnVector createColumnVector(TypeInfo typeInfo) {
+    switch(typeInfo.getCategory()) {
+    case PRIMITIVE:
+      {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch(primitiveTypeInfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+          case TIMESTAMP:
+          case DATE:
+          case INTERVAL_YEAR_MONTH:
+          case INTERVAL_DAY_TIME:
+            return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+          case FLOAT:
+          case DOUBLE:
+            return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+          case BINARY:
+          case STRING:
+          case CHAR:
+          case VARCHAR:
+            return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+          case DECIMAL:
+            DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+            return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+                tInfo.precision(), tInfo.scale());
+          default:
+            throw new RuntimeException("Vectorizaton is not supported for datatype:"
+                + primitiveTypeInfo.getPrimitiveCategory());
+        }
+      }
+    default:
+      throw new RuntimeException("Vectorization is not supported for datatype:"
+          + typeInfo.getCategory());
+    }
   }
 
   /**
@@ -584,31 +506,30 @@ public class VectorizedBatchUtil {
     return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
   }
 
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
+  public static String[] columnNamesFromStructObjectInspector(
       StructObjectInspector structObjectInspector) throws HiveException {
 
     List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
+    String[] result = new String[fields.size()];
 
     int i = 0;
     for(StructField field : fields) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
-          field.getFieldObjectInspector().getTypeName());
-      result[i++] =  (PrimitiveTypeInfo) typeInfo;
+      result[i++] =  field.getFieldName();
     }
     return result;
   }
 
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
-      String[] typeNames) throws HiveException {
-
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
+  public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames));
+    return typeInfoList.toArray(new TypeInfo[0]);
+  }
 
-    for(int i = 0; i < typeNames.length; i++) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
-      result[i] =  (PrimitiveTypeInfo) typeInfo;
-    }
-    return result;
+  public static TypeInfo[] typeInfosFromStructObjectInspector(
+      StructObjectInspector structObjectInspector) {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector);
+    return typeInfoList.toArray(new TypeInfo[0]);
   }
 
   /**
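
A hypothetical sketch of the new createColumnVector() entry points above, with type names chosen for illustration:

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;

    public class CreateColumnVectorSketch {
      public static void main(String[] args) {
        ColumnVector a = VectorizedBatchUtil.createColumnVector("int");
        ColumnVector b = VectorizedBatchUtil.createColumnVector("decimal(10,2)");
        // Undecorated char/varchar names coming from scratch columns are accepted too.
        ColumnVector c = VectorizedBatchUtil.createColumnVector("varchar");

        System.out.println(a instanceof LongColumnVector);     // expected true
        System.out.println(b instanceof DecimalColumnVector);  // expected true
        System.out.println(c instanceof BytesColumnVector);    // expected true
      }
    }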

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
deleted file mode 100644
index 5ce7553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyDate;
-import org.apache.hadoop.hive.serde2.lazy.LazyLong;
-import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * VectorizedColumnarSerDe is used by Vectorized query execution engine
- * for columnar based storage supported by RCFile.
- */
-public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
-
-  public VectorizedColumnarSerDe() throws SerDeException {
-  }
-
-  private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
-  private final ObjectWritable ow = new ObjectWritable();
-  private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
-
-  /**
-   * Serialize a vectorized row batch
-   *
-   * @param vrg
-   *          Vectorized row batch to serialize
-   * @param objInspector
-   *          The ObjectInspector for the row object
-   * @return The serialized Writable object
-   * @throws SerDeException
-   * @see SerDe#serialize(Object, ObjectInspector)
-   */
-  @Override
-  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
-      throws SerDeException {
-    try {
-      // Validate that the OI is of struct type
-      if (objInspector.getCategory() != Category.STRUCT) {
-        throw new UnsupportedOperationException(getClass().toString()
-            + " can only serialize struct types, but we got: "
-            + objInspector.getTypeName());
-      }
-
-      VectorizedRowBatch batch = (VectorizedRowBatch) vrg;
-      StructObjectInspector soi = (StructObjectInspector) objInspector;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-
-      // Reset the byte buffer
-      serializeVectorStream.reset();
-      int count = 0;
-      int rowIndex = 0;
-      for (int i = 0; i < batch.size; i++) {
-
-        // If selectedInUse is true then we need to serialize only
-        // the selected indexes
-        if (batch.selectedInUse) {
-          rowIndex = batch.selected[i];
-        } else {
-          rowIndex = i;
-        }
-
-        BytesRefArrayWritable byteRow = byteRefArray[i];
-        int numCols = fields.size();
-
-        if (byteRow == null) {
-          byteRow = new BytesRefArrayWritable(numCols);
-          byteRefArray[i] = byteRow;
-        }
-
-        byteRow.resetValid(numCols);
-
-        for (int p = 0; p < batch.projectionSize; p++) {
-          int k = batch.projectedColumns[p];
-          ObjectInspector foi = fields.get(k).getFieldObjectInspector();
-          ColumnVector currentColVector = batch.cols[k];
-
-          switch (foi.getCategory()) {
-          case PRIMITIVE: {
-            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-            if (!currentColVector.noNulls
-                && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
-              // The column is null hence write null value
-              serializeVectorStream.write(new byte[0], 0, 0);
-            } else {
-              // If here then the vector value is not null.
-              if (currentColVector.isRepeating) {
-                // If the vector has repeating values then set rowindex to zero
-                rowIndex = 0;
-              }
-
-              switch (poi.getPrimitiveCategory()) {
-              case BOOLEAN: {
-                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
-                // In vectorization true is stored as 1 and false as 0
-                boolean b = lcv.vector[rowIndex] == 1 ? true : false;
-                if (b) {
-                  serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
-                } else {
-                  serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
-                }
-              }
-                break;
-              case BYTE:
-              case SHORT:
-              case INT:
-              case LONG:
-                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
-                LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]);
-                break;
-              case FLOAT:
-              case DOUBLE:
-                DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
-                ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex]));
-                serializeVectorStream.write(b.array(), 0, b.limit());
-                break;
-              case BINARY: {
-                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
-                byte[] bytes = bcv.vector[rowIndex];
-                serializeVectorStream.write(bytes, 0, bytes.length);
-              }
-                break;
-              case STRING:
-              case CHAR:
-              case VARCHAR: {
-                // Is it correct to escape CHAR and VARCHAR?
-                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
-                LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex],
-                    bcv.start[rowIndex],
-                    bcv.length[rowIndex],
-                    serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
-                        .getNeedsEscape());
-              }
-                break;
-              case TIMESTAMP:
-                LongColumnVector tcv = (LongColumnVector) batch.cols[k];
-                long timeInNanoSec = tcv.vector[rowIndex];
-                Timestamp t = new Timestamp(0);
-                TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t);
-                TimestampWritable tw = new TimestampWritable();
-                tw.set(t);
-                LazyTimestamp.writeUTF8(serializeVectorStream, tw);
-                break;
-              case DATE:
-                LongColumnVector dacv = (LongColumnVector) batch.cols[k];
-                DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]);
-                LazyDate.writeUTF8(serializeVectorStream, daw);
-                break;
-              default:
-                throw new UnsupportedOperationException(
-                    "Vectorizaton is not supported for datatype:"
-                        + poi.getPrimitiveCategory());
-              }
-            }
-            break;
-          }
-          case LIST:
-          case MAP:
-          case STRUCT:
-          case UNION:
-            throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:"
-                + foi.getCategory());
-          default:
-            throw new SerDeException("Unknown ObjectInspector category!");
-
-          }
-
-          byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
-              .getLength() - count);
-          count = serializeVectorStream.getLength();
-        }
-
-      }
-      ow.set(byteRefArray);
-    } catch (Exception e) {
-      throw new SerDeException(e);
-    }
-    return ow;
-  }
-
-  @Override
-  public SerDeStats getSerDeStats() {
-    return null;
-  }
-
-  @Override
-  public Class<? extends Writable> getSerializedClass() {
-    return BytesRefArrayWritable.class;
-  }
-
-  @Override
-  public Object deserialize(Writable blob) throws SerDeException {
-
-    // Ideally this should throw  UnsupportedOperationException as the serde is
-    // vectorized serde. But since RC file reader does not support vectorized reading this
-    // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch
-    // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized
-    // reading this serde and be standalone serde with no dependency on ColumnarSerDe.
-    return super.deserialize(blob);
-  }
-
-  @Override
-  public ObjectInspector getObjectInspector() throws SerDeException {
-    return cachedObjectInspector;
-  }
-
-  @Override
-  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Deserializes the rowBlob into Vectorized row batch
-   * @param rowBlob
-   *          rowBlob row batch to deserialize
-   * @param rowsInBlob
-   *          Total number of rows in rowBlob to deserialize
-   * @param reuseBatch
-   *          VectorizedRowBatch to which the rows should be serialized   *
-   * @throws SerDeException
-   */
-  @Override
-  public void deserializeVector(Object rowBlob, int rowsInBlob,
-      VectorizedRowBatch reuseBatch) throws SerDeException {
-
-    BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    for (int i = 0; i < rowsInBlob; i++) {
-      Object row = deserialize(refArray[i]);
-      try {
-        VectorizedBatchUtil.addRowToBatch(row,
-            (StructObjectInspector) cachedObjectInspector, i,
-            reuseBatch, buffer);
-      } catch (HiveException e) {
-        throw new SerDeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 7e41384..2882024 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -45,6 +45,10 @@ public class VectorizedRowBatch implements Writable {
   public int[] projectedColumns;
   public int projectionSize;
 
+  private int dataColumnCount;
+  private int partitionColumnCount;
+
+
   /*
    * If no filtering has been applied yet, selectedInUse is false,
    * meaning that all rows qualify. If it is true, then the selected[] array
@@ -94,6 +98,22 @@ public class VectorizedRowBatch implements Writable {
     for (int i = 0; i < numCols; i++) {
       projectedColumns[i] = i;
     }
+
+    dataColumnCount = -1;
+    partitionColumnCount = -1;
+  }
+
+  public void setPartitionInfo(int dataColumnCount, int partitionColumnCount) {
+    this.dataColumnCount = dataColumnCount;
+    this.partitionColumnCount = partitionColumnCount;
+  }
+
+  public int getDataColumnCount() {
+    return dataColumnCount;
+  }
+
+  public int getPartitionColumnCount() {
+    return partitionColumnCount;
   }
 
   /**
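
And a hypothetical sketch of the new partition bookkeeping on VectorizedRowBatch, with invented counts:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class PartitionInfoSketch {
      public static void main(String[] args) {
        // A batch over three columns, the last of which is a partition column.
        VectorizedRowBatch batch = new VectorizedRowBatch(3);
        System.out.println(batch.getDataColumnCount());       // -1 until setPartitionInfo is called
        batch.setPartitionInfo(2, 1);
        System.out.println(batch.getDataColumnCount());       // expected 2
        System.out.println(batch.getPartitionColumnCount());  // expected 1
      }
    }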

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 81ab129..efb06b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector;
 import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,392 +32,270 @@ import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hive.common.util.DateUtils;
 
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. This class does eager deserialization of row data using serde
  * in the RecordReader layer.
  * It supports partitions in this layer so that the vectorized batch is populated correctly
  * with the partition column.
  */
 public class VectorizedRowBatchCtx {
 
+  private static final long serialVersionUID = 1L;
+
   private static final Log LOG = LogFactory.getLog(VectorizedRowBatchCtx.class.getName());
 
-  // OI for raw row data (EG without partition cols)
-  private StructObjectInspector rawRowOI;
+  // The following information is for creating VectorizedRowBatch and for helping with
+  // knowing how the table is partitioned.
+  //
+  // It will be stored in MapWork and ReduceWork.
+  private String[] rowColumnNames;
+  private TypeInfo[] rowColumnTypeInfos;
+  private int dataColumnCount;
+  private int partitionColumnCount;
 
-  // OI for the row (Raw row OI + partition OI)
-  private StructObjectInspector rowOI;
+  private String[] scratchColumnTypeNames;
 
-  // Deserializer for the row data
-  private Deserializer deserializer;
+  /**
+   * Constructor for VectorizedRowBatchCtx
+   */
+  public VectorizedRowBatchCtx() {
+  }
 
-  // Hash map of partition values. Key=TblColName value=PartitionValue
-  private Map<String, Object> partitionValues;
-  
-  //partition types
-  private Map<String, PrimitiveCategory> partitionTypes;
+  public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos,
+      int partitionColumnCount, String[] scratchColumnTypeNames) {
+    this.rowColumnNames = rowColumnNames;
+    this.rowColumnTypeInfos = rowColumnTypeInfos;
+    this.partitionColumnCount = partitionColumnCount;
+    this.scratchColumnTypeNames = scratchColumnTypeNames;
 
-  // partition column positions, for use by classes that need to know whether a given column is a
-  // partition column
-  private Set<Integer> partitionCols;
-  
-  // Column projection list - List of column indexes to include. This
-  // list does not contain partition columns
-  private List<Integer> colsToInclude;
+    dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount;
+  }
 
-  private Map<Integer, String> scratchColumnTypeMap = null;
+  public String[] getRowColumnNames() {
+    return rowColumnNames;
+  }
 
-  /**
-   * Constructor for VectorizedRowBatchCtx
-   *
-   * @param rawRowOI
-   *          OI for raw row data (EG without partition cols)
-   * @param rowOI
-   *          OI for the row (Raw row OI + partition OI)
-   * @param deserializer
-   *          Deserializer for the row data
-   * @param partitionValues
-   *          Hash map of partition values. Key=TblColName value=PartitionValue
-   */
-  public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI,
-      Deserializer deserializer, Map<String, Object> partitionValues, 
-      Map<String, PrimitiveCategory> partitionTypes) {
-    this.rowOI = rowOI;
-    this.rawRowOI = rawRowOI;
-    this.deserializer = deserializer;
-    this.partitionValues = partitionValues;
-    this.partitionTypes = partitionTypes;
+  public TypeInfo[] getRowColumnTypeInfos() {
+    return rowColumnTypeInfos;
   }
 
-  /**
-   * Constructor for VectorizedRowBatchCtx
-   */
-  public VectorizedRowBatchCtx() {
+  public int getDataColumnCount() {
+    return dataColumnCount;
+  }
 
+  public int getPartitionColumnCount() {
+    return partitionColumnCount;
+  }
+
+  public String[] getScratchColumnTypeNames() {
+    return scratchColumnTypeNames;
   }
 
   /**
-   * Initializes the VectorizedRowBatch context based on an scratch column type map and
+   * Initializes the VectorizedRowBatch context based on scratch column type names and an
    * object inspector.
-   * @param scratchColumnTypeMap
-   * @param rowOI
-   *          Object inspector that shapes the column types
+   * @param structObjectInspector
+   *          Object inspector that shapes the column types
+   * @param scratchColumnTypeNames
+   * @throws HiveException
    */
-  public void init(Map<Integer, String> scratchColumnTypeMap,
-      StructObjectInspector rowOI) {
-    this.scratchColumnTypeMap = scratchColumnTypeMap;
-    this.rowOI= rowOI;
-    this.rawRowOI = rowOI;
+  public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames)
+          throws HiveException {
+
+    // Row column information.
+    rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector);
+    rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector);
+    partitionColumnCount = 0;
+    dataColumnCount = rowColumnTypeInfos.length;
+
+    // Scratch column information.
+    this.scratchColumnTypeNames = scratchColumnTypeNames;
   }
 
-  /**
-   * Initializes VectorizedRowBatch context based on the
-   * split and Hive configuration (Job conf with hive Plan).
-   *
-   * @param hiveConf
-   *          Hive configuration using Hive plan is extracted
-   * @param split
-   *          File split of the file being read
-   * @throws ClassNotFoundException
-   * @throws IOException
-   * @throws SerDeException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   * @throws HiveException
-   */
-  public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
-      IOException,
-      SerDeException,
-      InstantiationException,
-      IllegalAccessException,
-      HiveException {
+  public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf,
+      FileSplit split, Object[] partitionValues) throws IOException {
 
     Map<String, PartitionDesc> pathToPartitionInfo = Utilities
         .getMapWork(hiveConf).getPathToPartitionInfo();
 
-    PartitionDesc part = HiveFileFormatUtils
+    PartitionDesc partDesc = HiveFileFormatUtils
         .getPartitionDescFromPathRecursively(pathToPartitionInfo,
             split.getPath(), IOPrepareCache.get().getPartitionDescMap());
 
-    String partitionPath = split.getPath().getParent().toString();
-    scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf);
-    // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString());
-
-    Properties partProps =
-        (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ?
-            part.getTableDesc().getProperties() : part.getProperties();
-
-    Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
-    Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); 
-    SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(),
-                               partProps);
-    StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
-        .getObjectInspector();
-
-    deserializer = partDeserializer;
-
-    // Check to see if this split is part of a partition of a table
-    String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
-    String[] partKeys = null;
-    if (pcols != null && pcols.length() > 0) {
-
-      // Partitions exist for this table. Get the partition object inspector and
-      // raw row object inspector (row with out partition col)
-      LinkedHashMap<String, String> partSpec = part.getPartSpec();
-      partKeys = pcols.trim().split("/");
-      String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);      
-      String[] partKeyTypes = pcolTypes.trim().split(":");      
-      
-      if (partKeys.length  > partKeyTypes.length) {
-        throw new HiveException("Internal error : partKeys length, " +partKeys.length +
-                " greater than partKeyTypes length, " + partKeyTypes.length);
-      }
-      
-      List<String> partNames = new ArrayList<String>(partKeys.length);
-      List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-      partitionValues = new LinkedHashMap<String, Object>();
-      partitionTypes = new LinkedHashMap<String, PrimitiveCategory>();
-      for (int i = 0; i < partKeys.length; i++) {
-        String key = partKeys[i];
-        partNames.add(key);
-        ObjectInspector objectInspector = null;
-        Object objectVal; 
-        if (partSpec == null) {
-          // for partitionless table, initialize partValue to empty string.
-          // We can have partitionless table even if we have partition keys
-          // when there is only only partition selected and the partition key is not
-          // part of the projection/include list.
-          objectVal = null;
-          objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
-          partitionTypes.put(key, PrimitiveCategory.STRING);       
-        } else {
-          // Create a Standard java object Inspector
-          PrimitiveTypeInfo partColTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]);
-          objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
-              partColTypeInfo);
-          objectVal = 
-              ObjectInspectorConverters.
-              getConverter(PrimitiveObjectInspectorFactory.
-                  javaStringObjectInspector, objectInspector).
-                  convert(partSpec.get(key));
-          if (partColTypeInfo instanceof CharTypeInfo) {
-            objectVal = ((HiveChar) objectVal).getStrippedValue();
-          }
-          partitionTypes.put(key, partColTypeInfo.getPrimitiveCategory());
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key));
-        }
-        partitionValues.put(key, objectVal);
-        partObjectInspectors.add(objectInspector);
-      }
-
-      // Create partition OI
-      StructObjectInspector partObjectInspector = ObjectInspectorFactory
-          .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
-      // Get row OI from partition OI and raw row OI
-      StructObjectInspector rowObjectInspector = ObjectInspectorFactory
-          .getUnionStructObjectInspector(Arrays
-              .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
-      rowOI = rowObjectInspector;
-      rawRowOI = partRawRowObjectInspector;
-
-      // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it
-      partitionCols = new HashSet<Integer>();
-      if (pcols != null && pcols.length() > 0) {
-        for (int i = 0; i < partKeys.length; i++) {
-          partitionCols.add(getColIndexBasedOnColName(partKeys[i]));
-        }
-      }
+    getPartitionValues(vrbCtx, partDesc, partitionValues);
 
-    } else {
+  }
 
-      // No partitions for this table, hence row OI equals raw row OI
-      rowOI = partRawRowObjectInspector;
-      rawRowOI = partRawRowObjectInspector;
+  public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
+      Object[] partitionValues) {
+
+    LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+
+    for (int i = 0; i < vrbCtx.partitionColumnCount; i++) {
+      Object objectValue;
+      if (partSpec == null) {
+        // For a partition-less table, initialize the partition value to null.
+        // We can have a partition-less table even if we have partition keys
+        // when there is only one partition selected and the partition key is not
+        // part of the projection/include list.
+        objectValue = null;
+      } else {
+        String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i];
+
+        // Create a Standard java object Inspector
+        ObjectInspector objectInspector =
+            TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+                vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]);
+        objectValue =
+            ObjectInspectorConverters.
+                getConverter(PrimitiveObjectInspectorFactory.
+                    javaStringObjectInspector, objectInspector).
+                        convert(partSpec.get(key));
+      }
+      partitionValues[i] = objectValue;
     }
-
-    colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf);
   }
-  
+
   /**
    * Creates a Vectorized row batch and the column vectors.
    *
    * @return VectorizedRowBatch
    * @throws HiveException
    */
-  public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
+  public VectorizedRowBatch createVectorizedRowBatch()
   {
-    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
-    VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
-    for (int j = 0; j < fieldRefs.size(); j++) {
-      // If the column is included in the include list or if the column is a
-      // partition column then create the column vector. Also note that partition columns are not
-      // in the included list.
-      if ((colsToInclude == null) || colsToInclude.contains(j)
-          || ((partitionValues != null) &&
-              partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) {
-        ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
-        switch (foi.getCategory()) {
-        case PRIMITIVE: {
-          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-          // Vectorization currently only supports the following data types:
-          // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, TIMESTAMP,
-          // DATE and DECIMAL
-          switch (poi.getPrimitiveCategory()) {
-          case BOOLEAN:
-          case BYTE:
-          case SHORT:
-          case INT:
-          case LONG:
-          case TIMESTAMP:
-          case DATE:
-          case INTERVAL_YEAR_MONTH:
-          case INTERVAL_DAY_TIME:
-            result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-            break;
-          case FLOAT:
-          case DOUBLE:
-            result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-            break;
-          case BINARY:
-          case STRING:
-          case CHAR:
-          case VARCHAR:
-            result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
-            result.cols[j] = new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-                tInfo.precision(), tInfo.scale());
-            break;
-          default:
-            throw new RuntimeException("Vectorizaton is not supported for datatype:"
-                + poi.getPrimitiveCategory());
-          }
-          break;
-        }
-        case LIST:
-        case MAP:
-        case STRUCT:
-        case UNION:
-          throw new HiveException("Vectorizaton is not supported for datatype:"
-              + foi.getCategory());
-        default:
-          throw new HiveException("Unknown ObjectInspector category!");
-        }    
-      }
+    int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+    VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+    LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE");
+    for (int i = 0; i < rowColumnTypeInfos.length; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+    }
+
+    for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+      String typeName = scratchColumnTypeNames[i];
+      result.cols[rowColumnTypeInfos.length + i] =
+          VectorizedBatchUtil.createColumnVector(typeName);
     }
-    result.numCols = fieldRefs.size();
-    this.addScratchColumnsToBatch(result);
+
+    result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
     result.reset();
     return result;
   }
 
-  /**
-   * Adds the row to the batch after deserializing the row
-   *
-   * @param rowIndex
-   *          Row index in the batch to which the row is added
-   * @param rowBlob
-   *          Row blob (serialized version of row)
-   * @param batch
-   *          Vectorized batch to which the row is added
-   * @param buffer a buffer to copy strings into
-   * @throws HiveException
-   * @throws SerDeException
-   */
-  public void addRowToBatch(int rowIndex, Writable rowBlob,
-                            VectorizedRowBatch batch,
-                            DataOutputBuffer buffer
-                            ) throws HiveException, SerDeException
+  public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated)
   {
-    Object row = this.deserializer.deserialize(rowBlob);
-    VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer);
-  }
+    if (columnsToIncludeTruncated == null) {
+      return createVectorizedRowBatch();
+    }
 
-  /**
-   * Deserialized set of rows and populates the batch
-   *
-   * @param rowBlob
-   *          to deserialize
-   * @param batch
-   *          Vectorized row batch which contains deserialized data
-   * @throws SerDeException
-   */
-  public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
-      VectorizedRowBatch batch)
-      throws SerDeException {
-
-    if (deserializer instanceof VectorizedSerde) {
-      ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
-    } else {
-      throw new SerDeException(
-          "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+    LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
+    int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+    VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+    for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+      if (columnsToIncludeTruncated[i]) {
+        TypeInfo typeInfo = rowColumnTypeInfos[i];
+        result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+      }
     }
+
+    for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+    }
+
+    for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+      String typeName = scratchColumnTypeNames[i];
+      result.cols[rowColumnTypeInfos.length + i] =
+          VectorizedBatchUtil.createColumnVector(typeName);
+    }
+
+    result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
+    result.reset();
+    return result;
   }
 
-  private int getColIndexBasedOnColName(String colName) throws HiveException
-  {
-    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      if (fieldRefs.get(i).getFieldName().equals(colName)) {
-        return i;
+  public boolean[] getColumnsToIncludeTruncated(Configuration conf) {
+    boolean[] columnsToIncludeTruncated = null;
+
+    List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf);
+    if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0) {
+
+      // Partitioned columns will not be in the include list.
+
+      boolean[] columnsToInclude = new boolean[dataColumnCount];
+      Arrays.fill(columnsToInclude, false);
+      for (int columnNum : columnsToIncludeTruncatedList) {
+        if (columnNum < dataColumnCount) {
+          columnsToInclude[columnNum] = true;
+        }
+      }
+
+      // Work backwards to find the highest wanted column.
+
+      int highestWantedColumnNum = -1;
+      for (int i = dataColumnCount - 1; i >= 0; i--) {
+        if (columnsToInclude[i]) {
+          highestWantedColumnNum = i;
+          break;
+        }
+      }
+      if (highestWantedColumnNum == -1) {
+        throw new RuntimeException("No columns to include?");
+      }
+      int newColumnCount = highestWantedColumnNum + 1;
+      if (newColumnCount == dataColumnCount) {
+        // Didn't trim any columns off the end.  Use the original.
+        columnsToIncludeTruncated = columnsToInclude;
+      } else {
+        columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
       }
     }
-    throw new HiveException("Not able to find column name in row object inspector");
+    return columnsToIncludeTruncated;
   }
-  
+
   /**
    * Add the partition values to the batch
    *
    * @param batch
+   * @param partitionValues
    * @throws HiveException
    */
-  public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
+  public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues)
   {
-    int colIndex;
-    Object value;
-    PrimitiveCategory pCategory;
     if (partitionValues != null) {
-      for (String key : partitionValues.keySet()) {
-        colIndex = getColIndexBasedOnColName(key);
-        value = partitionValues.get(key);
-        pCategory = partitionTypes.get(key);
-        
-        switch (pCategory) {
+      for (int i = 0; i < partitionColumnCount; i++) {
+        Object value = partitionValues[i];
+
+        int colIndex = dataColumnCount + i;
+        String partitionColumnName = rowColumnNames[colIndex];
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex];
+        switch (primitiveTypeInfo.getPrimitiveCategory()) {
         case BOOLEAN: {
           LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
           if (value == null) {
@@ -575,7 +447,7 @@ public class VectorizedRowBatchCtx {
             HiveDecimal hd = (HiveDecimal) value;
             dv.set(0, hd);
             dv.isRepeating = true;
-            dv.isNull[0] = false;      
+            dv.isNull[0] = false;
           }
         }
         break;
@@ -604,15 +476,15 @@ public class VectorizedRowBatchCtx {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
           } else {
-            bcv.fill(sVal.getBytes()); 
+            bcv.fill(sVal.getBytes());
             bcv.isNull[0] = false;
           }
         }
         break;
-        
+
         default:
-          throw new HiveException("Unable to recognize the partition type " + pCategory + 
-              " for column " + key);
+          throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() +
+              " for column " + partitionColumnName);
         }
       }
     }
@@ -620,64 +492,12 @@ public class VectorizedRowBatchCtx {
 
   /**
    * Determine whether a given column is a partition column
-   * @param colnum column number in
+   * @param colNum column number in
    * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context.
    * @return true if it is a partition column, false otherwise
    */
-  public final boolean isPartitionCol(int colnum) {
-    return (partitionCols == null) ? false : partitionCols.contains(colnum);
-  }
-
-  private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
-    if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) {
-      int origNumCols = vrb.numCols;
-      int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size();
-      vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
-      for (int i = origNumCols; i < newNumCols; i++) {
-       String typeName = scratchColumnTypeMap.get(i);
-       if (typeName == null) {
-         throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString());
-       }
-        vrb.cols[i] = allocateColumnVector(typeName,
-            VectorizedRowBatch.DEFAULT_SIZE);
-      }
-      vrb.numCols = vrb.cols.length;
-    }
-  }
-
-  /**
-   * Get the scale and precision for the given decimal type string. The decimal type is assumed to be
-   * of the format decimal(precision,scale) e.g. decimal(20,10).
-   * @param decimalType The given decimal type string.
-   * @return An integer array of size 2 with first element set to precision and second set to scale.
-   */
-  private static int[] getScalePrecisionFromDecimalType(String decimalType) {
-    Pattern p = Pattern.compile("\\d+");
-    Matcher m = p.matcher(decimalType);
-    m.find();
-    int precision = Integer.parseInt(m.group());
-    m.find();
-    int scale = Integer.parseInt(m.group());
-    int [] precScale = { precision, scale };
-    return precScale;
+  public final boolean isPartitionCol(int colNum) {
+    return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length;
   }
 
-  public static ColumnVector allocateColumnVector(String type, int defaultSize) {
-    if (type.equalsIgnoreCase("double")) {
-      return new DoubleColumnVector(defaultSize);
-    } else if (VectorizationContext.isStringFamily(type)) {
-      return new BytesColumnVector(defaultSize);
-    } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){
-      int [] precisionScale = getScalePrecisionFromDecimalType(type);
-      return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]);
-    } else if (type.equalsIgnoreCase("long") ||
-               type.equalsIgnoreCase("date") ||
-               type.equalsIgnoreCase("timestamp") ||
-               type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) ||
-               type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
-      return new LongColumnVector(defaultSize);
-    } else {
-      throw new RuntimeException("Cannot allocate vector column for " + type);
-    }
-  }
 }

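For readers following the VectorizedRowBatchCtx changes above: the class no longer carries ObjectInspectors, a Deserializer, or per-partition maps; it is described by row column names, TypeInfos, a data/partition column split, and scratch column type names, and batches are allocated from that description. The sketch below shows one way the reworked API might be driven; the table shape ("id", "amount", "ds"), the partition value, and the sketch class itself are invented for illustration and are not part of this patch.

    // Hypothetical usage sketch of the reworked VectorizedRowBatchCtx API shown above.
    // Column names, types, and the partition value are assumptions, not code from the patch.
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class VectorizedRowBatchCtxSketch {
      public static VectorizedRowBatch newBatchWithPartitionColumn() {
        // Two data columns ("id", "amount") plus one partition column ("ds"); no scratch columns.
        String[] names = { "id", "amount", "ds" };
        TypeInfo[] types = {
            TypeInfoFactory.longTypeInfo,
            TypeInfoFactory.doubleTypeInfo,
            TypeInfoFactory.stringTypeInfo };
        VectorizedRowBatchCtx ctx =
            new VectorizedRowBatchCtx(names, types, 1, new String[0]);

        // Data, partition, and scratch column vectors are allocated in that order.
        VectorizedRowBatch batch = ctx.createVectorizedRowBatch();

        // In a record reader the values would normally come from
        // VectorizedRowBatchCtx.getPartitionValues(ctx, conf, split, partitionValues).
        Object[] partitionValues = new Object[ctx.getPartitionColumnCount()];
        partitionValues[0] = "2016-01-12";
        ctx.addPartitionColsToBatch(batch, partitionValues);
        return batch;
      }
    }

A reader would typically build the context once from the plan metadata (or via init(...) when there are no partition columns) and reuse it for every split.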
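Partition values arrive as strings from the partition spec and are converted to typed Java objects through an ObjectInspector converter, as getPartitionValues does above. Here is a minimal sketch of that conversion in isolation, assuming an int partition column whose raw value is the string "2016"; the type and value are assumptions for illustration.

    // Minimal sketch of the partition-value string conversion used above; the int type
    // and the raw value "2016" are assumed for illustration.
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class PartitionValueConversionSketch {
      public static void main(String[] args) {
        // Standard Java object inspector for the partition column's type.
        ObjectInspector targetOI =
            TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.intTypeInfo);
        // Convert the raw partition-spec string to the typed value.
        Object typed = ObjectInspectorConverters
            .getConverter(PrimitiveObjectInspectorFactory.javaStringObjectInspector, targetOI)
            .convert("2016");
        System.out.println(typed + " (" + typed.getClass().getSimpleName() + ")");  // 2016 (Integer)
      }
    }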
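The column-projection handling in getColumnsToIncludeTruncated drops unread trailing data columns rather than allocating vectors for them. Below is a standalone restatement of that truncation logic, written only to illustrate the idea (it is not the patch's code): with six data columns and a read-column list of {1, 3}, the mask comes back as [false, true, false, true].

    // Standalone restatement of the include-mask truncation logic above, for illustration only.
    import java.util.Arrays;
    import java.util.List;

    public class IncludeMaskSketch {

      static boolean[] truncatedMask(List<Integer> readColumnIds, int dataColumnCount) {
        boolean[] include = new boolean[dataColumnCount];
        for (int col : readColumnIds) {
          if (col < dataColumnCount) {
            include[col] = true;   // partition columns never appear in the read list
          }
        }

        // Work backwards to find the highest wanted column.
        int highest = -1;
        for (int i = dataColumnCount - 1; i >= 0; i--) {
          if (include[i]) {
            highest = i;
            break;
          }
        }
        if (highest == -1) {
          throw new RuntimeException("No columns to include?");
        }

        // Trim the unread tail; column vectors past the highest wanted index are never created.
        return (highest + 1 == dataColumnCount)
            ? include
            : Arrays.copyOf(include, highest + 1);
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(truncatedMask(Arrays.asList(1, 3), 6)));
        // prints [false, true, false, true]
      }
    }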
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 35e3403..f28d3ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,12 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.HashTableLoader;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
@@ -58,7 +52,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOpti
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -70,6 +63,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * This class is common operator class for native vectorized map join.
@@ -572,10 +567,11 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
      * Create our vectorized copy row and deserialize row helper objects.
      */
     if (smallTableMapping.getCount() > 0) {
-      smallTableVectorDeserializeRow = new VectorDeserializeRow(
-                      new LazyBinaryDeserializeRead(
-                          VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
-                              smallTableMapping.getTypeNames())));
+      smallTableVectorDeserializeRow =
+          new VectorDeserializeRow(
+              new LazyBinaryDeserializeRead(
+                  VectorizedBatchUtil.typeInfosFromTypeNames(
+                      smallTableMapping.getTypeNames())));
       smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
     }
 
@@ -649,23 +645,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
    * Setup our 2nd batch with the same "column schema" as the big table batch that can be used to
    * build join output results in.
    */
-  protected VectorizedRowBatch setupOverflowBatch() {
+  protected VectorizedRowBatch setupOverflowBatch() throws HiveException {
+
+    int initialColumnCount = vContext.firstOutputColumnIndex();
     VectorizedRowBatch overflowBatch;
 
-    Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap();
-    int maxColumn = 0;
-    for (int i = 0; i < outputProjection.length; i++) {
-      int outputColumn = outputProjection[i];
-      if (maxColumn < outputColumn) {
-        maxColumn = outputColumn;
-      }
-    }
-    for (int outputColumn : scratchColumnTypeMap.keySet()) {
-      if (maxColumn < outputColumn) {
-        maxColumn = outputColumn;
-      }
-    }
-    overflowBatch = new VectorizedRowBatch(maxColumn + 1);
+    int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length;
+    overflowBatch = new VectorizedRowBatch(totalNumColumns);
 
     // First, just allocate just the projection columns we will be using.
     for (int i = 0; i < outputProjection.length; i++) {
@@ -675,9 +661,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     }
 
     // Now, add any scratch columns needed for children operators.
-    for (int outputColumn : scratchColumnTypeMap.keySet()) {
-      String typeName = scratchColumnTypeMap.get(outputColumn);
-      allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName);
+    int outputColumn = initialColumnCount;
+    for (String typeName : vOutContext.getScratchColumnTypeNames()) {
+      allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName);
     }
 
     overflowBatch.projectedColumns = outputProjection;
@@ -695,22 +681,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
               String typeName) {
 
     if (overflowBatch.cols[outputColumn] == null) {
-      String vectorTypeName;
-      if (VectorizationContext.isIntFamily(typeName) ||
-          VectorizationContext.isDatetimeFamily(typeName)) {
-        vectorTypeName = "long";
-      } else if (VectorizationContext.isFloatFamily(typeName)) {
-        vectorTypeName = "double";
-      } else if (VectorizationContext.isStringFamily(typeName)) {
-         vectorTypeName = "string";
-      } else if (VectorizationContext.decimalTypePattern.matcher(typeName).matches()){
-        vectorTypeName = typeName;  // Keep precision and scale.
-      } else {
-        throw new RuntimeException("Cannot determine vector type for " + typeName);
-      }
-      overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(vectorTypeName, VectorizedRowBatch.DEFAULT_SIZE);
+      typeName = VectorizationContext.mapTypeNameSynonyms(typeName);
 
-      if (LOG.isDebugEnabled()) {
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+      overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo);
+
+      if (isLogDebugEnabled) {
         LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName());
       }
     }