Posted to commits@hive.apache.org by se...@apache.org on 2016/10/14 00:20:07 UTC
[46/57] [abbrv] [partial] hive git commit: HIVE-11394: Enhance EXPLAIN display for vectorization (Matt McCline, reviewed by Gopal Vijayaraghavan)
http://git-wip-us.apache.org/repos/asf/hive/blob/f923db0b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 3a179a3..6167f48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical;
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
import java.io.Serializable;
+import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.regex.Pattern;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.calcite.util.Pair;
import org.apache.commons.lang.ArrayUtils;
@@ -43,6 +45,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -62,7 +66,11 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOpe
import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
@@ -73,6 +81,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -91,18 +100,36 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
+import org.apache.hadoop.hive.ql.plan.VectorizationCondition;
import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
+import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
+import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -117,10 +144,13 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation;
import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -170,6 +200,9 @@ import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.NullStructSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -182,6 +215,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hive.common.util.AnnotationUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
import com.google.common.base.Preconditions;
@@ -234,12 +270,39 @@ public class Vectorizer implements PhysicalPlanResolver {
private boolean isSpark;
- boolean useVectorizedInputFileFormat;
- boolean useVectorDeserialize;
- boolean useRowDeserialize;
+ private boolean useVectorizedInputFileFormat;
+ private boolean useVectorDeserialize;
+ private boolean useRowDeserialize;
+ private boolean isReduceVectorizationEnabled;
boolean isSchemaEvolution;
+ private BaseWork currentBaseWork;
+ private Operator<? extends OperatorDesc> currentOperator;
+
+ public void testSetCurrentBaseWork(BaseWork testBaseWork) {
+ currentBaseWork = testBaseWork;
+ }
+
+ private void setNodeIssue(String issue) {
+ currentBaseWork.setNotVectorizedReason(
+ VectorizerReason.createNodeIssue(issue));
+ }
+
+ private void setOperatorIssue(String issue) {
+ currentBaseWork.setNotVectorizedReason(
+ VectorizerReason.createOperatorIssue(currentOperator, issue));
+ }
+
+ private void setExpressionIssue(String expressionTitle, String issue) {
+ currentBaseWork.setNotVectorizedReason(
+ VectorizerReason.createExpressionIssue(currentOperator, expressionTitle, issue));
+ }
+
+ private void clearNotVectorizedReason() {
+ currentBaseWork.setNotVectorizedReason(null);
+ }
+
public Vectorizer() {
supportedGenericUDFs.add(GenericUDFOPPlus.class);
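The new set*Issue helpers above replace scattered LOG.info calls with a single not-vectorized reason recorded on the current work node, which the enhanced EXPLAIN can later display. A minimal standalone sketch of the same record-the-reason pattern (class and field names here are illustrative, not Hive's):

    // Sketch: remember why a unit of work could not be vectorized,
    // instead of only logging it at the point of failure.
    class WorkNode {
        private String notVectorizedReason;   // null until an issue is recorded

        void setNotVectorizedReason(String reason) {
            this.notVectorizedReason = reason;
        }

        String getNotVectorizedReason() {
            return notVectorizedReason;       // EXPLAIN reads this later
        }
    }

    class Validator {
        private WorkNode currentWork;         // mirrors Vectorizer.currentBaseWork

        void setNodeIssue(String issue) {
            currentWork.setNotVectorizedReason("node: " + issue);
        }
    }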
@@ -369,6 +432,10 @@ public class Vectorizer implements PhysicalPlanResolver {
int partitionColumnCount;
boolean useVectorizedInputFileFormat;
+ boolean groupByVectorOutput;
+ boolean allNative;
+ boolean usesVectorUDFAdaptor;
+
String[] scratchTypeNameArray;
Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
@@ -379,6 +446,12 @@ public class Vectorizer implements PhysicalPlanResolver {
partitionColumnCount = 0;
}
+ public void assume() {
+ groupByVectorOutput = true;
+ allNative = true;
+ usesVectorUDFAdaptor = false;
+ }
+
public void setAllColumnNames(List<String> allColumnNames) {
this.allColumnNames = allColumnNames;
}
@@ -394,9 +467,19 @@ public class Vectorizer implements PhysicalPlanResolver {
public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
this.scratchTypeNameArray = scratchTypeNameArray;
}
+ public void setGroupByVectorOutput(boolean groupByVectorOutput) {
+ this.groupByVectorOutput = groupByVectorOutput;
+ }
+ public void setAllNative(boolean allNative) {
+ this.allNative = allNative;
+ }
+ public void setUsesVectorUDFAdaptor(boolean usesVectorUDFAdaptor) {
+ this.usesVectorUDFAdaptor = usesVectorUDFAdaptor;
+ }
public void setUseVectorizedInputFileFormat(boolean useVectorizedInputFileFormat) {
this.useVectorizedInputFileFormat = useVectorizedInputFileFormat;
}
+
public void setNonVectorizedOps(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
this.nonVectorizedOps = nonVectorizedOps;
}
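VectorTaskColumnInfo.assume() pairs with the setters above to form an optimistic-defaults pattern: start from the best case, then let each validation step downgrade a flag when it finds an exception. A hedged sketch of the idea:

    // Sketch: optimistic defaults that validation can only downgrade.
    class TaskFlags {
        boolean allNative;
        boolean usesVectorUDFAdaptor;

        void assume() {                   // called once, before validation
            allNative = true;             // until a non-native operator appears
            usesVectorUDFAdaptor = false; // until an adaptor-backed UDF appears
        }

        void sawNonNativeOperator() { allNative = false; }
        void sawAdaptorUdf()        { usesVectorUDFAdaptor = true; }
    }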
@@ -428,7 +511,14 @@ public class Vectorizer implements PhysicalPlanResolver {
scratchTypeNameArray);
baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
- baseWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+ if (baseWork instanceof MapWork) {
+ MapWork mapWork = (MapWork) baseWork;
+ mapWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+ }
+
+ baseWork.setAllNative(allNative);
+ baseWork.setGroupByVectorOutput(groupByVectorOutput);
+ baseWork.setUsesVectorUDFAdaptor(usesVectorUDFAdaptor);
}
}
@@ -445,17 +535,29 @@ public class Vectorizer implements PhysicalPlanResolver {
throws SemanticException {
Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
if (currTask instanceof MapRedTask) {
- convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
+ MapredWork mapredWork = ((MapRedTask) currTask).getWork();
+ convertMapWork(mapredWork.getMapWork(), false);
+ ReduceWork reduceWork = mapredWork.getReduceWork();
+ if (reduceWork != null) {
+ // Always set the EXPLAIN conditions.
+ setReduceWorkExplainConditions(reduceWork);
+
+ // We do not vectorize MR Reduce.
+ }
} else if (currTask instanceof TezTask) {
TezWork work = ((TezTask) currTask).getWork();
- for (BaseWork w: work.getAllWork()) {
- if (w instanceof MapWork) {
- convertMapWork((MapWork) w, true);
- } else if (w instanceof ReduceWork) {
- // We are only vectorizing Reduce under Tez.
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
- convertReduceWork((ReduceWork) w, true);
+ for (BaseWork baseWork: work.getAllWork()) {
+ if (baseWork instanceof MapWork) {
+ convertMapWork((MapWork) baseWork, true);
+ } else if (baseWork instanceof ReduceWork) {
+ ReduceWork reduceWork = (ReduceWork) baseWork;
+
+ // Always set the EXPLAIN conditions.
+ setReduceWorkExplainConditions(reduceWork);
+
+ // We are only vectorizing Reduce under Tez/Spark.
+ if (isReduceVectorizationEnabled) {
+ convertReduceWork(reduceWork);
}
}
}
@@ -463,22 +565,43 @@ public class Vectorizer implements PhysicalPlanResolver {
SparkWork sparkWork = (SparkWork) currTask.getWork();
for (BaseWork baseWork : sparkWork.getAllWork()) {
if (baseWork instanceof MapWork) {
- convertMapWork((MapWork) baseWork, false);
- } else if (baseWork instanceof ReduceWork
- && HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
- convertReduceWork((ReduceWork) baseWork, false);
+ convertMapWork((MapWork) baseWork, true);
+ } else if (baseWork instanceof ReduceWork) {
+ ReduceWork reduceWork = (ReduceWork) baseWork;
+
+ // Always set the EXPLAIN conditions.
+ setReduceWorkExplainConditions(reduceWork);
+
+ if (isReduceVectorizationEnabled) {
+ convertReduceWork(reduceWork);
+ }
}
}
}
+
return null;
}
- private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+ private void convertMapWork(MapWork mapWork, boolean isTezOrSpark) throws SemanticException {
+
+ mapWork.setVectorizationExamined(true);
+
+ // Global used when setting errors, etc.
+ currentBaseWork = mapWork;
+
VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
- boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
+ vectorTaskColumnInfo.assume();
+
+ boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
if (ret) {
- vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
+ vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
+ } else if (currentBaseWork.getVectorizationEnabled()) {
+ VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+ if (notVectorizedReason == null) {
+ LOG.info("Cannot vectorize: unknown");
+ } else {
+ LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+ }
}
}
@@ -499,6 +622,7 @@ public class Vectorizer implements PhysicalPlanResolver {
LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
+ setNodeIssue("Vectorized map work requires work");
return null;
}
int tableScanCount = 0;
@@ -507,7 +631,7 @@ public class Vectorizer implements PhysicalPlanResolver {
for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
Operator<?> op = entry.getValue();
if (op == null) {
- LOG.warn("Map work has invalid aliases to work with. Fail validation!");
+ setNodeIssue("Vectorized map work requires a valid alias");
return null;
}
if (op instanceof TableScanOperator) {
@@ -517,7 +641,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
}
if (tableScanCount > 1) {
- LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+ setNodeIssue("Vectorized map work only works with 1 TableScanOperator");
return null;
}
return new ImmutablePair(alias, tableScanOperator);
@@ -558,22 +682,6 @@ public class Vectorizer implements PhysicalPlanResolver {
}
}
- private String getHiveOptionsString() {
- StringBuilder sb = new StringBuilder();
- sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
- sb.append("=");
- sb.append(useVectorizedInputFileFormat);
- sb.append(", ");
- sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
- sb.append("=");
- sb.append(useVectorDeserialize);
- sb.append(", and ");
- sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
- sb.append("=");
- sb.append(useRowDeserialize);
- return sb.toString();
- }
-
/*
* There are 3 modes of reading for vectorization:
*
@@ -588,44 +696,58 @@ public class Vectorizer implements PhysicalPlanResolver {
* the row object into the VectorizedRowBatch with VectorAssignRow.
* This picks up Input File Format not supported by the other two.
*/
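The comment above names three reading strategies; verifyAndSetVectorPartDesc below picks among them. A compact sketch of the preference order, using hypothetical names:

    enum ReadMode {
        VECTORIZED_INPUT_FILE_FORMAT, // input format produces VectorizedRowBatch directly
        VECTOR_DESERIALIZE,           // deserialize text/sequence rows straight into batches
        ROW_DESERIALIZE               // regular SerDe per row, assigned into the batch
    }

    // Sketch only: assumes all three options are enabled in the configuration.
    static ReadMode chooseReadMode(boolean formatIsVectorized,
                                   boolean vectorDeserializeEligible) {
        if (formatIsVectorized)        return ReadMode.VECTORIZED_INPUT_FILE_FORMAT;
        if (vectorDeserializeEligible) return ReadMode.VECTOR_DESERIALIZE;
        return ReadMode.ROW_DESERIALIZE;  // row mode accepts any remaining format
    }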
- private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable) {
+ private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable,
+ HashSet<String> inputFileFormatClassNameSet, HashSet<String> enabledConditionsMetSet,
+ ArrayList<String> enabledConditionsNotMetList) {
String inputFileFormatClassName = pd.getInputFileFormatClassName();
+ // Always collect input file formats.
+ inputFileFormatClassNameSet.add(inputFileFormatClassName);
+
+ boolean isInputFileFormatVectorized = Utilities.isInputFileFormatVectorized(pd);
+
+ if (isAcidTable) {
+
+ // Today, ACID tables are only ORC and that format is vectorizable. Verify these
+ // assumptions.
+ Preconditions.checkState(isInputFileFormatVectorized);
+ Preconditions.checkState(inputFileFormatClassName.equals(OrcInputFormat.class.getName()));
+
+ if (!useVectorizedInputFileFormat) {
+ enabledConditionsNotMetList.add(
+ "Vectorizing ACID tables requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+ return false;
+ }
+
+ pd.setVectorPartitionDesc(
+ VectorPartitionDesc.createVectorizedInputFileFormat(
+ inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
+
+ enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+ return true;
+ }
+
// Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
// and reads VectorizedRowBatch as a "row".
- if (isAcidTable || useVectorizedInputFileFormat) {
+ if (useVectorizedInputFileFormat) {
- if (Utilities.isInputFileFormatVectorized(pd)) {
-
- if (!useVectorizedInputFileFormat) {
- LOG.info("ACID tables con only be vectorized for the input file format -- " +
- "i.e. when Hive Configuration option " +
- HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname +
- "=true");
- return false;
- }
+ if (isInputFileFormatVectorized) {
pd.setVectorPartitionDesc(
VectorPartitionDesc.createVectorizedInputFileFormat(
inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
+ enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
return true;
}
-
- // Today, ACID tables are only ORC and that format is vectorizable. Verify this
- // assumption.
- Preconditions.checkState(!isAcidTable);
+ // Fall through and look for other options...
}
- if (!(isSchemaEvolution || isAcidTable) &&
- (useVectorDeserialize || useRowDeserialize)) {
- LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
- " when both " + HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.varname + "=false and " +
- " ACID table is " + isAcidTable + " and " +
- " given the Hive Configuration options " + getHiveOptionsString());
- return false;
+ if (!isSchemaEvolution) {
+ enabledConditionsNotMetList.add(
+ "Vectorizing tables without Schema Evolution requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
}
String deserializerClassName = pd.getDeserializerClassName();
@@ -635,6 +757,12 @@ public class Vectorizer implements PhysicalPlanResolver {
//
// Do the "vectorized" row-by-row deserialization into a VectorizedRowBatch in the
// VectorMapOperator.
+ boolean isTextFormat = inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
+ deserializerClassName.equals(LazySimpleSerDe.class.getName());
+ boolean isSequenceFormat =
+ inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
+ deserializerClassName.equals(LazyBinarySerDe.class.getName());
+ boolean isVectorDeserializeEligable = isTextFormat || isSequenceFormat;
if (useVectorDeserialize) {
@@ -648,8 +776,7 @@ public class Vectorizer implements PhysicalPlanResolver {
// org.apache.hadoop.mapred.SequenceFileInputFormat
// org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
- if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
- deserializerClassName.equals(LazySimpleSerDe.class.getName())) {
+ if (isTextFormat) {
Properties properties = pd.getTableDesc().getProperties();
String lastColumnTakesRestString =
@@ -659,10 +786,11 @@ public class Vectorizer implements PhysicalPlanResolver {
lastColumnTakesRestString.equalsIgnoreCase("true"));
if (lastColumnTakesRest) {
- // If row mode will not catch this, then inform.
+ // If row mode will not catch this input file format, then it is not enabled.
if (useRowDeserialize) {
- LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
- " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true");
+ enabledConditionsNotMetList.add(
+ inputFileFormatClassName + " " +
+ serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + " must be disabled ");
return false;
}
} else {
@@ -670,17 +798,19 @@ public class Vectorizer implements PhysicalPlanResolver {
VectorPartitionDesc.createVectorDeserialize(
inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
+ enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
return true;
}
- } else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
- deserializerClassName.equals(LazyBinarySerDe.class.getName())) {
+ } else if (isSequenceFormat) {
pd.setVectorPartitionDesc(
VectorPartitionDesc.createVectorDeserialize(
inputFileFormatClassName, VectorDeserializeType.LAZY_BINARY));
+ enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
return true;
}
+ // Fall through and look for other options...
}
// Otherwise, if enabled, deserialize rows using regular Serde and add the object
@@ -694,17 +824,29 @@ public class Vectorizer implements PhysicalPlanResolver {
Utilities.isInputFileFormatSelfDescribing(pd),
deserializerClassName));
+ enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
return true;
}
- LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
- " given the Hive Configuration options " + getHiveOptionsString());
-
+ if (isInputFileFormatVectorized) {
+ Preconditions.checkState(!useVectorizedInputFileFormat);
+ enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+ } else {
+ // Only offer these when the input file format is not one of the fast vectorized formats.
+ if (isVectorDeserializeEligable) {
+ Preconditions.checkState(!useVectorDeserialize);
+ enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
+ } else {
+ // Row mode accepts any remaining input file format.
+ enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
+ }
+ }
+
return false;
}
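Every exit path above now records which enabling conditions were met or missed, so EXPLAIN can print both lists alongside the input file formats. A simplified sketch of the same collect-as-you-validate bookkeeping (names are illustrative):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class ConditionTracker {
        final Set<String> conditionsMet = new HashSet<>();
        final List<String> conditionsNotMet = new ArrayList<>();

        // Record the outcome either way, then report it as the result.
        boolean require(String conditionName, boolean enabled) {
            if (enabled) {
                conditionsMet.add(conditionName);
            } else {
                conditionsNotMet.add(conditionName);
            }
            return enabled;
        }
    }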
- private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+ private ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo)
throws SemanticException {
@@ -732,27 +874,39 @@ public class Vectorizer implements PhysicalPlanResolver {
LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+
+ // Remember the input file formats we validated and why.
+ HashSet<String> inputFileFormatClassNameSet = new HashSet<String>();
+ HashSet<String> enabledConditionsMetSet = new HashSet<String>();
+ ArrayList<String> enabledConditionsNotMetList = new ArrayList<String>();
+
for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) {
Path path = entry.getKey();
List<String> aliases = entry.getValue();
boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
if (!isPresent) {
- LOG.info("Alias " + alias + " not present in aliases " + aliases);
- return false;
+ setOperatorIssue("Alias " + alias + " not present in aliases " + aliases);
+ return new ImmutablePair<Boolean,Boolean>(false, false);
}
PartitionDesc partDesc = pathToPartitionInfo.get(path);
if (partDesc.getVectorPartitionDesc() != null) {
// We've seen this already.
continue;
}
- if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable)) {
- return false;
+ if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable, inputFileFormatClassNameSet,
+ enabledConditionsMetSet, enabledConditionsNotMetList)) {
+
+ // Always set these so EXPLAIN can see.
+ mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+ mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+ mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+ // We consider this an enable issue, not a not-vectorized issue.
+ LOG.info("Cannot enable vectorization because input file format(s) " + inputFileFormatClassNameSet +
+ " do not met conditions " + VectorizationCondition.addBooleans(enabledConditionsNotMetList, false));
+ return new ImmutablePair<Boolean,Boolean>(false, true);
}
VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
- if (LOG.isInfoEnabled()) {
- LOG.info("Vectorizer path: " + path + ", " + vectorPartDesc.toString() +
- ", aliases " + aliases);
- }
if (isFirst) {
@@ -796,13 +950,13 @@ public class Vectorizer implements PhysicalPlanResolver {
* implicitly defaulted to null.
*/
if (nextDataColumnList.size() > tableDataColumnList.size()) {
- LOG.info(
+ setOperatorIssue(
String.format(
"Could not vectorize partition %s " +
"(deserializer " + deserializer.getClass().getName() + ")" +
"The partition column names %d is greater than the number of table columns %d",
path, nextDataColumnList.size(), tableDataColumnList.size()));
- return false;
+ return new ImmutablePair<Boolean,Boolean>(false, false);
}
if (!(deserializer instanceof NullStructSerDe)) {
@@ -811,13 +965,13 @@ public class Vectorizer implements PhysicalPlanResolver {
String nextColumnName = nextDataColumnList.get(i);
String tableColumnName = tableDataColumnList.get(i);
if (!nextColumnName.equals(tableColumnName)) {
- LOG.info(
+ setOperatorIssue(
String.format(
"Could not vectorize partition %s " +
"(deserializer " + deserializer.getClass().getName() + ")" +
"The partition column name %s is does not match table column name %s",
path, nextColumnName, tableColumnName));
- return false;
+ return new ImmutablePair<Boolean,Boolean>(false, false);
}
}
}
@@ -852,29 +1006,50 @@ public class Vectorizer implements PhysicalPlanResolver {
// Helps to keep this for debugging.
vectorTaskColumnInfo.setTableScanOperator(tableScanOperator);
- return true;
+ // Always set these so EXPLAIN can see.
+ mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+ mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+ mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+ return new ImmutablePair<Boolean,Boolean>(true, false);
}
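The ImmutablePair&lt;Boolean,Boolean&gt; return value is easy to misread; judging from the caller in validateMapWork below, left means validation passed and right means the enabled-conditions-not-met list has already been recorded on the MapWork. A named type, sketched here, would carry the same contract more explicitly:

    // Sketch: a named result in place of ImmutablePair<Boolean,Boolean>.
    final class InputFormatValidationResult {
        final boolean valid;                     // pair.left above
        final boolean conditionsAlreadyRecorded; // pair.right above

        InputFormatValidationResult(boolean valid, boolean conditionsAlreadyRecorded) {
            this.valid = valid;
            this.conditionsAlreadyRecorded = conditionsAlreadyRecorded;
        }
    }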
- private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+ private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTezOrSpark)
throws SemanticException {
LOG.info("Validating MapWork...");
- ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
- if (pair == null) {
+ ImmutablePair<String,TableScanOperator> onlyOneTableScanPair = verifyOnlyOneTableScanOperator(mapWork);
+ if (onlyOneTableScanPair == null) {
+ VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+ Preconditions.checkState(notVectorizedReason != null);
+ mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
return false;
}
- String alias = pair.left;
- TableScanOperator tableScanOperator = pair.right;
+ String alias = onlyOneTableScanPair.left;
+ TableScanOperator tableScanOperator = onlyOneTableScanPair.right;
// This call fills in the column names, types, and partition column count in
// vectorTaskColumnInfo.
- if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+ currentOperator = tableScanOperator;
+ ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolutionPair =
+ validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo);
+ if (!validateInputFormatAndSchemaEvolutionPair.left) {
+ // Have we already set the enabled conditions not met?
+ if (!validateInputFormatAndSchemaEvolutionPair.right) {
+ VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+ Preconditions.checkState(notVectorizedReason != null);
+ mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
+ }
return false;
}
+ // Now we are enabled and any issues found from here on out are considered
+ // not vectorized issues.
+ mapWork.setVectorizationEnabled(true);
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
+ MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTezOrSpark);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -896,13 +1071,13 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
- boolean isTez) throws SemanticException {
+ boolean isTezOrSpark) throws SemanticException {
LOG.info("Vectorizing MapWork...");
mapWork.setVectorMode(true);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
MapWorkVectorizationNodeProcessor vnp =
- new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
+ new MapWorkVectorizationNodeProcessor(mapWork, isTezOrSpark, vectorTaskColumnInfo);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -923,11 +1098,34 @@ public class Vectorizer implements PhysicalPlanResolver {
return;
}
- private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+ private void setReduceWorkExplainConditions(ReduceWork reduceWork) {
+
+ reduceWork.setVectorizationExamined(true);
+
+ reduceWork.setReduceVectorizationEnabled(isReduceVectorizationEnabled);
+ reduceWork.setVectorReduceEngine(
+ HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
+ }
+
+ private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
+
+ // Global used when setting errors, etc.
+ currentBaseWork = reduceWork;
+ currentBaseWork.setVectorizationEnabled(true);
+
VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
- boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+ vectorTaskColumnInfo.assume();
+
+ boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
if (ret) {
- vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+ vectorizeReduceWork(reduceWork, vectorTaskColumnInfo);
+ } else if (currentBaseWork.getVectorizationEnabled()) {
+ VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+ if (notVectorizedReason == null) {
+ LOG.info("Cannot vectorize: unknown");
+ } else {
+ LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+ }
}
}
@@ -941,13 +1139,14 @@ public class Vectorizer implements PhysicalPlanResolver {
// Check key ObjectInspector.
ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+ setNodeIssue("Key object inspector missing or not StructObjectInspector");
return false;
}
StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
- // Tez doesn't use tagging...
if (reduceWork.getNeedsTagging()) {
+ setNodeIssue("Tez doesn't use tagging");
return false;
}
@@ -955,6 +1154,7 @@ public class Vectorizer implements PhysicalPlanResolver {
ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
if (valueObjectInspector == null ||
!(valueObjectInspector instanceof StructObjectInspector)) {
+ setNodeIssue("Value object inspector missing or not StructObjectInspector");
return false;
}
StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
@@ -984,7 +1184,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private boolean validateReduceWork(ReduceWork reduceWork,
- VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+ VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
LOG.info("Validating ReduceWork...");
@@ -1015,7 +1215,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void vectorizeReduceWork(ReduceWork reduceWork,
- VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+ VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
LOG.info("Vectorizing ReduceWork...");
reduceWork.setVectorMode(true);
@@ -1025,7 +1225,7 @@ public class Vectorizer implements PhysicalPlanResolver {
// VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
+ new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -1053,7 +1253,7 @@ public class Vectorizer implements PhysicalPlanResolver {
class MapWorkValidationNodeProcessor implements NodeProcessor {
private final MapWork mapWork;
- private final boolean isTez;
+ private final boolean isTezOrSpark;
// Children of Vectorized GROUPBY that output rows instead of vectorized row batches.
protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps =
@@ -1063,9 +1263,9 @@ public class Vectorizer implements PhysicalPlanResolver {
return nonVectorizedOps;
}
- public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) {
+ public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTezOrSpark) {
this.mapWork = mapWork;
- this.isTez = isTez;
+ this.isTezOrSpark = isTezOrSpark;
}
@Override
@@ -1077,13 +1277,13 @@ public class Vectorizer implements PhysicalPlanResolver {
return new Boolean(true);
}
boolean ret;
+ currentOperator = op;
try {
- ret = validateMapWorkOperator(op, mapWork, isTez);
+ ret = validateMapWorkOperator(op, mapWork, isTezOrSpark);
} catch (Exception e) {
throw new SemanticException(e);
}
if (!ret) {
- LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
return new Boolean(false);
}
// When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1119,9 +1319,9 @@ public class Vectorizer implements PhysicalPlanResolver {
if (nonVectorizedOps.contains(op)) {
return new Boolean(true);
}
+ currentOperator = op;
boolean ret = validateReduceWorkOperator(op);
if (!ret) {
- LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
return new Boolean(false);
}
// When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1142,9 +1342,12 @@ public class Vectorizer implements PhysicalPlanResolver {
// The vectorization context for the Map or Reduce task.
protected VectorizationContext taskVectorizationContext;
+ protected final VectorTaskColumnInfo vectorTaskColumnInfo;
protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
- VectorizationNodeProcessor(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+ VectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+ Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
this.nonVectorizedOps = nonVectorizedOps;
}
@@ -1192,11 +1395,11 @@ public class Vectorizer implements PhysicalPlanResolver {
}
public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
- VectorizationContext vContext, boolean isTez) throws SemanticException {
+ VectorizationContext vContext, boolean isTezOrSpark) throws SemanticException {
Operator<? extends OperatorDesc> vectorOp = op;
try {
if (!opsDone.contains(op)) {
- vectorOp = vectorizeOperator(op, vContext, isTez);
+ vectorOp = vectorizeOperator(op, vContext, isTezOrSpark, vectorTaskColumnInfo);
opsDone.add(op);
if (vectorOp != op) {
opToVectorOpMap.put(op, vectorOp);
@@ -1220,14 +1423,14 @@ public class Vectorizer implements PhysicalPlanResolver {
private final MapWork mWork;
private final VectorTaskColumnInfo vectorTaskColumnInfo;
- private final boolean isTez;
+ private final boolean isTezOrSpark;
- public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+ public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTezOrSpark,
VectorTaskColumnInfo vectorTaskColumnInfo) {
- super(vectorTaskColumnInfo.getNonVectorizedOps());
+ super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
this.mWork = mWork;
this.vectorTaskColumnInfo = vectorTaskColumnInfo;
- this.isTez = isTez;
+ this.isTezOrSpark = isTezOrSpark;
}
@Override
@@ -1241,6 +1444,7 @@ public class Vectorizer implements PhysicalPlanResolver {
VectorizationContext vContext = null;
+ currentOperator = op;
if (op instanceof TableScanOperator) {
if (taskVectorizationContext == null) {
taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
@@ -1261,7 +1465,7 @@ public class Vectorizer implements PhysicalPlanResolver {
+ " using vectorization context" + vContext.toString());
}
- Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+ Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTezOrSpark);
if (LOG.isDebugEnabled()) {
if (vectorOp instanceof VectorizationContextRegion) {
@@ -1279,7 +1483,6 @@ public class Vectorizer implements PhysicalPlanResolver {
private final VectorTaskColumnInfo vectorTaskColumnInfo;
- private final boolean isTez;
private Operator<? extends OperatorDesc> rootVectorOp;
@@ -1287,13 +1490,11 @@ public class Vectorizer implements PhysicalPlanResolver {
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
- boolean isTez) {
+ public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) {
- super(vectorTaskColumnInfo.getNonVectorizedOps());
+ super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
this.vectorTaskColumnInfo = vectorTaskColumnInfo;
rootVectorOp = null;
- this.isTez = isTez;
}
@Override
@@ -1309,6 +1510,7 @@ public class Vectorizer implements PhysicalPlanResolver {
boolean saveRootVectorOp = false;
+ currentOperator = op;
if (op.getParentOperators().size() == 0) {
LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
@@ -1333,7 +1535,7 @@ public class Vectorizer implements PhysicalPlanResolver {
assert vContext != null;
LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
- Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+ Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, true);
if (LOG.isDebugEnabled()) {
if (vectorOp instanceof VectorizationContextRegion) {
@@ -1390,6 +1592,10 @@ public class Vectorizer implements PhysicalPlanResolver {
HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE);
+ isReduceVectorizationEnabled =
+ HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);
+
isSchemaEvolution =
HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION);
@@ -1407,18 +1613,32 @@ public class Vectorizer implements PhysicalPlanResolver {
return physicalContext;
}
- boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) {
- boolean ret = false;
+ private void setOperatorNotSupported(Operator<? extends OperatorDesc> op) {
+ OperatorDesc desc = op.getConf();
+ Annotation note = AnnotationUtils.getAnnotation(desc.getClass(), Explain.class);
+ if (note != null) {
+ Explain explainNote = (Explain) note;
+ setNodeIssue(explainNote.displayName() + " (" + op.getType() + ") not supported");
+ } else {
+ setNodeIssue("Operator " + op.getType() + " not supported");
+ }
+ }
+
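setOperatorNotSupported below derives a human-readable operator name from the @Explain annotation on the operator's descriptor class. A self-contained sketch of that reflective lookup, with a simplified annotation standing in for Hive's Explain:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;

    public class AnnotationLookupSketch {
        @Retention(RetentionPolicy.RUNTIME)
        @interface DisplayName { String value(); }

        @DisplayName("File Sink")
        static class FileSinkDescExample {}

        static String displayName(Class<?> descClass) {
            DisplayName note = descClass.getAnnotation(DisplayName.class);
            return (note != null) ? note.value() : descClass.getSimpleName();
        }

        public static void main(String[] args) {
            System.out.println(displayName(FileSinkDescExample.class)); // File Sink
        }
    }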
+ boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTezOrSpark) {
+ boolean ret;
switch (op.getType()) {
case MAPJOIN:
if (op instanceof MapJoinOperator) {
ret = validateMapJoinOperator((MapJoinOperator) op);
} else if (op instanceof SMBMapJoinOperator) {
ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+ } else {
+ setOperatorNotSupported(op);
+ ret = false;
}
break;
case GROUPBY:
- ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
+ ret = validateGroupByOperator((GroupByOperator) op, false, isTezOrSpark);
break;
case FILTER:
ret = validateFilterOperator((FilterOperator) op);
@@ -1443,6 +1663,7 @@ public class Vectorizer implements PhysicalPlanResolver {
validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
break;
default:
+ setOperatorNotSupported(op);
ret = false;
break;
}
@@ -1450,7 +1671,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
- boolean ret = false;
+ boolean ret;
switch (op.getType()) {
case MAPJOIN:
// Does MAPJOIN actually get planned in Reduce?
@@ -1458,6 +1679,9 @@ public class Vectorizer implements PhysicalPlanResolver {
ret = validateMapJoinOperator((MapJoinOperator) op);
} else if (op instanceof SMBMapJoinOperator) {
ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+ } else {
+ setOperatorNotSupported(op);
+ ret = false;
}
break;
case GROUPBY:
@@ -1465,6 +1689,7 @@ public class Vectorizer implements PhysicalPlanResolver {
HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
ret = validateGroupByOperator((GroupByOperator) op, true, true);
} else {
+ setNodeIssue("Operator " + op.getType() + " not enabled (" + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED.name() + "=true IS false)");
ret = false;
}
break;
@@ -1490,6 +1715,7 @@ public class Vectorizer implements PhysicalPlanResolver {
validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
break;
default:
+ setOperatorNotSupported(op);
ret = false;
break;
}
@@ -1512,7 +1738,7 @@ public class Vectorizer implements PhysicalPlanResolver {
throws SemanticException {
if (op.getType().equals(OperatorType.GROUPBY)) {
GroupByDesc desc = (GroupByDesc) op.getConf();
- return !desc.getVectorDesc().isVectorOutput();
+ return !((VectorGroupByDesc) desc.getVectorDesc()).isVectorOutput();
}
return false;
}
@@ -1526,6 +1752,7 @@ public class Vectorizer implements PhysicalPlanResolver {
private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) {
TableScanDesc desc = op.getConf();
if (desc.isGatherStats()) {
+ setOperatorIssue("gather stats not supported");
return false;
}
@@ -1540,25 +1767,21 @@ public class Vectorizer implements PhysicalPlanResolver {
private boolean validateMapJoinDesc(MapJoinDesc desc) {
byte posBigTable = (byte) desc.getPosBigTable();
List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable);
- if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) {
- LOG.info("Cannot vectorize map work filter expression");
+ if (!validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER)) {
return false;
}
List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable);
- if (!validateExprNodeDesc(keyExprs)) {
- LOG.info("Cannot vectorize map work key expression");
+ if (!validateExprNodeDesc(keyExprs, "Key")) {
return false;
}
List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable);
- if (!validateExprNodeDesc(valueExprs)) {
- LOG.info("Cannot vectorize map work value expression");
+ if (!validateExprNodeDesc(valueExprs, "Value")) {
return false;
}
Byte[] order = desc.getTagOrder();
Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
- if (!validateExprNodeDesc(smallTableExprs)) {
- LOG.info("Cannot vectorize map work small table expression");
+ if (!validateExprNodeDesc(smallTableExprs, "Small Table")) {
return false;
}
return true;
@@ -1571,24 +1794,23 @@ public class Vectorizer implements PhysicalPlanResolver {
List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
- return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) &&
- validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs);
+ return validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER) &&
+ validateExprNodeDesc(keyExprs, "Key") && validateExprNodeDesc(valueExprs, "Value");
}
private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
List<ExprNodeDesc> valueDesc = op.getConf().getValueCols();
- return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) &&
- validateExprNodeDesc(valueDesc);
+ return validateExprNodeDesc(keyDescs, "Key") && validateExprNodeDesc(partitionDescs, "Partition") &&
+ validateExprNodeDesc(valueDesc, "Value");
}
private boolean validateSelectOperator(SelectOperator op) {
List<ExprNodeDesc> descList = op.getConf().getColList();
for (ExprNodeDesc desc : descList) {
- boolean ret = validateExprNodeDesc(desc);
+ boolean ret = validateExprNodeDesc(desc, "Select");
if (!ret) {
- LOG.info("Cannot vectorize select expression: " + desc.toString());
return false;
}
}
@@ -1597,28 +1819,26 @@ public class Vectorizer implements PhysicalPlanResolver {
private boolean validateFilterOperator(FilterOperator op) {
ExprNodeDesc desc = op.getConf().getPredicate();
- return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
+ return validateExprNodeDesc(desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER);
}
- private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+ private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) {
GroupByDesc desc = op.getConf();
- VectorGroupByDesc vectorDesc = desc.getVectorDesc();
if (desc.isGroupingSetsPresent()) {
- LOG.info("Grouping sets not supported in vector mode");
+ setOperatorIssue("Grouping sets not supported");
return false;
}
if (desc.pruneGroupingSetId()) {
- LOG.info("Pruning grouping set id not supported in vector mode");
+ setOperatorIssue("Pruning grouping set id not supported");
return false;
}
if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) {
- LOG.info("DISTINCT not supported in vector mode");
+ setOperatorIssue("DISTINCT not supported");
return false;
}
- boolean ret = validateExprNodeDesc(desc.getKeys());
+ boolean ret = validateExprNodeDesc(desc.getKeys(), "Key");
if (!ret) {
- LOG.info("Cannot vectorize groupby key expression " + desc.getKeys().toString());
return false;
}
@@ -1731,6 +1951,9 @@ public class Vectorizer implements PhysicalPlanResolver {
// If all the aggregation outputs are primitive, we can output VectorizedRowBatch.
// Otherwise, the rest of the operator tree will be row mode.
+ VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
+ desc.setVectorDesc(vectorDesc);
+
vectorDesc.setVectorOutput(retPair.right);
vectorDesc.setProcessingMode(processingMode);
@@ -1745,14 +1968,15 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
- return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
+ private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle) {
+ return validateExprNodeDesc(descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION);
}
private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
+ String expressionTitle,
VectorExpressionDescriptor.Mode mode) {
for (ExprNodeDesc d : descs) {
- boolean ret = validateExprNodeDesc(d, mode);
+ boolean ret = validateExprNodeDesc(d, expressionTitle, mode);
if (!ret) {
return false;
}
@@ -1775,19 +1999,20 @@ public class Vectorizer implements PhysicalPlanResolver {
return new Pair<Boolean, Boolean>(true, outputIsPrimitive);
}
- private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
+ private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle,
+ VectorExpressionDescriptor.Mode mode) {
if (desc instanceof ExprNodeColumnDesc) {
ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc;
// Currently, we do not support vectorized virtual columns (see HIVE-5570).
if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) {
- LOG.info("Cannot vectorize virtual column " + c.getColumn());
+ setExpressionIssue(expressionTitle, "Virtual columns not supported (" + c.getColumn() + ")");
return false;
}
}
String typeName = desc.getTypeInfo().getTypeName();
boolean ret = validateDataType(typeName, mode);
if (!ret) {
- LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
+ setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported");
return false;
}
boolean isInExpression = false;
@@ -1795,7 +2020,7 @@ public class Vectorizer implements PhysicalPlanResolver {
ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
boolean r = validateGenericUdf(d);
if (!r) {
- LOG.info("Cannot vectorize UDF " + d);
+ setExpressionIssue(expressionTitle, "UDF " + d + " not supported");
return false;
}
GenericUDF genericUDF = d.getGenericUDF();
@@ -1806,14 +2031,14 @@ public class Vectorizer implements PhysicalPlanResolver {
&& desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
// Don't restrict child expressions for projection.
// Always use loose FILTER mode.
- if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+ if (!validateStructInExpression(desc, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) {
return false;
}
} else {
for (ExprNodeDesc d : desc.getChildren()) {
// Don't restrict child expressions for projection.
// Always use loose FILTER mode.
- if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+ if (!validateExprNodeDescRecursive(d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) {
return false;
}
}
@@ -1823,7 +2048,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private boolean validateStructInExpression(ExprNodeDesc desc,
- VectorExpressionDescriptor.Mode mode) {
+ String expressionTitle, VectorExpressionDescriptor.Mode mode) {
for (ExprNodeDesc d : desc.getChildren()) {
TypeInfo typeInfo = d.getTypeInfo();
if (typeInfo.getCategory() != Category.STRUCT) {
@@ -1839,7 +2064,8 @@ public class Vectorizer implements PhysicalPlanResolver {
TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
Category category = fieldTypeInfo.getCategory();
if (category != Category.PRIMITIVE) {
- LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+ setExpressionIssue(expressionTitle,
+ "Cannot vectorize struct field " + fieldNames.get(f)
+ " of type " + fieldTypeInfo.getTypeName());
return false;
}
@@ -1852,7 +2078,8 @@ public class Vectorizer implements PhysicalPlanResolver {
if (inConstantType != InConstantType.INT_FAMILY
&& inConstantType != InConstantType.FLOAT_FAMILY
&& inConstantType != InConstantType.STRING_FAMILY) {
- LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+ setExpressionIssue(expressionTitle,
+ "Cannot vectorize struct field " + fieldNames.get(f)
+ " of type " + fieldTypeInfo.getTypeName());
return false;
}
@@ -1861,31 +2088,28 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private boolean validateExprNodeDesc(ExprNodeDesc desc) {
- return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION);
+ private boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle) {
+ return validateExprNodeDesc(desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION);
}
- boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
- if (!validateExprNodeDescRecursive(desc, mode)) {
+ boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle,
+ VectorExpressionDescriptor.Mode mode) {
+ if (!validateExprNodeDescRecursive(desc, expressionTitle, mode)) {
return false;
}
try {
VectorizationContext vc = new ValidatorVectorizationContext(hiveConf);
if (vc.getVectorExpression(desc, mode) == null) {
// TODO: this cannot happen - VectorizationContext throws in such cases.
- LOG.info("getVectorExpression returned null");
+ setExpressionIssue(expressionTitle, "getVectorExpression returned null");
return false;
}
} catch (Exception e) {
if (e instanceof HiveException) {
- LOG.info(e.getMessage());
+ setExpressionIssue(expressionTitle, e.getMessage());
} else {
- if (LOG.isDebugEnabled()) {
- // Show stack trace.
- LOG.debug("Failed to vectorize", e);
- } else {
- LOG.info("Failed to vectorize", e.getMessage());
- }
+ String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
+ setExpressionIssue(expressionTitle, issue);
}
return false;
}
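The replacement catch block funnels the exception through VectorizationContext.getStackTraceAsSingleLine so the whole failure fits in one recorded reason. That helper's implementation is not part of this hunk; a minimal equivalent might look like:

    // Sketch only: render an exception and its frames on a single line.
    static String stackTraceAsSingleLine(Throwable e) {
        StringBuilder sb = new StringBuilder(e.toString());
        for (StackTraceElement frame : e.getStackTrace()) {
            sb.append(" at ").append(frame);
            if (sb.length() > 500) {   // keep the recorded reason bounded
                sb.append(" ...");
                break;
            }
        }
        return sb.toString();
    }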
@@ -1905,9 +2129,9 @@ public class Vectorizer implements PhysicalPlanResolver {
}
}
- private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) {
+ public static ObjectInspector.Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) {
ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
- return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE);
+ return outputObjInspector.getCategory();
}
private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode,
@@ -1915,11 +2139,10 @@ public class Vectorizer implements PhysicalPlanResolver {
String udfName = aggDesc.getGenericUDAFName().toLowerCase();
if (!supportedAggregationUdfs.contains(udfName)) {
- LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported");
+ setExpressionIssue("Aggregation Function", "UDF " + udfName + " not supported");
return new Pair<Boolean,Boolean>(false, false);
}
- if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
- LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported");
+ if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters(), "Aggregation Function UDF " + udfName + " parameter")) {
return new Pair<Boolean,Boolean>(false, false);
}
@@ -1933,6 +2156,7 @@ public class Vectorizer implements PhysicalPlanResolver {
if (LOG.isDebugEnabled()) {
LOG.debug("Vectorization of aggreation should have succeeded ", e);
}
+ setExpressionIssue("Aggregation Function", "Vectorization of aggreation should have succeeded " + e);
return new Pair<Boolean,Boolean>(false, false);
}
if (LOG.isDebugEnabled()) {
@@ -1940,11 +2164,12 @@ public class Vectorizer implements PhysicalPlanResolver {
" vector expression " + vectorAggrExpr.toString());
}
- boolean outputIsPrimitive = validateAggregationIsPrimitive(vectorAggrExpr);
+ ObjectInspector.Category outputCategory = aggregationOutputCategory(vectorAggrExpr);
+ boolean outputIsPrimitive = (outputCategory == ObjectInspector.Category.PRIMITIVE);
if (processingMode == ProcessingMode.MERGE_PARTIAL &&
hasKeys &&
!outputIsPrimitive) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
+ setOperatorIssue("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
return new Pair<Boolean,Boolean>(false, false);
}
@@ -2012,12 +2237,12 @@ public class Vectorizer implements PhysicalPlanResolver {
if (smallTableIndices[i] < 0) {
// Negative numbers indicate a column to be (deserialize) read from the small table's
// LazyBinary value row.
- LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
+ setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
return false;
}
}
} else if (smallTableRetainSize > 0) {
- LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
+ setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
return false;
}
@@ -2026,20 +2251,21 @@ public class Vectorizer implements PhysicalPlanResolver {
}
Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
- VectorizationContext vContext, MapJoinDesc desc) throws HiveException {
+ VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinInfo vectorMapJoinInfo)
+ throws HiveException {
Operator<? extends OperatorDesc> vectorOp = null;
Class<? extends Operator<?>> opClass = null;
- VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
- VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE;
- VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+ VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc();
+
+ HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
+ HashTableKind hashTableKind = HashTableKind.NONE;
+ HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+ OperatorVariation operatorVariation = OperatorVariation.NONE;
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+ if (vectorDesc.getIsFastHashTableEnabled()) {
hashTableImplementationType = HashTableImplementationType.FAST;
} else {
- // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or
- // HybridHashTableContainer.
hashTableImplementationType = HashTableImplementationType.OPTIMIZED;
}
@@ -2061,20 +2287,31 @@ public class Vectorizer implements PhysicalPlanResolver {
Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
if (bigTableKeyExprs.size() == 1) {
- String typeName = bigTableKeyExprs.get(0).getTypeString();
- LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName);
- if (typeName.equals("boolean")) {
+ TypeInfo typeInfo = bigTableKeyExprs.get(0).getTypeInfo();
+ LOG.info("Vectorizer vectorizeOperator map join typeName " + typeInfo.getTypeName());
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
hashTableKeyType = HashTableKeyType.BOOLEAN;
- } else if (typeName.equals("tinyint")) {
+ break;
+ case BYTE:
hashTableKeyType = HashTableKeyType.BYTE;
- } else if (typeName.equals("smallint")) {
+ break;
+ case SHORT:
hashTableKeyType = HashTableKeyType.SHORT;
- } else if (typeName.equals("int")) {
+ break;
+ case INT:
hashTableKeyType = HashTableKeyType.INT;
- } else if (typeName.equals("bigint") || typeName.equals("long")) {
+ break;
+ case LONG:
hashTableKeyType = HashTableKeyType.LONG;
- } else if (VectorizationContext.isStringFamily(typeName)) {
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
hashTableKeyType = HashTableKeyType.STRING;
+ break;
+ default:
+ // Stay with multi-key.
}
}
}
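
(The switch above restated as a standalone helper, to show the whole key-type decision at a glance; a sketch assuming only the enum constants visible in this patch.)

    static HashTableKeyType mapKeyType(PrimitiveCategory category) {
      switch (category) {
        case BOOLEAN: return HashTableKeyType.BOOLEAN;
        case BYTE:    return HashTableKeyType.BYTE;
        case SHORT:   return HashTableKeyType.SHORT;
        case INT:     return HashTableKeyType.INT;
        case LONG:    return HashTableKeyType.LONG;
        case STRING:
        case CHAR:
        case VARCHAR:
        case BINARY:  return HashTableKeyType.STRING;
        default:      return HashTableKeyType.MULTI_KEY;  // stay with multi-key
      }
    }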
@@ -2082,16 +2319,20 @@ public class Vectorizer implements PhysicalPlanResolver {
switch (joinType) {
case JoinDesc.INNER_JOIN:
if (!isInnerBigOnly) {
+ operatorVariation = OperatorVariation.INNER;
hashTableKind = HashTableKind.HASH_MAP;
} else {
+ operatorVariation = OperatorVariation.INNER_BIG_ONLY;
hashTableKind = HashTableKind.HASH_MULTISET;
}
break;
case JoinDesc.LEFT_OUTER_JOIN:
case JoinDesc.RIGHT_OUTER_JOIN:
+ operatorVariation = OperatorVariation.OUTER;
hashTableKind = HashTableKind.HASH_MAP;
break;
case JoinDesc.LEFT_SEMI_JOIN:
+ operatorVariation = OperatorVariation.LEFT_SEMI;
hashTableKind = HashTableKind.HASH_SET;
break;
default:
@@ -2106,86 +2347,84 @@ public class Vectorizer implements PhysicalPlanResolver {
case SHORT:
case INT:
case LONG:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerLongOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
- }
+ switch (operatorVariation) {
+ case INNER:
+ opClass = VectorMapJoinInnerLongOperator.class;
break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterLongOperator.class;
+ case INNER_BIG_ONLY:
+ opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
break;
- case JoinDesc.LEFT_SEMI_JOIN:
+ case LEFT_SEMI:
opClass = VectorMapJoinLeftSemiLongOperator.class;
break;
+ case OUTER:
+ opClass = VectorMapJoinOuterLongOperator.class;
+ break;
default:
- throw new HiveException("Unknown join type " + joinType);
+ throw new HiveException("Unknown operator variation " + operatorVariation);
}
break;
case STRING:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerStringOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
- }
+ switch (operatorVariation) {
+ case INNER:
+ opClass = VectorMapJoinInnerStringOperator.class;
break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterStringOperator.class;
+ case INNER_BIG_ONLY:
+ opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
break;
- case JoinDesc.LEFT_SEMI_JOIN:
+ case LEFT_SEMI:
opClass = VectorMapJoinLeftSemiStringOperator.class;
break;
+ case OUTER:
+ opClass = VectorMapJoinOuterStringOperator.class;
+ break;
default:
- throw new HiveException("Unknown join type " + joinType);
+ throw new HiveException("Unknown operator variation " + operatorVariation);
}
break;
case MULTI_KEY:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerMultiKeyOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
- }
+ switch (operatorVariation) {
+ case INNER:
+ opClass = VectorMapJoinInnerMultiKeyOperator.class;
break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterMultiKeyOperator.class;
+ case INNER_BIG_ONLY:
+ opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
break;
- case JoinDesc.LEFT_SEMI_JOIN:
+ case LEFT_SEMI:
opClass = VectorMapJoinLeftSemiMultiKeyOperator.class;
break;
+ case OUTER:
+ opClass = VectorMapJoinOuterMultiKeyOperator.class;
+ break;
default:
- throw new HiveException("Unknown join type " + joinType);
+ throw new HiveException("Unknown operator variation " + operatorVariation);
}
break;
+ default:
+ throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
}
- vectorOp = OperatorFactory.getVectorOperator(
- opClass, op.getCompilationOpContext(), op.getConf(), vContext);
- LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
-
boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED);
- VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
vectorDesc.setHashTableImplementationType(hashTableImplementationType);
vectorDesc.setHashTableKind(hashTableKind);
vectorDesc.setHashTableKeyType(hashTableKeyType);
+ vectorDesc.setOperatorVariation(operatorVariation);
vectorDesc.setMinMaxEnabled(minMaxEnabled);
+ vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo);
+
+ vectorOp = OperatorFactory.getVectorOperator(
+ opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+ LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
+
return vectorOp;
}
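
(For reference, an illustrative instantiation of one of the specialized classes; this mirrors the factory call at the end of the method above and assumes the inner-join-on-long-key case.)

    Operator<? extends OperatorDesc> vOp = OperatorFactory.getVectorOperator(
        VectorMapJoinInnerLongOperator.class,
        op.getCompilationOpContext(), op.getConf(), vContext);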
- private boolean onExpressionHasNullSafes(MapJoinDesc desc) {
+ public static boolean onExpressionHasNullSafes(MapJoinDesc desc) {
boolean[] nullSafes = desc.getNullSafes();
if (nullSafes == null) {
- return false;
+ return false;
}
for (boolean nullSafe : nullSafes) {
if (nullSafe) {
@@ -2196,53 +2435,372 @@ public class Vectorizer implements PhysicalPlanResolver {
}
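
(A quick illustration of where the null-safe flags come from; the values are hypothetical.)

    // ON a.x = b.x AND a.y <=> b.y  yields  nullSafes = { false, true };
    // any true entry makes onExpressionHasNullSafes(desc) return true,
    // which blocks native map join specialization.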
private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
- boolean isTez) {
+ boolean isTezOrSpark, VectorizationContext vContext, VectorMapJoinInfo vectorMapJoinInfo)
+ throws HiveException {
+
+ Preconditions.checkState(op instanceof MapJoinOperator);
+
+ // Allocate a VectorMapJoinDesc initially with implementation type NONE so EXPLAIN
+ // can report this operator was vectorized, but not native, along with the conditions.
+ VectorMapJoinDesc vectorDesc = new VectorMapJoinDesc();
+ desc.setVectorDesc(vectorDesc);
+
+ boolean isVectorizationMapJoinNativeEnabled = HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED);
+
+ String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+ boolean oneMapJoinCondition = (desc.getConds().length == 1);
+
+ boolean hasNullSafes = onExpressionHasNullSafes(desc);
+
+ byte posBigTable = (byte) desc.getPosBigTable();
+
+ // Since we want to display all of the met and not-met conditions in EXPLAIN, we
+ // determine all of the information first...
+
+ List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable);
+ VectorExpression[] allBigTableKeyExpressions = vContext.getVectorExpressions(keyDesc);
+ final int allBigTableKeyExpressionsLength = allBigTableKeyExpressions.length;
+ boolean isEmptyKey = (allBigTableKeyExpressionsLength == 0);
- boolean specialize = false;
+ boolean supportsKeyTypes = true; // Assume.
+ HashSet<String> notSupportedKeyTypes = new HashSet<String>();
- if (op instanceof MapJoinOperator &&
+ // Since a key expression can be a calculation and the key will go into a scratch column,
+ // we need the mapping and type information.
+ int[] bigTableKeyColumnMap = new int[allBigTableKeyExpressionsLength];
+ String[] bigTableKeyColumnNames = new String[allBigTableKeyExpressionsLength];
+ TypeInfo[] bigTableKeyTypeInfos = new TypeInfo[allBigTableKeyExpressionsLength];
+ ArrayList<VectorExpression> bigTableKeyExpressionsList = new ArrayList<VectorExpression>();
+ VectorExpression[] bigTableKeyExpressions;
+ for (int i = 0; i < allBigTableKeyExpressionsLength; i++) {
+ VectorExpression ve = allBigTableKeyExpressions[i];
+ if (!IdentityExpression.isColumnOnly(ve)) {
+ bigTableKeyExpressionsList.add(ve);
+ }
+ bigTableKeyColumnMap[i] = ve.getOutputColumn();
+
+ ExprNodeDesc exprNode = keyDesc.get(i);
+ bigTableKeyColumnNames[i] = exprNode.toString();
+
+ TypeInfo typeInfo = exprNode.getTypeInfo();
+ // Verify we handle the key column types for an optimized table. This is effectively
+ // the same check used in HashTableLoader.
+ if (!MapJoinKey.isSupportedField(typeInfo)) {
+ supportsKeyTypes = false;
+ Category category = typeInfo.getCategory();
+ notSupportedKeyTypes.add(
+ (category != Category.PRIMITIVE ? category.toString() :
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().toString()));
+ }
+ bigTableKeyTypeInfos[i] = typeInfo;
+ }
+ if (bigTableKeyExpressionsList.size() == 0) {
+ bigTableKeyExpressions = null;
+ } else {
+ bigTableKeyExpressions = bigTableKeyExpressionsList.toArray(new VectorExpression[0]);
+ }
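
(Making the identity-versus-scratch distinction concrete; a sketch that uses only calls appearing in this patch.)

    VectorExpression ve = allBigTableKeyExpressions[0];
    int batchColumn = ve.getOutputColumn();     // the source column itself, or a
                                                // scratch column for a computed
                                                // key such as (key + 1)
    boolean isComputedKey = !IdentityExpression.isColumnOnly(ve);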
+
+ List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable);
+ VectorExpression[] allBigTableValueExpressions = vContext.getVectorExpressions(bigTableExprs);
+
+ boolean isFastHashTableEnabled =
HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
+ HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
+ vectorDesc.setIsFastHashTableEnabled(isFastHashTableEnabled);
- // Currently, only under Tez and non-N-way joins.
- if (isTez && desc.getConds().length == 1 && !onExpressionHasNullSafes(desc)) {
+ // Especially since LLAP is prone to turn it off in the MapJoinDesc in later
+ // physical optimizer stages...
+ boolean isHybridHashJoin = desc.isHybridHashJoin();
+ vectorDesc.setIsHybridHashJoin(isHybridHashJoin);
- // Ok, all basic restrictions satisfied so far...
- specialize = true;
+ /*
+ * Populate vectorMapJoinInfo.
+ */
- if (!HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+ /*
+ * Similarly, we need a mapping since a value expression can be a calculation and the value
+ * will go into a scratch column.
+ */
+ int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length];
+ String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length];
+ TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length];
+ ArrayList<VectorExpression> bigTableValueExpressionsList = new ArrayList<VectorExpression>();
+ VectorExpression[] bigTableValueExpressions;
+ for (int i = 0; i < bigTableValueColumnMap.length; i++) {
+ VectorExpression ve = allBigTableValueExpressions[i];
+ if (!IdentityExpression.isColumnOnly(ve)) {
+ bigTableValueExpressionsList.add(ve);
+ }
+ bigTableValueColumnMap[i] = ve.getOutputColumn();
- // We are using the optimized hash table we have further
- // restrictions (using optimized and key type).
+ ExprNodeDesc exprNode = bigTableExprs.get(i);
+ bigTableValueColumnNames[i] = exprNode.toString();
+ bigTableValueTypeInfos[i] = exprNode.getTypeInfo();
+ }
+ if (bigTableValueExpressionsList.size() == 0) {
+ bigTableValueExpressions = null;
+ } else {
+ bigTableValueExpressions = bigTableValueExpressionsList.toArray(new VectorExpression[0]);
+ }
- if (!HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) {
- specialize = false;
- } else {
- byte posBigTable = (byte) desc.getPosBigTable();
- Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
- List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
- for (ExprNodeDesc exprNodeDesc : bigTableKeyExprs) {
- String typeName = exprNodeDesc.getTypeString();
- if (!MapJoinKey.isSupportedField(typeName)) {
- specialize = false;
- break;
- }
+ vectorMapJoinInfo.setBigTableKeyColumnMap(bigTableKeyColumnMap);
+ vectorMapJoinInfo.setBigTableKeyColumnNames(bigTableKeyColumnNames);
+ vectorMapJoinInfo.setBigTableKeyTypeInfos(bigTableKeyTypeInfos);
+ vectorMapJoinInfo.setBigTableKeyExpressions(bigTableKeyExpressions);
+
+ vectorMapJoinInfo.setBigTableValueColumnMap(bigTableValueColumnMap);
+ vectorMapJoinInfo.setBigTableValueColumnNames(bigTableValueColumnNames);
+ vectorMapJoinInfo.setBigTableValueTypeInfos(bigTableValueTypeInfos);
+ vectorMapJoinInfo.setBigTableValueExpressions(bigTableValueExpressions);
+
+ /*
+ * Small table information.
+ */
+ VectorColumnOutputMapping bigTableRetainedMapping =
+ new VectorColumnOutputMapping("Big Table Retained Mapping");
+
+ VectorColumnOutputMapping bigTableOuterKeyMapping =
+ new VectorColumnOutputMapping("Big Table Outer Key Mapping");
+
+ // The order of the fields in the LazyBinary small table value must be used, so
+ // we use the source ordering flavor for the mapping.
+ VectorColumnSourceMapping smallTableMapping =
+ new VectorColumnSourceMapping("Small Table Mapping");
+
+ Byte[] order = desc.getTagOrder();
+ Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
+ boolean isOuterJoin = !desc.getNoOuterJoin();
+
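
(Worked example of resolving the single small table position from the tag order.)

    Byte[] order = { 0, 1 };     // tag order from the MapJoinDesc
    byte posBigTable = 1;
    Byte smallPos = (order[0] == posBigTable ? order[1] : order[0]);   // -> 0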
+ /*
+ * Gather up big and small table output result information from the MapJoinDesc.
+ */
+ List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable);
+ int bigTableRetainSize = bigTableRetainList.size();
+
+ int[] smallTableIndices;
+ int smallTableIndicesSize;
+ List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
+ if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
+ smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
+ smallTableIndicesSize = smallTableIndices.length;
+ } else {
+ smallTableIndices = null;
+ smallTableIndicesSize = 0;
+ }
+
+ List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
+ int smallTableRetainSize = smallTableRetainList.size();
+
+ int smallTableResultSize = 0;
+ if (smallTableIndicesSize > 0) {
+ smallTableResultSize = smallTableIndicesSize;
+ } else if (smallTableRetainSize > 0) {
+ smallTableResultSize = smallTableRetainSize;
+ }
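
(A worked example of the sign convention used by smallTableIndices; the values are hypothetical.)

    int[] smallTableIndices = { 1, -1, -2 };   // hypothetical plan
    for (int idx : smallTableIndices) {
      if (idx >= 0) {
        // big table key #idx appears in the small table result "area"
      } else {
        int smallTableValueIndex = -idx - 1;   // 0, then 1: fields to deserialize
                                               // from the LazyBinary value row
      }
    }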
+
+ /*
+ * Determine the big table retained mapping first so we can optimize out (with
+ * projection) copying inner join big table keys in the subsequent small table results section.
+ */
+
+ // We use a mapping object here so we can build the projection in any order and
+ // get the ordered by 0 to n-1 output columns at the end.
+ //
+ // Also, to avoid copying a big table key into the small table result area for inner joins,
+ // we reference it with the projection so there can be duplicate output columns
+ // in the projection.
+ VectorColumnSourceMapping projectionMapping = new VectorColumnSourceMapping("Projection Mapping");
+
+ int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize);
+ for (int i = 0; i < bigTableRetainSize; i++) {
+
+ // Since bigTableValueExpressions may do a calculation and produce a scratch column, we
+ // need to map to the right batch column.
+
+ int retainColumn = bigTableRetainList.get(i);
+ int batchColumnIndex = bigTableValueColumnMap[retainColumn];
+ TypeInfo typeInfo = bigTableValueTypeInfos[retainColumn];
+
+ // With this map we project the big table batch to make it look like an output batch.
+ projectionMapping.add(nextOutputColumn, batchColumnIndex, typeInfo);
+
+ // Collect columns we copy from the big table batch to the overflow batch.
+ if (!bigTableRetainedMapping.containsOutputColumn(batchColumnIndex)) {
+ // Tolerate repeated use of a big table column.
+ bigTableRetainedMapping.add(batchColumnIndex, batchColumnIndex, typeInfo);
+ }
+
+ nextOutputColumn++;
+ }
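
(The duplicate-column trick spelled out for an inner join whose key also appears in the select list; the column numbers and keyTypeInfo are hypothetical.)

    // Batch column 2 holds the join key; it can back two output columns at
    // once, so no copy into the small table result "area" is needed:
    projectionMapping.add(0, 2, keyTypeInfo);   // key as a big table value
    projectionMapping.add(1, 2, keyTypeInfo);   // same key, small table area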
+
+ /*
+ * Now determine the small table results.
+ */
+ boolean smallTableExprVectorizes = true;
+
+ int firstSmallTableOutputColumn;
+ firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0);
+ int smallTableOutputCount = 0;
+ nextOutputColumn = firstSmallTableOutputColumn;
+
+ // Small table indices carry more information (i.e. keys) than the retain list, so use them if they exist...
+ String[] bigTableRetainedNames;
+ if (smallTableIndicesSize > 0) {
+ smallTableOutputCount = smallTableIndicesSize;
+ bigTableRetainedNames = new String[smallTableOutputCount];
+
+ for (int i = 0; i < smallTableIndicesSize; i++) {
+ if (smallTableIndices[i] >= 0) {
+
+ // Zero and above numbers indicate a big table key is needed for
+ // small table result "area".
+
+ int keyIndex = smallTableIndices[i];
+
+ // Since bigTableKeyExpressions may do a calculation and produce a scratch column, we
+ // need to map to the right batch column.
+ int batchKeyColumn = bigTableKeyColumnMap[keyIndex];
+ bigTableRetainedNames[i] = bigTableKeyColumnNames[keyIndex];
+ TypeInfo typeInfo = bigTableKeyTypeInfos[keyIndex];
+
+ if (!isOuterJoin) {
+
+ // Optimize inner join keys of small table results.
+
+ // Project the big table key into the small table result "area".
+ projectionMapping.add(nextOutputColumn, batchKeyColumn, typeInfo);
+
+ if (!bigTableRetainedMapping.containsOutputColumn(batchKeyColumn)) {
+ // If necessary, copy the big table key into the overflow batch's small table
+ // result "area".
+ bigTableRetainedMapping.add(batchKeyColumn, batchKeyColumn, typeInfo);
}
+ } else {
+
+ // For outer joins, since the small table key can be null when there is no match,
+ // we must have a physical (scratch) column for those keys. We cannot use the
+ // projection optimization used by inner joins above.
+
+ int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+ projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+ bigTableRetainedMapping.add(batchKeyColumn, scratchColumn, typeInfo);
+
+ bigTableOuterKeyMapping.add(batchKeyColumn, scratchColumn, typeInfo);
}
} else {
- // With the fast hash table implementation, we currently do not support
- // Hybrid Grace Hash Join.
+ // Negative numbers indicate a column to be (deserialize) read from the small table's
+ // LazyBinary value row.
+ int smallTableValueIndex = -smallTableIndices[i] - 1;
- if (desc.isHybridHashJoin()) {
- specialize = false;
+ ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+ if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+ clearNotVectorizedReason();
+ smallTableExprVectorizes = false;
}
+
+ bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+ TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+
+ // Make a new big table scratch column for the small table value.
+ int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+ projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+ smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+ }
+ nextOutputColumn++;
+ }
+ } else if (smallTableRetainSize > 0) {
+ smallTableOutputCount = smallTableRetainSize;
+ bigTableRetainedNames = new String[smallTableOutputCount];
+
+ // Only small table values appear in join output result.
+
+ for (int i = 0; i < smallTableRetainSize; i++) {
+ int smallTableValueIndex = smallTableRetainList.get(i);
+
+ ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+ if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+ clearNotVectorizedReason();
+ smallTableExprVectorizes = false;
}
+
+ bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+ // Make a new big table scratch column for the small table value.
+ TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+ int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+
+ projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+ smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+ nextOutputColumn++;
}
+ } else {
+ bigTableRetainedNames = new String[0];
+ }
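
(Why outer joins cannot use that projection trick: a non-matching row must surface NULL for the small-table-side key, and a shared reference to the never-null big table key column cannot express that. Hence the dedicated scratch column, as in the patch's own call.)

    int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
    // matched rows: the big table key is copied into scratchColumn;
    // unmatched rows: the operator sets scratchColumn entries to NULL.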
+
+ // Remember the condition variables for EXPLAIN regardless.
+ vectorDesc.setIsVectorizationMapJoinNativeEnabled(isVectorizationMapJoinNativeEnabled);
+ vectorDesc.setEngine(engine);
+ vectorDesc.setOneMapJoinCondition(oneMapJoinCondition);
+ vectorDesc.setHasNullSafes(hasNullSafes);
+ vectorDesc.setSupportsKeyTypes(supportsKeyTypes);
+ if (!supportsKeyTypes) {
+ vectorDesc.setNotSupportedKeyTypes(new ArrayList<String>(notSupportedKeyTypes));
+ }
+ vectorDesc.setIsEmptyKey(isEmptyKey);
+ vectorDesc.setSmallTableExprVectorizes(smallTableExprVectorizes);
+
+ // Currently, only under Tez and non-N-way joins.
<TRUNCATED>