Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/18 15:52:16 UTC

[14/17] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index a8ebf8f..ce49b24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -28,11 +28,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,11 +67,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -96,6 +98,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -112,6 +115,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -163,6 +167,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import com.google.common.base.Joiner;
+
 public class Vectorizer implements PhysicalPlanResolver {
 
   protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class);
@@ -324,14 +330,51 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedAggregationUdfs.add("stddev_samp");
   }
 
+  private class VectorTaskColumnInfo {
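+
+    // Collects the column names, types, and partition column count for a Map or Reduce task;
+    // transferToBaseWork packages them, together with the scratch column types, into a
+    // VectorizedRowBatchCtx attached to the task's BaseWork.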
+    List<String> columnNames;
+    List<TypeInfo> typeInfos;
+    int partitionColumnCount;
+
+    String[] scratchTypeNameArray;
+
+    VectorTaskColumnInfo() {
+      partitionColumnCount = 0;
+    }
+
+    public void setColumnNames(List<String> columnNames) {
+      this.columnNames = columnNames;
+    }
+    public void setTypeInfos(List<TypeInfo> typeInfos) {
+      this.typeInfos = typeInfos;
+    }
+    public void setPartitionColumnCount(int partitionColumnCount) {
+      this.partitionColumnCount = partitionColumnCount;
+    }
+    public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
+      this.scratchTypeNameArray = scratchTypeNameArray;
+    }
+
+    public void transferToBaseWork(BaseWork baseWork) {
+
+      String[] columnNameArray = columnNames.toArray(new String[0]);
+      TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]);
+
+      VectorizedRowBatchCtx vectorizedRowBatchCtx =
+          new VectorizedRowBatchCtx(
+            columnNameArray,
+            typeInfoArray,
+            partitionColumnCount,
+            scratchTypeNameArray);
+      baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+    }
+  }
+
   class VectorizationDispatcher implements Dispatcher {
 
-    private List<String> reduceColumnNames;
-    private List<TypeInfo> reduceTypeInfos;
+    private final PhysicalContext physicalContext;
 
     public VectorizationDispatcher(PhysicalContext physicalContext) {
-      reduceColumnNames = null;
-      reduceTypeInfos = null;
+      this.physicalContext = physicalContext;
     }
 
     @Override
@@ -369,9 +412,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
-      boolean ret = validateMapWork(mapWork, isTez);
+      VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
       if (ret) {
-        vectorizeMapWork(mapWork, isTez);
+        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
       }
     }
 
@@ -382,40 +426,262 @@ public class Vectorizer implements PhysicalPlanResolver {
           + ReduceSinkOperator.getOperatorName()), np);
     }
 
-    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
-      LOG.info("Validating MapWork...");
+    private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) {
 
       // Eliminate MR plans with more than one TableScanOperator.
+
       LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
       if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
-        return false;
+        return null;
       }
       int tableScanCount = 0;
-      for (Operator<?> op : aliasToWork.values()) {
+      String alias = "";
+      TableScanOperator tableScanOperator = null;
+      for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+        Operator<?> op = entry.getValue();
         if (op == null) {
           LOG.warn("Map work has invalid aliases to work with. Fail validation!");
-          return false;
+          return null;
         }
         if (op instanceof TableScanOperator) {
           tableScanCount++;
+          alias = entry.getKey();
+          tableScanOperator = (TableScanOperator) op;
         }
       }
       if (tableScanCount > 1) {
-        LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
-        return false;
+        LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+        return null;
+      }
+      return new ImmutablePair<String, TableScanOperator>(alias, tableScanOperator);
+    }
+
+    private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator,
+        List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) {
+
+      TableScanDesc tableScanDesc = tableScanOperator.getConf();
+
+      // Add all non-virtual columns to make a vectorization context for
+      // the TableScan operator.
+      RowSchema rowSchema = tableScanOperator.getSchema();
+      for (ColumnInfo c : rowSchema.getSignature()) {
+        // Validation will later exclude vectorization when virtual columns are used (HIVE-5560).
+        if (!isVirtualColumn(c)) {
+          String columnName = c.getInternalName();
+          String typeName = c.getTypeName();
+          TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+          logicalColumnNameList.add(columnName);
+          logicalTypeInfoList.add(typeInfo);
+        }
+      }
+    }
+
+    private String getColumns(List<String> columnNames, int start, int length,
+        Character separator) {
+      return Joiner.on(separator).join(columnNames.subList(start, start + length));
+    }
+
+    private String getTypes(List<TypeInfo> typeInfos, int start, int length) {
+      return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length));
+    }
+
+    private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) {
+
+      // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
+      // and reads VectorizedRowBatch as a "row".
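+      // (For example, ORC's OrcInputFormat implements VectorizedInputFormatInterface.)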
+
+      if (Utilities.isInputFileFormatVectorized(pd)) {
+
+        pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat());
+
+        return true;
       }
 
+      LOG.info("Input format: " + pd.getInputFileFormatClassName()
+          + ", doesn't provide vectorized input");
+
+      return false;
+    }
+
+    private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+        TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+      // These names/types are the data columns plus partition columns.
+      final List<String> allColumnNameList = new ArrayList<String>();
+      final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
+
+      getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList);
+      final int allColumnCount = allColumnNameList.size();
+
+      // Validate input format and schema evolution capability.
+
+      // For the table itself, enter a null value in the multi-key map, indicating that no
+      // conversion is necessary when a partition's schema matches the table's.
+
+      HashMap<ImmutablePair<String, List<TypeInfo>>, boolean[]> conversionMap =
+          new HashMap<ImmutablePair<String, List<TypeInfo>>, boolean[]>();
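+      // Key: (data column names string, data column TypeInfo list); value: per-column
+      // conversion flags, or null when no conversion is necessary for that schema.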
+
+      boolean isFirst = true;
+      int dataColumnCount = 0;
+      int partitionColumnCount = 0;
+
+      List<String> dataColumnList = null;
+      String dataColumnsString = "";
+      List<TypeInfo> dataTypeInfoList = null;
+
       // Validate the input format
-      for (String path : mapWork.getPathToPartitionInfo().keySet()) {
-        PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
-        List<Class<?>> interfaceList =
-            Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
-        if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
-          LOG.info("Input format: " + pd.getInputFileFormatClassName()
-              + ", doesn't provide vectorized input");
+      VectorPartitionConversion partitionConversion = new VectorPartitionConversion();
+      LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
+      LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+      for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) {
+        String path = entry.getKey();
+        List<String> aliases = entry.getValue();
+        boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
+        if (!isPresent) {
+          LOG.info("Alias " + alias + " not present in aliases " + aliases);
           return false;
         }
+        PartitionDesc partDesc = pathToPartitionInfo.get(path);
+        if (partDesc.getVectorPartitionDesc() != null) {
+          // We have seen this already.
+          continue;
+        }
+        if (!verifyAndSetVectorPartDesc(partDesc)) {
+          return false;
+        }
+        VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+        LOG.info("Vectorizer path: " + path + ", read type " +
+            vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases);
+
+        Properties partProps = partDesc.getProperties();
+
+        String nextDataColumnsString =
+            partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+        String[] nextDataColumns = nextDataColumnsString.split(",");
+
+        String nextDataTypesString =
+            partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+
+        // We convert to a list of TypeInfo using a library routine since it parses the
+        // information and can handle different separators.  We cannot use the raw type
+        // string for comparison in the map because of the different separators used.
+        List<TypeInfo> nextDataTypeInfoList =
+            TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString);
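+        // For example, the types of columns (a int, b string) may arrive as "int:string"
+        // or "int,string"; both parse to the same list of TypeInfo.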
+
+        if (isFirst) {
+
+          // We establish with the first one whether the table is partitioned or not.
+
+          LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+          if (partSpec != null && partSpec.size() > 0) {
+            partitionColumnCount = partSpec.size();
+            dataColumnCount = allColumnCount - partitionColumnCount;
+          } else {
+            partitionColumnCount = 0;
+            dataColumnCount = allColumnCount;
+          }
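+
+          // allColumnCount includes the partition columns, which follow the data columns,
+          // so the data columns are the first dataColumnCount entries.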
+
+          dataColumnList = allColumnNameList.subList(0, dataColumnCount);
+          dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ',');
+          dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
+
+          // Add the table (non-partitioned) columns and types into the map as not needing
+          // conversion (i.e. null).
+          conversionMap.put(
+              new ImmutablePair<String, List<TypeInfo>>(dataColumnsString, dataTypeInfoList), null);
+
+          isFirst = false;
+        }
+
+        ImmutablePair<String, List<TypeInfo>> columnNamesAndTypesCombination =
+            new ImmutablePair<String, List<TypeInfo>>(nextDataColumnsString, nextDataTypeInfoList);
+
+        boolean[] conversionFlags;
+        if (conversionMap.containsKey(columnNamesAndTypesCombination)) {
+
+          conversionFlags = conversionMap.get(columnNamesAndTypesCombination);
+
+        } else {
+
+          List<String> nextDataColumnList = Arrays.asList(nextDataColumns);
+
+          // Validate that the column names which are present are the same.  Missing columns
+          // will be implicitly defaulted to null.
+
+          if (nextDataColumnList.size() > dataColumnList.size()) {
+            LOG.info(
+                String.format("Could not vectorize partition %s.  The partition column names %d is greater than the number of table columns %d",
+                    path, nextDataColumnList.size(), dataColumnList.size()));
+            return false;
+          }
+          for (int i = 0; i < nextDataColumnList.size(); i++) {
+            String nextColumnName = nextDataColumnList.get(i);
+            String tableColumnName = dataColumnList.get(i);
+            if (!nextColumnName.equals(tableColumnName)) {
+              LOG.info(
+                  String.format("Could not vectorize partition %s.  The partition column name %s does not match the table column name %s",
+                      path, nextColumnName, tableColumnName));
+              return false;
+            }
+          }
+
+          // The table column types might have been changed with ALTER.  There are restrictions
+          // here for vectorization.
+
+          // Some readers / deserializers take responsibility for conversion themselves.
+
+          // If we need to check for conversion, the conversion flags may come back null,
+          // indicating that from a vectorization point of view the conversion is implicit.
+          // That is, all of the conversions are implicit integer upgrades.
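+          // (For example, a partition column that remained smallint after the table column
+          // was altered to int needs only an implicit integer upgrade.)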
+
+          if (vectorPartDesc.getNeedsDataTypeConversionCheck() &&
+             !nextDataTypeInfoList.equals(dataTypeInfoList)) {
+
+            // The results will be in two members: validConversion and conversionFlags.
+            partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList);
+            if (!partitionConversion.getValidConversion()) {
+              return false;
+            }
+            conversionFlags = partitionConversion.getResultConversionFlags();
+          } else {
+            conversionFlags = null;
+          }
+
+          // We enter this in our map so we don't have to check again for subsequent partitions.
+
+          conversionMap.put(columnNamesAndTypesCombination, conversionFlags);
+        }
+
+        vectorPartDesc.setConversionFlags(conversionFlags);
+
+        vectorPartDesc.setTypeInfos(nextDataTypeInfoList);
+      }
+
+      vectorTaskColumnInfo.setColumnNames(allColumnNameList);
+      vectorTaskColumnInfo.setTypeInfos(allTypeInfoList);
+      vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
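+      // The scratch column type names are filled in later, by vectorizeMapWork, after the
+      // operators have been vectorized.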
+
+      return true;
+    }
+
+    private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+            throws SemanticException {
+
+      LOG.info("Validating MapWork...");
+
+      ImmutablePair<String, TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
+      if (pair == null) {
+        return false;
       }
+      String alias = pair.left;
+      TableScanOperator tableScanOperator = pair.right;
+
+      // This call fills in the column names, types, and partition column count in
+      // vectorTaskColumnInfo.
+      if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+        return false;
+      }
+
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
       addMapWorkRules(opRules, vnp);
@@ -437,11 +703,14 @@ public class Vectorizer implements PhysicalPlanResolver {
       return true;
     }
 
-    private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+    private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
+            boolean isTez) throws SemanticException {
+
       LOG.info("Vectorizing MapWork...");
       mapWork.setVectorMode(true);
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
+      MapWorkVectorizationNodeProcessor vnp =
+          new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -451,9 +720,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
       ogw.startWalking(topNodes, nodeOutput);
 
-      mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
-      mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-      mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+      vectorTaskColumnInfo.transferToBaseWork(mapWork);
 
       if (LOG.isDebugEnabled()) {
         debugDisplayAllMaps(mapWork);
@@ -463,13 +732,19 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
-      boolean ret = validateReduceWork(reduceWork);
+      VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
       if (ret) {
-        vectorizeReduceWork(reduceWork, isTez);
+        vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
       }
     }
 
-    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
+            VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
+
+      ArrayList<String> reduceColumnNames = new ArrayList<String>();
+      ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
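+      // The reduce-side row is the key fields followed by the value fields, with column
+      // names prefixed with "KEY." and "VALUE." respectively.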
+
       try {
         // Check key ObjectInspector.
         ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
@@ -493,9 +768,6 @@ public class Vectorizer implements PhysicalPlanResolver {
         StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
         List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
 
-        reduceColumnNames = new ArrayList<String>();
-        reduceTypeInfos = new ArrayList<TypeInfo>();
-
         for (StructField field: keyFields) {
           reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
           reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
@@ -507,6 +779,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       } catch (Exception e) {
         throw new SemanticException(e);
       }
+
+      vectorTaskColumnInfo.setColumnNames(reduceColumnNames);
+      vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos);
+
       return true;
     }
 
@@ -515,11 +791,13 @@ public class Vectorizer implements PhysicalPlanResolver {
       opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
     }
 
-    private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+    private boolean validateReduceWork(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
       LOG.info("Validating ReduceWork...");
 
       // Validate input to ReduceWork.
-      if (!getOnlyStructObjectInspectors(reduceWork)) {
+      if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
         return false;
       }
       // Now check the reduce operator tree.
@@ -543,7 +821,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       return true;
     }
 
-    private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+    private void vectorizeReduceWork(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
       LOG.info("Vectorizing ReduceWork...");
       reduceWork.setVectorMode(true);
 
@@ -552,7 +832,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       ReduceWorkVectorizationNodeProcessor vnp =
-              new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
+              new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
       addReduceWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
@@ -567,9 +847,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       // Necessary since we are vectorizing the root operator in reduce.
       reduceWork.setReducer(vnp.getRootVectorOp());
 
-      reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
-      reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-      reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+      vectorTaskColumnInfo.transferToBaseWork(reduceWork);
 
       if (LOG.isDebugEnabled()) {
         debugDisplayAllMaps(reduceWork);
@@ -637,23 +917,11 @@ public class Vectorizer implements PhysicalPlanResolver {
     // The vectorization context for the Map or Reduce task.
     protected VectorizationContext taskVectorizationContext;
 
-    // The input projection column type name map for the Map or Reduce task.
-    protected Map<Integer, String> taskColumnTypeNameMap;
-
     VectorizationNodeProcessor() {
-      taskColumnTypeNameMap = new HashMap<Integer, String>();
-    }
-
-    public Map<String, Integer> getVectorColumnNameMap() {
-      return taskVectorizationContext.getProjectionColumnMap();
-    }
-
-    public Map<Integer, String> getVectorColumnTypeMap() {
-      return taskColumnTypeNameMap;
     }
 
-    public Map<Integer, String> getVectorScratchColumnTypeMap() {
-      return taskVectorizationContext.getScratchColumnTypeMap();
+    public String[] getVectorScratchColumnTypeNames() {
+      return taskVectorizationContext.getScratchColumnTypeNames();
     }
 
     protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -722,10 +990,15 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
 
+    private final MapWork mWork;
+    private VectorTaskColumnInfo vectorTaskColumnInfo;
     private final boolean isTez;
 
-    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+        VectorTaskColumnInfo vectorTaskColumnInfo) {
       super();
+      this.mWork = mWork;
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       this.isTez = isTez;
     }
 
@@ -739,8 +1012,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
-          taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
-                  taskColumnTypeNameMap);
+          taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
         }
         vContext = taskVectorizationContext;
       } else {
@@ -784,10 +1056,9 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
 
-    private final List<String> reduceColumnNames;
-    private final List<TypeInfo> reduceTypeInfos;
+    private VectorTaskColumnInfo vectorTaskColumnInfo;
 
-    private final boolean isTez;
+    private boolean isTez;
 
     private Operator<? extends OperatorDesc> rootVectorOp;
 
@@ -795,11 +1066,11 @@ public class Vectorizer implements PhysicalPlanResolver {
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
-            List<TypeInfo> reduceTypeInfos, boolean isTez) {
+    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+            boolean isTez) {
+
       super();
-      this.reduceColumnNames =  reduceColumnNames;
-      this.reduceTypeInfos = reduceTypeInfos;
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       rootVectorOp = null;
       this.isTez = isTez;
     }
@@ -815,15 +1086,11 @@ public class Vectorizer implements PhysicalPlanResolver {
       boolean saveRootVectorOp = false;
 
       if (op.getParentOperators().size() == 0) {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString());
 
-        vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
+        vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames);
         taskVectorizationContext = vContext;
-        int i = 0;
-        for (TypeInfo typeInfo : reduceTypeInfos) {
-          taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
-          i++;
-        }
+
         saveRootVectorOp = true;
 
         if (LOG.isDebugEnabled()) {
@@ -887,6 +1154,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   @Override
   public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
+
     hiveConf = physicalContext.getConf();
 
     boolean vectorPath = HiveConf.getBoolVar(hiveConf,
@@ -1026,65 +1294,6 @@ public class Vectorizer implements PhysicalPlanResolver {
       return false;
     }
 
-    String columns = "";
-    String types = "";
-    String partitionColumns = "";
-    String partitionTypes = "";
-    boolean haveInfo = false;
-
-    // This over-reaches slightly, since we can have > 1 table-scan  per map-work.
-    // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate.
-    // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later.
-    LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
-
-    // For vectorization, compare each partition information for against the others.
-    // We assume the table information will be from one of the partitions, so it will
-    // work to focus on the partition information and not compare against the TableScanOperator
-    // columns (in the VectorizationContext)....
-    for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
-      PartitionDesc partDesc = entry.getValue();
-      if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
-        // No partition information -- we match because we would default to using the table description.
-        continue;
-      }
-      Properties partProps = partDesc.getProperties();
-      if (!haveInfo) {
-        columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
-        types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
-        partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-        partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-        haveInfo = true;
-      } else {
-        String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
-        String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
-        String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-        String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-        if (!columns.equalsIgnoreCase(nextColumns)) {
-          LOG.info(
-                  String.format("Could not vectorize partition %s.  Its column names %s do not match the other column names %s",
-                            entry.getKey(), nextColumns, columns));
-          return false;
-        }
-        if (!types.equalsIgnoreCase(nextTypes)) {
-          LOG.info(
-                  String.format("Could not vectorize partition %s.  Its column types %s do not match the other column types %s",
-                              entry.getKey(), nextTypes, types));
-          return false;
-        }
-        if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
-          LOG.info(
-                 String.format("Could not vectorize partition %s.  Its partition column names %s do not match the other partition column names %s",
-                              entry.getKey(), nextPartitionColumns, partitionColumns));
-          return false;
-        }
-        if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
-          LOG.info(
-                 String.format("Could not vectorize partition %s.  Its partition column types %s do not match the other partition column types %s",
-                                entry.getKey(), nextPartitionTypes, partitionTypes));
-          return false;
-        }
-      }
-    }
     return true;
   }
 
@@ -1440,23 +1649,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     return result;
   }
 
-  private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
-    Map<Integer, String> typeNameMap) {
+  private VectorizationContext getVectorizationContext(String contextName,
+      VectorTaskColumnInfo vectorTaskColumnInfo) {
 
-    VectorizationContext vContext = new VectorizationContext(contextName);
-
-    // Add all non-virtual columns to make a vectorization context for
-    // the TableScan operator.
-    int i = 0;
-    for (ColumnInfo c : rowSchema.getSignature()) {
-      // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
-      if (!isVirtualColumn(c)) {
-        vContext.addInitialColumn(c.getInternalName());
-        typeNameMap.put(i, c.getTypeName());
-        i++;
-      }
-    }
-    vContext.finishedAddingInitialColumns();
+    VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames);
 
     return vContext;
   }
@@ -2011,12 +2207,16 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   public void debugDisplayAllMaps(BaseWork work) {
 
-    Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
-    Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
-    Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
+    VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx();
+
+    String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames();
+    TypeInfo[] columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos();
+    int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount();
+    String[] scratchColumnTypeNames = vectorizedRowBatchCtx.getScratchColumnTypeNames();
 
-    LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
-    LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
-    LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
+    LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames));
+    LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos));
+    LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount);
+    LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames));
   }
 }