Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/18 15:52:16 UTC
[14/17] hive git commit: HIVE-11981: ORC Schema Evolution Issues
(Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index a8ebf8f..ce49b24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -28,11 +28,13 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,11 +67,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -96,6 +98,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -112,6 +115,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -163,6 +167,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import com.google.common.base.Joiner;
+
public class Vectorizer implements PhysicalPlanResolver {
protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class);
@@ -324,14 +330,51 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedAggregationUdfs.add("stddev_samp");
}
+ private class VectorTaskColumnInfo {
+ List<String> columnNames;
+ List<TypeInfo> typeInfos;
+ int partitionColumnCount;
+
+ String[] scratchTypeNameArray;
+
+ VectorTaskColumnInfo() {
+ partitionColumnCount = 0;
+ }
+
+ public void setColumnNames(List<String> columnNames) {
+ this.columnNames = columnNames;
+ }
+ public void setTypeInfos(List<TypeInfo> typeInfos) {
+ this.typeInfos = typeInfos;
+ }
+ public void setPartitionColumnCount(int partitionColumnCount) {
+ this.partitionColumnCount = partitionColumnCount;
+ }
+ public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
+ this.scratchTypeNameArray = scratchTypeNameArray;
+ }
+
+ public void transferToBaseWork(BaseWork baseWork) {
+
+ String[] columnNameArray = columnNames.toArray(new String[0]);
+ TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]);
+
+ VectorizedRowBatchCtx vectorizedRowBatchCtx =
+ new VectorizedRowBatchCtx(
+ columnNameArray,
+ typeInfoArray,
+ partitionColumnCount,
+ scratchTypeNameArray);
+ baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+ }
+ }
+
class VectorizationDispatcher implements Dispatcher {
- private List<String> reduceColumnNames;
- private List<TypeInfo> reduceTypeInfos;
+ private final PhysicalContext physicalContext;
public VectorizationDispatcher(PhysicalContext physicalContext) {
- reduceColumnNames = null;
- reduceTypeInfos = null;
+ this.physicalContext = physicalContext;
}
@Override
@@ -369,9 +412,10 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- boolean ret = validateMapWork(mapWork, isTez);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
if (ret) {
- vectorizeMapWork(mapWork, isTez);
+ vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
}
}
@@ -382,40 +426,262 @@ public class Vectorizer implements PhysicalPlanResolver {
+ ReduceSinkOperator.getOperatorName()), np);
}
- private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- LOG.info("Validating MapWork...");
+ private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) {
// Eliminate MR plans with more than one TableScanOperator.
+
LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
- return false;
+ return null;
}
int tableScanCount = 0;
- for (Operator<?> op : aliasToWork.values()) {
+ String alias = "";
+ TableScanOperator tableScanOperator = null;
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+ Operator<?> op = entry.getValue();
if (op == null) {
LOG.warn("Map work has invalid aliases to work with. Fail validation!");
- return false;
+ return null;
}
if (op instanceof TableScanOperator) {
tableScanCount++;
+ alias = entry.getKey();
+ tableScanOperator = (TableScanOperator) op;
}
}
if (tableScanCount > 1) {
- LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
- return false;
+ LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+ return null;
+ }
+ return new ImmutablePair<String, TableScanOperator>(alias, tableScanOperator);
+ }
+
+ private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator,
+ List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) {
+
+ TableScanDesc tableScanDesc = tableScanOperator.getConf();
+
+ // Add all non-virtual columns to make a vectorization context for
+ // the TableScan operator.
+ RowSchema rowSchema = tableScanOperator.getSchema();
+ for (ColumnInfo c : rowSchema.getSignature()) {
+ // Validation will later exclude vectorization when virtual columns are used (HIVE-5560).
+ if (!isVirtualColumn(c)) {
+ String columnName = c.getInternalName();
+ String typeName = c.getTypeName();
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+ logicalColumnNameList.add(columnName);
+ logicalTypeInfoList.add(typeInfo);
+ }
+ }
+ }
+
+ private String getColumns(List<String> columnNames, int start, int length,
+ Character separator) {
+ return Joiner.on(separator).join(columnNames.subList(start, start + length));
+ }
+
+ private String getTypes(List<TypeInfo> typeInfos, int start, int length) {
+ return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length));
+ }
+
+ private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) {
+
+ // Look for the pass-through case where the InputFileFormat implements VectorizedInputFormatInterface
+ // and reads VectorizedRowBatch as a "row".
+
+ if (Utilities.isInputFileFormatVectorized(pd)) {
+
+ pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat());
+
+ return true;
}
+ LOG.info("Input format: " + pd.getInputFileFormatClassName()
+ + ", doesn't provide vectorized input");
+
+ return false;
+ }
+
+ private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+ TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+ // These names/types are the data columns plus partition columns.
+ final List<String> allColumnNameList = new ArrayList<String>();
+ final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
+
+ getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList);
+ final int allColumnCount = allColumnNameList.size();
+
+ // Validate input format and schema evolution capability.
+
+ // For the table, enter a null value in the multi-key map indicating no conversion necessary
+ // if the schema matches the table.
+
+ HashMap<ImmutablePair, boolean[]> conversionMap = new HashMap<ImmutablePair, boolean[]>();
+
+ boolean isFirst = true;
+ int dataColumnCount = 0;
+ int partitionColumnCount = 0;
+
+ List<String> dataColumnList = null;
+ String dataColumnsString = "";
+ List<TypeInfo> dataTypeInfoList = null;
+
// Validate the input format
- for (String path : mapWork.getPathToPartitionInfo().keySet()) {
- PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
- List<Class<?>> interfaceList =
- Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
- if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
- LOG.info("Input format: " + pd.getInputFileFormatClassName()
- + ", doesn't provide vectorized input");
+ VectorPartitionConversion partitionConversion = new VectorPartitionConversion();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+ for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) {
+ String path = entry.getKey();
+ List<String> aliases = entry.getValue();
+ boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
+ if (!isPresent) {
+ LOG.info("Alias " + alias + " not present in aliases " + aliases);
return false;
}
+ PartitionDesc partDesc = pathToPartitionInfo.get(path);
+ if (partDesc.getVectorPartitionDesc() != null) {
+ // We have seen this one already.
+ continue;
+ }
+ if (!verifyAndSetVectorPartDesc(partDesc)) {
+ return false;
+ }
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ LOG.info("Vectorizer path: " + path + ", read type " +
+ vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases);
+
+ Properties partProps = partDesc.getProperties();
+
+ String nextDataColumnsString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+ String[] nextDataColumns = nextDataColumnsString.split(",");
+
+ String nextDataTypesString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+
+ // We convert to a list of TypeInfo using a library routine since it parses the information
+ // and can handle different separators, etc. We cannot use the raw type string
+ // for comparison in the map because of the different separators used.
+ List<TypeInfo> nextDataTypeInfoList =
+ TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString);
+
+ if (isFirst) {
+
+ // We establish with the first one whether the table is partitioned or not.
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+ if (partSpec != null && partSpec.size() > 0) {
+ partitionColumnCount = partSpec.size();
+ dataColumnCount = allColumnCount - partitionColumnCount;
+ } else {
+ partitionColumnCount = 0;
+ dataColumnCount = allColumnCount;
+ }
+
+ dataColumnList = allColumnNameList.subList(0, dataColumnCount);
+ dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ',');
+ dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
+
+ // Add the table (non-partitioned) columns and types into the map as not needing
+ // conversion (i.e. null).
+ conversionMap.put(
+ new ImmutablePair<String, List<TypeInfo>>(dataColumnsString, dataTypeInfoList), null);
+
+ isFirst = false;
+ }
+
+ ImmutablePair<String, List<TypeInfo>> columnNamesAndTypesCombination =
+ new ImmutablePair<String, List<TypeInfo>>(nextDataColumnsString, nextDataTypeInfoList);
+
+ boolean[] conversionFlags;
+ if (conversionMap.containsKey(columnNamesAndTypesCombination)) {
+
+ conversionFlags = conversionMap.get(columnNamesAndTypesCombination);
+
+ } else {
+
+ List<String> nextDataColumnList = Arrays.asList(nextDataColumns);
+
+ // Validate that the column names that are present are the same. Missing trailing columns
+ // will be implicitly defaulted to null.
+
+ if (nextDataColumnList.size() > dataColumnList.size()) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The partition column names %d is greater than the number of table columns %d",
+ path, nextDataColumnList.size(), dataColumnList.size()));
+ return false;
+ }
+ for (int i = 0; i < nextDataColumnList.size(); i++) {
+ String nextColumnName = nextDataColumnList.get(i);
+ String tableColumnName = dataColumnList.get(i);
+ if (!nextColumnName.equals(tableColumnName)) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The partition column name %s is does not match table column name %s",
+ path, nextColumnName, tableColumnName));
+ return false;
+ }
+ }
+
+ // The table column types might have been changed with ALTER. There are restrictions
+ // here for vectorization.
+
+ // Some readers / deserializers take responsibility for conversion themselves.
+
+ // If we need to check for conversion, the conversion flags may come back null,
+ // indicating that from a vectorization point of view the conversion is implicit.
+ // That is, all conversions are implicit integer upgrades.
+
+ if (vectorPartDesc.getNeedsDataTypeConversionCheck() &&
+ !nextDataTypeInfoList.equals(dataTypeInfoList)) {
+
+ // The results will be in 2 members: validConversion and conversionFlags
+ partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList);
+ if (!partitionConversion.getValidConversion()) {
+ return false;
+ }
+ conversionFlags = partitionConversion.getResultConversionFlags();
+ } else {
+ conversionFlags = null;
+ }
+
+ // We enter this in our map so we don't have to check again for subsequent partitions.
+
+ conversionMap.put(columnNamesAndTypesCombination, conversionFlags);
+ }
+
+ vectorPartDesc.setConversionFlags(conversionFlags);
+
+ vectorPartDesc.setTypeInfos(nextDataTypeInfoList);
+ }
+
+ vectorTaskColumnInfo.setColumnNames(allColumnNameList);
+ vectorTaskColumnInfo.setTypeInfos(allTypeInfoList);
+ vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
+
+ return true;
+ }
+
+ private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+ throws SemanticException {
+
+ LOG.info("Validating MapWork...");
+
+ ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
+ if (pair == null) {
+ return false;
}
+ String alias = pair.left;
+ TableScanOperator tableScanOperator = pair.right;
+
+ // This call fills in the column names, types, and partition column count in
+ // vectorTaskColumnInfo.
+ if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+ return false;
+ }
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
addMapWorkRules(opRules, vnp);
@@ -437,11 +703,14 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+ private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
+ boolean isTez) throws SemanticException {
+
LOG.info("Vectorizing MapWork...");
mapWork.setVectorMode(true);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
+ MapWorkVectorizationNodeProcessor vnp =
+ new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -451,9 +720,9 @@ public class Vectorizer implements PhysicalPlanResolver {
HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
ogw.startWalking(topNodes, nodeOutput);
- mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+ vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(mapWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(mapWork);
@@ -463,13 +732,19 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
- boolean ret = validateReduceWork(reduceWork);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
if (ret) {
- vectorizeReduceWork(reduceWork, isTez);
+ vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
}
}
- private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
+
+ ArrayList<String> reduceColumnNames = new ArrayList<String>();
+ ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
+
try {
// Check key ObjectInspector.
ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
@@ -493,9 +768,6 @@ public class Vectorizer implements PhysicalPlanResolver {
StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
- reduceColumnNames = new ArrayList<String>();
- reduceTypeInfos = new ArrayList<TypeInfo>();
-
for (StructField field: keyFields) {
reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
@@ -507,6 +779,10 @@ public class Vectorizer implements PhysicalPlanResolver {
} catch (Exception e) {
throw new SemanticException(e);
}
+
+ vectorTaskColumnInfo.setColumnNames(reduceColumnNames);
+ vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos);
+
return true;
}
@@ -515,11 +791,13 @@ public class Vectorizer implements PhysicalPlanResolver {
opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
}
- private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+ private boolean validateReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
LOG.info("Validating ReduceWork...");
// Validate input to ReduceWork.
- if (!getOnlyStructObjectInspectors(reduceWork)) {
+ if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
return false;
}
// Now check the reduce operator tree.
@@ -543,7 +821,9 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+ private void vectorizeReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
LOG.info("Vectorizing ReduceWork...");
reduceWork.setVectorMode(true);
@@ -552,7 +832,7 @@ public class Vectorizer implements PhysicalPlanResolver {
// VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
+ new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -567,9 +847,9 @@ public class Vectorizer implements PhysicalPlanResolver {
// Necessary since we are vectorizing the root operator in reduce.
reduceWork.setReducer(vnp.getRootVectorOp());
- reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+ vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(reduceWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(reduceWork);
@@ -637,23 +917,11 @@ public class Vectorizer implements PhysicalPlanResolver {
// The vectorization context for the Map or Reduce task.
protected VectorizationContext taskVectorizationContext;
- // The input projection column type name map for the Map or Reduce task.
- protected Map<Integer, String> taskColumnTypeNameMap;
-
VectorizationNodeProcessor() {
- taskColumnTypeNameMap = new HashMap<Integer, String>();
- }
-
- public Map<String, Integer> getVectorColumnNameMap() {
- return taskVectorizationContext.getProjectionColumnMap();
- }
-
- public Map<Integer, String> getVectorColumnTypeMap() {
- return taskColumnTypeNameMap;
}
- public Map<Integer, String> getVectorScratchColumnTypeMap() {
- return taskVectorizationContext.getScratchColumnTypeMap();
+ public String[] getVectorScratchColumnTypeNames() {
+ return taskVectorizationContext.getScratchColumnTypeNames();
}
protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -722,10 +990,15 @@ public class Vectorizer implements PhysicalPlanResolver {
class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+ private final MapWork mWork;
+ private VectorTaskColumnInfo vectorTaskColumnInfo;
private final boolean isTez;
- public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+ public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
super();
+ this.mWork = mWork;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
this.isTez = isTez;
}
@@ -739,8 +1012,7 @@ public class Vectorizer implements PhysicalPlanResolver {
if (op instanceof TableScanOperator) {
if (taskVectorizationContext == null) {
- taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
- taskColumnTypeNameMap);
+ taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
}
vContext = taskVectorizationContext;
} else {
@@ -784,10 +1056,9 @@ public class Vectorizer implements PhysicalPlanResolver {
class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
- private final List<String> reduceColumnNames;
- private final List<TypeInfo> reduceTypeInfos;
+ private VectorTaskColumnInfo vectorTaskColumnInfo;
- private final boolean isTez;
+ private boolean isTez;
private Operator<? extends OperatorDesc> rootVectorOp;
@@ -795,11 +1066,11 @@ public class Vectorizer implements PhysicalPlanResolver {
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
- List<TypeInfo> reduceTypeInfos, boolean isTez) {
+ public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+ boolean isTez) {
+
super();
- this.reduceColumnNames = reduceColumnNames;
- this.reduceTypeInfos = reduceTypeInfos;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
rootVectorOp = null;
this.isTez = isTez;
}
@@ -815,15 +1086,11 @@ public class Vectorizer implements PhysicalPlanResolver {
boolean saveRootVectorOp = false;
if (op.getParentOperators().size() == 0) {
- LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+ LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString());
- vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
+ vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames);
taskVectorizationContext = vContext;
- int i = 0;
- for (TypeInfo typeInfo : reduceTypeInfos) {
- taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
- i++;
- }
+
saveRootVectorOp = true;
if (LOG.isDebugEnabled()) {
@@ -887,6 +1154,7 @@ public class Vectorizer implements PhysicalPlanResolver {
@Override
public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
+
hiveConf = physicalContext.getConf();
boolean vectorPath = HiveConf.getBoolVar(hiveConf,
@@ -1026,65 +1294,6 @@ public class Vectorizer implements PhysicalPlanResolver {
return false;
}
- String columns = "";
- String types = "";
- String partitionColumns = "";
- String partitionTypes = "";
- boolean haveInfo = false;
-
- // This over-reaches slightly, since we can have > 1 table-scan per map-work.
- // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate.
- // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later.
- LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
-
- // For vectorization, compare each partition information for against the others.
- // We assume the table information will be from one of the partitions, so it will
- // work to focus on the partition information and not compare against the TableScanOperator
- // columns (in the VectorizationContext)....
- for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
- PartitionDesc partDesc = entry.getValue();
- if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
- // No partition information -- we match because we would default to using the table description.
- continue;
- }
- Properties partProps = partDesc.getProperties();
- if (!haveInfo) {
- columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- haveInfo = true;
- } else {
- String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- if (!columns.equalsIgnoreCase(nextColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column names %s do not match the other column names %s",
- entry.getKey(), nextColumns, columns));
- return false;
- }
- if (!types.equalsIgnoreCase(nextTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column types %s do not match the other column types %s",
- entry.getKey(), nextTypes, types));
- return false;
- }
- if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column names %s do not match the other partition column names %s",
- entry.getKey(), nextPartitionColumns, partitionColumns));
- return false;
- }
- if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column types %s do not match the other partition column types %s",
- entry.getKey(), nextPartitionTypes, partitionTypes));
- return false;
- }
- }
- }
return true;
}
@@ -1440,23 +1649,10 @@ public class Vectorizer implements PhysicalPlanResolver {
return result;
}
- private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
- Map<Integer, String> typeNameMap) {
+ private VectorizationContext getVectorizationContext(String contextName,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
- VectorizationContext vContext = new VectorizationContext(contextName);
-
- // Add all non-virtual columns to make a vectorization context for
- // the TableScan operator.
- int i = 0;
- for (ColumnInfo c : rowSchema.getSignature()) {
- // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
- if (!isVirtualColumn(c)) {
- vContext.addInitialColumn(c.getInternalName());
- typeNameMap.put(i, c.getTypeName());
- i++;
- }
- }
- vContext.finishedAddingInitialColumns();
+ VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames);
return vContext;
}
@@ -2011,12 +2207,16 @@ public class Vectorizer implements PhysicalPlanResolver {
public void debugDisplayAllMaps(BaseWork work) {
- Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
- Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
- Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
+ VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx();
+
+ String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames();
+ TypeInfo[] columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos();
+ int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount();
+ String[] scratchColumnTypeNames = vectorizedRowBatchCtx.getScratchColumnTypeNames();
- LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
- LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
- LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
+ LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames));
+ LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos));
+ LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount);
+ LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames));
}
}
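
----------------------------------------------------------------------

The new validateInputFormatAndSchemaEvolution logic above memoizes schema
analysis per distinct (column names, column types) combination, so validation
and conversion analysis run once per distinct partition schema rather than once
per partition. A minimal, self-contained sketch of that pattern follows; the
class and method names here are illustrative stand-ins, not Hive code, and the
conversion rule is simplified to an exact type-string comparison:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.ImmutablePair;

public class SchemaConversionCacheSketch {

  // Hypothetical stand-in for VectorPartitionConversion: one flag per column,
  // true when that column needs conversion, or null when the partition schema
  // matches the table schema exactly (no conversion necessary).
  static boolean[] analyzeConversion(List<String> partitionTypes, List<String> tableTypes) {
    if (partitionTypes.equals(tableTypes)) {
      return null;
    }
    boolean[] flags = new boolean[tableTypes.size()];
    for (int i = 0; i < partitionTypes.size(); i++) {
      flags[i] = !partitionTypes.get(i).equals(tableTypes.get(i));
    }
    return flags;
  }

  public static void main(String[] args) {
    List<String> tableTypes = Arrays.asList("bigint", "string");

    // Keyed by (column-names string, column type list), mirroring conversionMap
    // in the patch; computed at most once per distinct partition schema.
    Map<ImmutablePair<String, List<String>>, boolean[]> conversionMap =
        new HashMap<ImmutablePair<String, List<String>>, boolean[]>();

    String[][] partitions = {
        {"a,b", "int", "string"},     // needs int -> bigint conversion
        {"a,b", "int", "string"},     // cache hit: same schema as the previous partition
        {"a,b", "bigint", "string"},  // matches the table schema: null flags
    };
    for (String[] part : partitions) {
      List<String> partTypes = Arrays.asList(part[1], part[2]);
      ImmutablePair<String, List<String>> key = ImmutablePair.of(part[0], partTypes);
      boolean[] flags;
      // containsKey (not a get-null check) because null is a valid cached value
      // meaning "no conversion needed", exactly as in the patch.
      if (conversionMap.containsKey(key)) {
        flags = conversionMap.get(key);
      } else {
        flags = analyzeConversion(partTypes, tableTypes);
        conversionMap.put(key, flags);
      }
      System.out.println(part[0] + " " + partTypes + " -> " + Arrays.toString(flags));
    }
  }
}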
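
The column-name check in the same method only requires a partition's data
columns to be a positional prefix of the table's columns; trailing table
columns added later (for example by ALTER TABLE ADD COLUMNS) are implicitly
read as null. A short sketch of that rule, again with illustrative names only:

import java.util.Arrays;
import java.util.List;

public class PrefixSchemaCheckSketch {

  // A partition vectorizes only if its column names are a positional prefix
  // of the table's column names; extra table columns default to null.
  static boolean isVectorizablePartitionSchema(List<String> partitionColumns,
      List<String> tableColumns) {
    if (partitionColumns.size() > tableColumns.size()) {
      return false; // more partition columns than table columns: fail validation
    }
    for (int i = 0; i < partitionColumns.size(); i++) {
      if (!partitionColumns.get(i).equals(tableColumns.get(i))) {
        return false; // names must match position for position
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> table = Arrays.asList("id", "name", "added_later");
    System.out.println(isVectorizablePartitionSchema(
        Arrays.asList("id", "name"), table));                         // true
    System.out.println(isVectorizablePartitionSchema(
        Arrays.asList("id", "email"), table));                        // false
    System.out.println(isVectorizablePartitionSchema(
        Arrays.asList("id", "name", "added_later", "extra"), table)); // false
  }
}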
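
Finally, the patch replaces the three per-work maps (vector column name map,
column type map, scratch column type map) with a single VectorizedRowBatchCtx
carried on the BaseWork, as transferToBaseWork shows. The sketch below uses
simplified stand-in classes to show the shape of that consolidation; the real
VectorizedRowBatchCtx carries TypeInfo[] rather than type-name strings:

import java.util.Arrays;

public class RowBatchCtxHandoffSketch {

  // Simplified stand-in for org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx.
  static class RowBatchCtx {
    final String[] rowColumnNames;     // data columns first, then partition columns
    final String[] rowColumnTypes;     // the real class carries TypeInfo[]
    final int partitionColumnCount;    // how many trailing columns are partition columns
    final String[] scratchColumnTypeNames;

    RowBatchCtx(String[] names, String[] types, int partitionColumnCount, String[] scratch) {
      this.rowColumnNames = names;
      this.rowColumnTypes = types;
      this.partitionColumnCount = partitionColumnCount;
      this.scratchColumnTypeNames = scratch;
    }
  }

  // Simplified stand-in for BaseWork: one context object instead of three maps.
  static class Work {
    RowBatchCtx vectorizedRowBatchCtx;
  }

  public static void main(String[] args) {
    Work mapWork = new Work();
    mapWork.vectorizedRowBatchCtx = new RowBatchCtx(
        new String[] {"id", "name", "ds"},
        new String[] {"bigint", "string", "string"},
        1,                            // "ds" is the single partition column
        new String[] {"bigint"});     // scratch types added by vectorized expressions
    System.out.println(Arrays.toString(mapWork.vectorizedRowBatchCtx.rowColumnNames));
  }
}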