Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:45 UTC
[14/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
deleted file mode 100644
index 0d4c1d8..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
+++ /dev/null
@@ -1,1744 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer.physical;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Stack;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.exec.*;
-import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerLongOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerMultiKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiLongOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiMultiKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
-import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.GraphWalker;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.udf.UDFAcos;
-import org.apache.hadoop.hive.ql.udf.UDFAsin;
-import org.apache.hadoop.hive.ql.udf.UDFAtan;
-import org.apache.hadoop.hive.ql.udf.UDFBin;
-import org.apache.hadoop.hive.ql.udf.UDFConv;
-import org.apache.hadoop.hive.ql.udf.UDFCos;
-import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth;
-import org.apache.hadoop.hive.ql.udf.UDFDegrees;
-import org.apache.hadoop.hive.ql.udf.UDFExp;
-import org.apache.hadoop.hive.ql.udf.UDFHex;
-import org.apache.hadoop.hive.ql.udf.UDFHour;
-import org.apache.hadoop.hive.ql.udf.UDFLength;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
-import org.apache.hadoop.hive.ql.udf.UDFLn;
-import org.apache.hadoop.hive.ql.udf.UDFLog;
-import org.apache.hadoop.hive.ql.udf.UDFLog10;
-import org.apache.hadoop.hive.ql.udf.UDFLog2;
-import org.apache.hadoop.hive.ql.udf.UDFMinute;
-import org.apache.hadoop.hive.ql.udf.UDFMonth;
-import org.apache.hadoop.hive.ql.udf.UDFRadians;
-import org.apache.hadoop.hive.ql.udf.UDFRand;
-import org.apache.hadoop.hive.ql.udf.UDFSecond;
-import org.apache.hadoop.hive.ql.udf.UDFSign;
-import org.apache.hadoop.hive.ql.udf.UDFSin;
-import org.apache.hadoop.hive.ql.udf.UDFSqrt;
-import org.apache.hadoop.hive.ql.udf.UDFSubstr;
-import org.apache.hadoop.hive.ql.udf.UDFTan;
-import org.apache.hadoop.hive.ql.udf.UDFToBoolean;
-import org.apache.hadoop.hive.ql.udf.UDFToByte;
-import org.apache.hadoop.hive.ql.udf.UDFToDouble;
-import org.apache.hadoop.hive.ql.udf.UDFToFloat;
-import org.apache.hadoop.hive.ql.udf.UDFToInteger;
-import org.apache.hadoop.hive.ql.udf.UDFToLong;
-import org.apache.hadoop.hive.ql.udf.UDFToShort;
-import org.apache.hadoop.hive.ql.udf.UDFToString;
-import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
-import org.apache.hadoop.hive.ql.udf.UDFYear;
-import org.apache.hadoop.hive.ql.udf.generic.*;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-
-public class Vectorizer implements PhysicalPlanResolver {
-
- protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
-
- Pattern supportedDataTypesPattern;
- List<Task<? extends Serializable>> vectorizableTasks =
- new ArrayList<Task<? extends Serializable>>();
- Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
-
- Set<String> supportedAggregationUdfs = new HashSet<String>();
-
- private HiveConf hiveConf;
-
- public Vectorizer() {
-
- StringBuilder patternBuilder = new StringBuilder();
- patternBuilder.append("int");
- patternBuilder.append("|smallint");
- patternBuilder.append("|tinyint");
- patternBuilder.append("|bigint");
- patternBuilder.append("|integer");
- patternBuilder.append("|long");
- patternBuilder.append("|short");
- patternBuilder.append("|timestamp");
- patternBuilder.append("|" + serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME);
- patternBuilder.append("|" + serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME);
- patternBuilder.append("|boolean");
- patternBuilder.append("|binary");
- patternBuilder.append("|string");
- patternBuilder.append("|byte");
- patternBuilder.append("|float");
- patternBuilder.append("|double");
- patternBuilder.append("|date");
- patternBuilder.append("|void");
-
- // Decimal types can be specified with different precision and scales e.g. decimal(10,5),
- // as opposed to other data types which can be represented by constant strings.
- // The regex matches only the "decimal" prefix of the type.
- patternBuilder.append("|decimal.*");
-
- // CHAR and VARCHAR types can be specified with maximum length.
- patternBuilder.append("|char.*");
- patternBuilder.append("|varchar.*");
-
- supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
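-
-    A minimal standalone sketch (with a reduced alternation; the class name is
-    illustrative, not part of this file) of why the trailing ".*" branches are
-    needed: they are what let parameterized type names through the whitelist.
-
-        import java.util.regex.Pattern;
-
-        public class TypePatternSketch {
-          public static void main(String[] args) {
-            // Reduced version of the alternation built above.
-            Pattern p = Pattern.compile("int|bigint|string|decimal.*|char.*|varchar.*");
-            System.out.println(p.matcher("bigint").matches());          // true: exact branch
-            System.out.println(p.matcher("decimal(10,5)").matches());   // true: "decimal.*" covers precision/scale
-            System.out.println(p.matcher("varchar(100)").matches());    // true: "varchar.*" covers max length
-            System.out.println(p.matcher("map<int,string>").matches()); // false: complex types are not listed
-          }
-        }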
-
- supportedGenericUDFs.add(GenericUDFOPPlus.class);
- supportedGenericUDFs.add(GenericUDFOPMinus.class);
- supportedGenericUDFs.add(GenericUDFOPMultiply.class);
- supportedGenericUDFs.add(GenericUDFOPDivide.class);
- supportedGenericUDFs.add(GenericUDFOPMod.class);
- supportedGenericUDFs.add(GenericUDFOPNegative.class);
- supportedGenericUDFs.add(GenericUDFOPPositive.class);
-
- supportedGenericUDFs.add(GenericUDFOPEqualOrLessThan.class);
- supportedGenericUDFs.add(GenericUDFOPEqualOrGreaterThan.class);
- supportedGenericUDFs.add(GenericUDFOPGreaterThan.class);
- supportedGenericUDFs.add(GenericUDFOPLessThan.class);
- supportedGenericUDFs.add(GenericUDFOPNot.class);
- supportedGenericUDFs.add(GenericUDFOPNotEqual.class);
- supportedGenericUDFs.add(GenericUDFOPNotNull.class);
- supportedGenericUDFs.add(GenericUDFOPNull.class);
- supportedGenericUDFs.add(GenericUDFOPOr.class);
- supportedGenericUDFs.add(GenericUDFOPAnd.class);
- supportedGenericUDFs.add(GenericUDFOPEqual.class);
- supportedGenericUDFs.add(UDFLength.class);
-
- supportedGenericUDFs.add(UDFYear.class);
- supportedGenericUDFs.add(UDFMonth.class);
- supportedGenericUDFs.add(UDFDayOfMonth.class);
- supportedGenericUDFs.add(UDFHour.class);
- supportedGenericUDFs.add(UDFMinute.class);
- supportedGenericUDFs.add(UDFSecond.class);
- supportedGenericUDFs.add(UDFWeekOfYear.class);
- supportedGenericUDFs.add(GenericUDFToUnixTimeStamp.class);
-
- supportedGenericUDFs.add(GenericUDFDateAdd.class);
- supportedGenericUDFs.add(GenericUDFDateSub.class);
- supportedGenericUDFs.add(GenericUDFDate.class);
- supportedGenericUDFs.add(GenericUDFDateDiff.class);
-
- supportedGenericUDFs.add(UDFLike.class);
- supportedGenericUDFs.add(GenericUDFRegExp.class);
- supportedGenericUDFs.add(UDFSubstr.class);
- supportedGenericUDFs.add(GenericUDFLTrim.class);
- supportedGenericUDFs.add(GenericUDFRTrim.class);
- supportedGenericUDFs.add(GenericUDFTrim.class);
-
- supportedGenericUDFs.add(UDFSin.class);
- supportedGenericUDFs.add(UDFCos.class);
- supportedGenericUDFs.add(UDFTan.class);
- supportedGenericUDFs.add(UDFAsin.class);
- supportedGenericUDFs.add(UDFAcos.class);
- supportedGenericUDFs.add(UDFAtan.class);
- supportedGenericUDFs.add(UDFDegrees.class);
- supportedGenericUDFs.add(UDFRadians.class);
- supportedGenericUDFs.add(GenericUDFFloor.class);
- supportedGenericUDFs.add(GenericUDFCeil.class);
- supportedGenericUDFs.add(UDFExp.class);
- supportedGenericUDFs.add(UDFLn.class);
- supportedGenericUDFs.add(UDFLog2.class);
- supportedGenericUDFs.add(UDFLog10.class);
- supportedGenericUDFs.add(UDFLog.class);
- supportedGenericUDFs.add(GenericUDFPower.class);
- supportedGenericUDFs.add(GenericUDFRound.class);
- supportedGenericUDFs.add(GenericUDFBRound.class);
- supportedGenericUDFs.add(GenericUDFPosMod.class);
- supportedGenericUDFs.add(UDFSqrt.class);
- supportedGenericUDFs.add(UDFSign.class);
- supportedGenericUDFs.add(UDFRand.class);
- supportedGenericUDFs.add(UDFBin.class);
- supportedGenericUDFs.add(UDFHex.class);
- supportedGenericUDFs.add(UDFConv.class);
-
- supportedGenericUDFs.add(GenericUDFLower.class);
- supportedGenericUDFs.add(GenericUDFUpper.class);
- supportedGenericUDFs.add(GenericUDFConcat.class);
- supportedGenericUDFs.add(GenericUDFAbs.class);
- supportedGenericUDFs.add(GenericUDFBetween.class);
- supportedGenericUDFs.add(GenericUDFIn.class);
- supportedGenericUDFs.add(GenericUDFCase.class);
- supportedGenericUDFs.add(GenericUDFWhen.class);
- supportedGenericUDFs.add(GenericUDFCoalesce.class);
- supportedGenericUDFs.add(GenericUDFElt.class);
- supportedGenericUDFs.add(GenericUDFInitCap.class);
-
- // For type casts
- supportedGenericUDFs.add(UDFToLong.class);
- supportedGenericUDFs.add(UDFToInteger.class);
- supportedGenericUDFs.add(UDFToShort.class);
- supportedGenericUDFs.add(UDFToByte.class);
- supportedGenericUDFs.add(UDFToBoolean.class);
- supportedGenericUDFs.add(UDFToFloat.class);
- supportedGenericUDFs.add(UDFToDouble.class);
- supportedGenericUDFs.add(UDFToString.class);
- supportedGenericUDFs.add(GenericUDFTimestamp.class);
- supportedGenericUDFs.add(GenericUDFToDecimal.class);
- supportedGenericUDFs.add(GenericUDFToDate.class);
- supportedGenericUDFs.add(GenericUDFToChar.class);
- supportedGenericUDFs.add(GenericUDFToVarchar.class);
- supportedGenericUDFs.add(GenericUDFToIntervalYearMonth.class);
- supportedGenericUDFs.add(GenericUDFToIntervalDayTime.class);
-
- // For conditional expressions
- supportedGenericUDFs.add(GenericUDFIf.class);
-
- supportedAggregationUdfs.add("min");
- supportedAggregationUdfs.add("max");
- supportedAggregationUdfs.add("count");
- supportedAggregationUdfs.add("sum");
- supportedAggregationUdfs.add("avg");
- supportedAggregationUdfs.add("variance");
- supportedAggregationUdfs.add("var_pop");
- supportedAggregationUdfs.add("var_samp");
- supportedAggregationUdfs.add("std");
- supportedAggregationUdfs.add("stddev");
- supportedAggregationUdfs.add("stddev_pop");
- supportedAggregationUdfs.add("stddev_samp");
- }
-
- class VectorizationDispatcher implements Dispatcher {
-
- private List<String> reduceColumnNames;
- private List<TypeInfo> reduceTypeInfos;
-
- public VectorizationDispatcher(PhysicalContext physicalContext) {
- reduceColumnNames = null;
- reduceTypeInfos = null;
- }
-
- @Override
- public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
- throws SemanticException {
- Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
- if (currTask instanceof MapRedTask) {
- convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
- } else if (currTask instanceof TezTask) {
- TezWork work = ((TezTask) currTask).getWork();
- for (BaseWork w: work.getAllWork()) {
- if (w instanceof MapWork) {
- convertMapWork((MapWork) w, true);
- } else if (w instanceof ReduceWork) {
- // We are only vectorizing Reduce under Tez.
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
- convertReduceWork((ReduceWork) w, true);
- }
- }
- }
- } else if (currTask instanceof SparkTask) {
- SparkWork sparkWork = (SparkWork) currTask.getWork();
- for (BaseWork baseWork : sparkWork.getAllWork()) {
- if (baseWork instanceof MapWork) {
- convertMapWork((MapWork) baseWork, false);
- } else if (baseWork instanceof ReduceWork
- && HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
- convertReduceWork((ReduceWork) baseWork, false);
- }
- }
- }
- return null;
- }
-
- private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- boolean ret = validateMapWork(mapWork, isTez);
- if (ret) {
- vectorizeMapWork(mapWork, isTez);
- }
- }
-
- private void addMapWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
- + FileSinkOperator.getOperatorName()), np);
- opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
- + ReduceSinkOperator.getOperatorName()), np);
- }
-
- private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- LOG.info("Validating MapWork...");
-
- // Eliminate MR plans with more than one TableScanOperator.
- LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
- if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
- return false;
- }
- int tableScanCount = 0;
- for (Operator<?> op : aliasToWork.values()) {
- if (op == null) {
- LOG.warn("Map work has invalid aliases to work with. Fail validation!");
- return false;
- }
- if (op instanceof TableScanOperator) {
- tableScanCount++;
- }
- }
- if (tableScanCount > 1) {
- LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
- return false;
- }
-
- // Validate the input format
- for (String path : mapWork.getPathToPartitionInfo().keySet()) {
- PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
- List<Class<?>> interfaceList =
- Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
- if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
- LOG.info("Input format: " + pd.getInputFileFormatClassName()
- + ", doesn't provide vectorized input");
- return false;
- }
- }
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
- addMapWorkRules(opRules, vnp);
- Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
- GraphWalker ogw = new DefaultGraphWalker(disp);
-
- // iterate over the mapper operator tree
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(mapWork.getAliasToWork().values());
- HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
- ogw.startWalking(topNodes, nodeOutput);
- for (Node n : nodeOutput.keySet()) {
- if (nodeOutput.get(n) != null) {
- if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
- return false;
- }
- }
- }
- return true;
- }
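-
-    The input-format test above relies on Class.getInterfaces(), which returns
-    only the interfaces a class declares directly, not ones inherited from a
-    superclass. A small sketch with an illustrative marker interface standing
-    in for VectorizedInputFormatInterface:
-
-        import java.util.Arrays;
-
-        public class InterfaceCheckSketch {
-          interface VectorizedMarker {}                        // stand-in for VectorizedInputFormatInterface
-          static class DirectFormat implements VectorizedMarker {}
-          static class InheritedFormat extends DirectFormat {}
-
-          public static void main(String[] args) {
-            // The marker is declared directly on the class: the check passes.
-            System.out.println(Arrays.asList(DirectFormat.class.getInterfaces())
-                .contains(VectorizedMarker.class));   // true
-            // A subclass that only inherits the marker fails this direct test.
-            System.out.println(Arrays.asList(InheritedFormat.class.getInterfaces())
-                .contains(VectorizedMarker.class));   // false
-          }
-        }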
-
- private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- LOG.info("Vectorizing MapWork...");
- mapWork.setVectorMode(true);
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
- addMapWorkRules(opRules, vnp);
- Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
- GraphWalker ogw = new PreOrderOnceWalker(disp);
- // iterate over the mapper operator tree
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(mapWork.getAliasToWork().values());
- HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
- ogw.startWalking(topNodes, nodeOutput);
-
- mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
-
- if (LOG.isDebugEnabled()) {
- debugDisplayAllMaps(mapWork);
- }
-
- return;
- }
-
- private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
- boolean ret = validateReduceWork(reduceWork);
- if (ret) {
- vectorizeReduceWork(reduceWork, isTez);
- }
- }
-
- private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
- try {
- // Check key ObjectInspector.
- ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
- if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
- return false;
- }
- StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
- List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
-
- // Tez doesn't use tagging...
- if (reduceWork.getNeedsTagging()) {
- return false;
- }
-
- // Check value ObjectInspector.
- ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
- if (valueObjectInspector == null ||
- !(valueObjectInspector instanceof StructObjectInspector)) {
- return false;
- }
- StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
- List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
-
- reduceColumnNames = new ArrayList<String>();
- reduceTypeInfos = new ArrayList<TypeInfo>();
-
- for (StructField field: keyFields) {
- reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
- }
- for (StructField field: valueFields) {
- reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
- }
- } catch (Exception e) {
- throw new SemanticException(e);
- }
- return true;
- }
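-
-    The two loops above flatten the reduce-shuffle key and value structs into a
-    single column list by prefixing each field name, so a key field "_col0"
-    becomes "KEY._col0". A sketch of the naming (the field names are made up):
-
-        import java.util.ArrayList;
-        import java.util.List;
-
-        public class ReduceColumnNamesSketch {
-          public static void main(String[] args) {
-            List<String> reduceColumnNames = new ArrayList<>();
-            for (String keyField : new String[] {"_col0"}) {
-              reduceColumnNames.add("KEY." + keyField);     // Utilities.ReduceField.KEY prefix
-            }
-            for (String valueField : new String[] {"_col1", "_col2"}) {
-              reduceColumnNames.add("VALUE." + valueField); // Utilities.ReduceField.VALUE prefix
-            }
-            System.out.println(reduceColumnNames); // [KEY._col0, VALUE._col1, VALUE._col2]
-          }
-        }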
-
- private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
- opRules.put(new RuleRegExp("R1", GroupByOperator.getOperatorName() + ".*"), np);
- opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
- }
-
- private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
- LOG.info("Validating ReduceWork...");
-
- // Validate input to ReduceWork.
- if (!getOnlyStructObjectInspectors(reduceWork)) {
- return false;
- }
- // Now check the reduce operator tree.
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- ReduceWorkValidationNodeProcessor vnp = new ReduceWorkValidationNodeProcessor();
- addReduceWorkRules(opRules, vnp);
- Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
- GraphWalker ogw = new DefaultGraphWalker(disp);
- // iterate over the reduce operator tree
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.add(reduceWork.getReducer());
- HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
- ogw.startWalking(topNodes, nodeOutput);
- for (Node n : nodeOutput.keySet()) {
- if (nodeOutput.get(n) != null) {
- if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
- return false;
- }
- }
- }
- return true;
- }
-
- private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
- LOG.info("Vectorizing ReduceWork...");
- reduceWork.setVectorMode(true);
-
- // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as
- // expected. We need to descend down, otherwise it breaks our algorithm that determines
- // VectorizationContext... So we use PreOrderWalker instead of DefaultGraphWalker.
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
- addReduceWorkRules(opRules, vnp);
- Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
- GraphWalker ogw = new PreOrderWalker(disp);
- // iterate over the reduce operator tree
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.add(reduceWork.getReducer());
- LOG.info("vectorizeReduceWork reducer Operator: " +
- reduceWork.getReducer().getName() + "...");
- HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
- ogw.startWalking(topNodes, nodeOutput);
-
- // Necessary since we are vectorizing the root operator in reduce.
- reduceWork.setReducer(vnp.getRootVectorOp());
-
- reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
-
- if (LOG.isDebugEnabled()) {
- debugDisplayAllMaps(reduceWork);
- }
- }
- }
-
- class MapWorkValidationNodeProcessor implements NodeProcessor {
-
- private final MapWork mapWork;
- private final boolean isTez;
-
- public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) {
- this.mapWork = mapWork;
- this.isTez = isTez;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- for (Node n : stack) {
- Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
- if (nonVectorizableChildOfGroupBy(op)) {
- return new Boolean(true);
- }
- boolean ret = validateMapWorkOperator(op, mapWork, isTez);
- if (!ret) {
- LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
- return new Boolean(false);
- }
- }
- return new Boolean(true);
- }
- }
-
- class ReduceWorkValidationNodeProcessor implements NodeProcessor {
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- for (Node n : stack) {
- Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
- if (nonVectorizableChildOfGroupBy(op)) {
- return new Boolean(true);
- }
- boolean ret = validateReduceWorkOperator(op);
- if (!ret) {
- LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
- return new Boolean(false);
- }
- }
- return new Boolean(true);
- }
- }
-
- // This class has common code used by both MapWorkVectorizationNodeProcessor and
- // ReduceWorkVectorizationNodeProcessor.
- class VectorizationNodeProcessor implements NodeProcessor {
-
- // The vectorization context for the Map or Reduce task.
- protected VectorizationContext taskVectorizationContext;
-
- // The input projection column type name map for the Map or Reduce task.
- protected Map<Integer, String> taskColumnTypeNameMap;
-
- VectorizationNodeProcessor() {
- taskColumnTypeNameMap = new HashMap<Integer, String>();
- }
-
- public Map<String, Integer> getVectorColumnNameMap() {
- return taskVectorizationContext.getProjectionColumnMap();
- }
-
- public Map<Integer, String> getVectorColumnTypeMap() {
- return taskColumnTypeNameMap;
- }
-
- public Map<Integer, String> getVectorScratchColumnTypeMap() {
- return taskVectorizationContext.getScratchColumnTypeMap();
- }
-
- protected final Set<Operator<? extends OperatorDesc>> opsDone =
- new HashSet<Operator<? extends OperatorDesc>>();
-
- protected final Map<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>> opToVectorOpMap =
- new HashMap<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>();
-
- public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
- Operator<? extends OperatorDesc> op) throws SemanticException {
- VectorizationContext vContext = null;
- if (stack.size() <= 1) {
- throw new SemanticException(
- String.format("Expected operator stack for operator %s to have at least 2 operators",
- op.getName()));
- }
- // Walk down the stack of operators until we find one willing to give us a context.
- // At the bottom will be the root operator, guaranteed to have a context
- int i= stack.size()-2;
- while (vContext == null) {
- if (i < 0) {
- return null;
- }
- Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
- Operator<? extends OperatorDesc> vectorOpParent = opToVectorOpMap.get(opParent);
- if (vectorOpParent != null) {
- if (vectorOpParent instanceof VectorizationContextRegion) {
- VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOpParent;
- vContext = vcRegion.getOuputVectorizationContext();
- LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " has new vectorization context " + vContext.toString());
- } else {
- LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " does not have new vectorization context");
- }
- } else {
- LOG.info("walkStackToFindVectorizationContext " + opParent.getName() + " is not vectorized");
- }
- --i;
- }
- return vContext;
- }
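-
-    The walk above starts at the current operator's parent (stack index
-    size()-2) and moves toward the root until some already-vectorized ancestor
-    owns a VectorizationContextRegion. A sketch of the same scan with strings
-    standing in for operators:
-
-        import java.util.HashMap;
-        import java.util.Map;
-        import java.util.Stack;
-
-        public class ContextWalkSketch {
-          public static void main(String[] args) {
-            Stack<String> stack = new Stack<>();
-            stack.push("TS");      // root (table scan) at the bottom
-            stack.push("MAPJOIN"); // suppose this ancestor introduced a new context
-            stack.push("SEL");     // current operator, at the top of the stack
-            Map<String, String> contextOwnedBy = new HashMap<>();
-            contextOwnedBy.put("MAPJOIN", "ctx-after-mapjoin");
-
-            String vContext = null;
-            // Start one below the current operator and walk toward the root.
-            for (int i = stack.size() - 2; i >= 0 && vContext == null; i--) {
-              vContext = contextOwnedBy.get(stack.get(i));
-            }
-            System.out.println(vContext); // ctx-after-mapjoin
-          }
-        }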
-
- public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
- VectorizationContext vContext, boolean isTez) throws SemanticException {
- Operator<? extends OperatorDesc> vectorOp = op;
- try {
- if (!opsDone.contains(op)) {
- vectorOp = vectorizeOperator(op, vContext, isTez);
- opsDone.add(op);
- if (vectorOp != op) {
- opToVectorOpMap.put(op, vectorOp);
- opsDone.add(vectorOp);
- }
- }
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
- return vectorOp;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- throw new SemanticException("Must be overridden");
- }
- }
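-
-    doVectorize above is deliberately idempotent: opsDone records both the
-    original operator and its replacement, so a node reached along two walker
-    paths is converted at most once, and opToVectorOpMap lets
-    walkStackToFindVectorizationContext find the vectorized twin of an
-    already-converted parent. A reduced sketch of that bookkeeping:
-
-        import java.util.HashMap;
-        import java.util.HashSet;
-        import java.util.Map;
-        import java.util.Set;
-
-        public class ConvertOnceSketch {
-          static final Set<String> opsDone = new HashSet<>();
-          static final Map<String, String> opToVectorOp = new HashMap<>();
-
-          static String convertOnce(String op) {
-            if (opsDone.contains(op)) {
-              return op;             // revisit (or a vectorized twin): nothing more to do
-            }
-            String vectorOp = "Vector" + op;
-            opsDone.add(op);
-            opsDone.add(vectorOp);   // the twin must never be converted again either
-            opToVectorOp.put(op, vectorOp);
-            return vectorOp;
-          }
-
-          public static void main(String[] args) {
-            System.out.println(convertOnce("SEL")); // VectorSEL
-            System.out.println(convertOnce("SEL")); // SEL: already done, returned unchanged
-          }
-        }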
-
- class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
-
- private final boolean isTez;
-
- public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
- super();
- this.isTez = isTez;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
-
- Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-
- VectorizationContext vContext = null;
-
- if (op instanceof TableScanOperator) {
- if (taskVectorizationContext == null) {
- taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
- taskColumnTypeNameMap);
- }
- vContext = taskVectorizationContext;
- } else {
- LOG.info("MapWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
- vContext = walkStackToFindVectorizationContext(stack, op);
- if (vContext == null) {
- // No operator has "pushed" a new context -- so use the task vectorization context.
- vContext = taskVectorizationContext;
- }
- }
-
- assert vContext != null;
- LOG.info("MapWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
-
- // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
- // vectorize the operators below it.
- if (nonVectorizableChildOfGroupBy(op)) {
- // No need to vectorize
- if (!opsDone.contains(op)) {
- opsDone.add(op);
- }
- return null;
- }
-
- Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
-
- if (LOG.isDebugEnabled()) {
- if (vectorOp instanceof VectorizationContextRegion) {
- VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
- VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
- LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
- }
- }
-
- return null;
- }
- }
-
- class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
-
- private final List<String> reduceColumnNames;
- private final List<TypeInfo> reduceTypeInfos;
-
- private final boolean isTez;
-
- private Operator<? extends OperatorDesc> rootVectorOp;
-
- public Operator<? extends OperatorDesc> getRootVectorOp() {
- return rootVectorOp;
- }
-
- public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
- List<TypeInfo> reduceTypeInfos, boolean isTez) {
- super();
- this.reduceColumnNames = reduceColumnNames;
- this.reduceTypeInfos = reduceTypeInfos;
- rootVectorOp = null;
- this.isTez = isTez;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
-
- Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-
- VectorizationContext vContext = null;
-
- boolean saveRootVectorOp = false;
-
- if (op.getParentOperators().size() == 0) {
- LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
-
- vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
- taskVectorizationContext = vContext;
- int i = 0;
- for (TypeInfo typeInfo : reduceTypeInfos) {
- taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
- i++;
- }
- saveRootVectorOp = true;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString());
- }
- } else {
- LOG.info("ReduceWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
- vContext = walkStackToFindVectorizationContext(stack, op);
- if (vContext == null) {
- // If we didn't find a context among the operators, assume the top -- reduce shuffle's
- // vectorization context.
- vContext = taskVectorizationContext;
- }
- }
-
- assert vContext != null;
- LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
-
- // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
- // vectorize the operators below it.
- if (nonVectorizableChildOfGroupBy(op)) {
- // No need to vectorize
- if (!opsDone.contains(op)) {
- opsDone.add(op);
- }
- return null;
- }
-
- Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
-
- if (LOG.isDebugEnabled()) {
- if (vectorOp instanceof VectorizationContextRegion) {
- VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
- VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
- LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
- }
- }
- if (saveRootVectorOp && op != vectorOp) {
- rootVectorOp = vectorOp;
- }
-
- return null;
- }
- }
-
- private static class ValidatorVectorizationContext extends VectorizationContext {
- private ValidatorVectorizationContext() {
- super("No Name");
- }
-
- @Override
- protected int getInputColumnIndex(String name) {
- return 0;
- }
-
- @Override
- protected int getInputColumnIndex(ExprNodeColumnDesc colExpr) {
- return 0;
- }
- }
-
- @Override
- public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
- hiveConf = physicalContext.getConf();
-
- boolean vectorPath = HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
- if (!vectorPath) {
- LOG.info("Vectorization is disabled");
- return physicalContext;
- }
- // create dispatcher and graph walker
- Dispatcher disp = new VectorizationDispatcher(physicalContext);
- TaskGraphWalker ogw = new TaskGraphWalker(disp);
-
- // get all the tasks nodes from root task
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(physicalContext.getRootTasks());
-
- // begin to walk through the task tree.
- ogw.startWalking(topNodes, null);
- return physicalContext;
- }
-
- boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) {
- boolean ret = false;
- switch (op.getType()) {
- case MAPJOIN:
- if (op instanceof MapJoinOperator) {
- ret = validateMapJoinOperator((MapJoinOperator) op);
- } else if (op instanceof SMBMapJoinOperator) {
- ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
- }
- break;
- case GROUPBY:
- ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
- break;
- case FILTER:
- ret = validateFilterOperator((FilterOperator) op);
- break;
- case SELECT:
- ret = validateSelectOperator((SelectOperator) op);
- break;
- case REDUCESINK:
- ret = validateReduceSinkOperator((ReduceSinkOperator) op);
- break;
- case TABLESCAN:
- ret = validateTableScanOperator((TableScanOperator) op, mWork);
- break;
- case FILESINK:
- case LIMIT:
- case EVENT:
- case SPARKPRUNINGSINK:
- ret = true;
- break;
- case HASHTABLESINK:
- ret = op instanceof SparkHashTableSinkOperator &&
- validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
- break;
- default:
- ret = false;
- break;
- }
- return ret;
- }
-
- boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
- boolean ret = false;
- switch (op.getType()) {
- case MAPJOIN:
- // Does MAPJOIN actually get planned in Reduce?
- if (op instanceof MapJoinOperator) {
- ret = validateMapJoinOperator((MapJoinOperator) op);
- } else if (op instanceof SMBMapJoinOperator) {
- ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
- }
- break;
- case GROUPBY:
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
- ret = validateGroupByOperator((GroupByOperator) op, true, true);
- } else {
- ret = false;
- }
- break;
- case FILTER:
- ret = validateFilterOperator((FilterOperator) op);
- break;
- case SELECT:
- ret = validateSelectOperator((SelectOperator) op);
- break;
- case REDUCESINK:
- ret = validateReduceSinkOperator((ReduceSinkOperator) op);
- break;
- case FILESINK:
- ret = validateFileSinkOperator((FileSinkOperator) op);
- break;
- case LIMIT:
- case EVENT:
- case SPARKPRUNINGSINK:
- ret = true;
- break;
- case HASHTABLESINK:
- ret = op instanceof SparkHashTableSinkOperator &&
- validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
- break;
- default:
- ret = false;
- break;
- }
- return ret;
- }
-
- public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
- Operator<? extends OperatorDesc> currentOp = op;
- while (currentOp.getParentOperators().size() > 0) {
- currentOp = currentOp.getParentOperators().get(0);
- if (currentOp.getType().equals(OperatorType.GROUPBY)) {
- GroupByDesc desc = (GroupByDesc)currentOp.getConf();
- boolean isVectorOutput = desc.getVectorDesc().isVectorOutput();
- if (isVectorOutput) {
- // This GROUP BY does vectorize its output.
- return false;
- }
- return true;
- }
- }
- return false;
- }
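-
-    Note that the walk above follows only the first parent at each step, and
-    the answer hinges on whether the nearest GROUPBY ancestor emits vectorized
-    batches or plain rows. A sketch of the decision (operator names are
-    illustrative):
-
-        import java.util.Arrays;
-        import java.util.List;
-
-        public class GroupByChildSketch {
-          // true when the nearest GROUPBY ancestor emits plain rows rather than batches
-          static boolean nonVectorizableChild(List<String> firstParentChain, boolean groupByEmitsBatches) {
-            for (String ancestor : firstParentChain) {
-              if (ancestor.equals("GROUPBY")) {
-                return !groupByEmitsBatches;
-              }
-            }
-            return false; // no GROUPBY above: nothing blocks vectorization
-          }
-
-          public static void main(String[] args) {
-            List<String> chain = Arrays.asList("SEL", "GROUPBY", "TS");
-            System.out.println(nonVectorizableChild(chain, false)); // true: rows flow below the GROUP BY
-            System.out.println(nonVectorizableChild(chain, true));  // false: batches flow below it
-          }
-        }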
-
- private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
- SMBJoinDesc desc = op.getConf();
- // Validation is the same as for map join, since the 'small' tables are not vectorized
- return validateMapJoinDesc(desc);
- }
-
- private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) {
- TableScanDesc desc = op.getConf();
- if (desc.isGatherStats()) {
- return false;
- }
-
- String columns = "";
- String types = "";
- String partitionColumns = "";
- String partitionTypes = "";
- boolean haveInfo = false;
-
- // This over-reaches slightly, since we can have > 1 table-scan per map-work.
- // To be accurate it would need to map path to partition and path to alias, and then check
- // that the alias belongs to this same table-scan.
- // That is a TODO item to be fixed when we support > 1 TableScans per vectorized pipeline later.
- LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
-
- // For vectorization, compare each partition's information against the others.
- // We assume the table information will be from one of the partitions, so it will
- // work to focus on the partition information and not compare against the TableScanOperator
- // columns (in the VectorizationContext)....
- for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
- PartitionDesc partDesc = entry.getValue();
- if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
- // No partition information -- we match because we would default to using the table description.
- continue;
- }
- Properties partProps = partDesc.getProperties();
- if (!haveInfo) {
- columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- haveInfo = true;
- } else {
- String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- if (!columns.equalsIgnoreCase(nextColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column names %s do not match the other column names %s",
- entry.getKey(), nextColumns, columns));
- return false;
- }
- if (!types.equalsIgnoreCase(nextTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column types %s do not match the other column types %s",
- entry.getKey(), nextTypes, types));
- return false;
- }
- if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column names %s do not match the other partition column names %s",
- entry.getKey(), nextPartitionColumns, partitionColumns));
- return false;
- }
- if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column types %s do not match the other partition column types %s",
- entry.getKey(), nextPartitionTypes, partitionTypes));
- return false;
- }
- }
- }
- return true;
- }
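-
-    The loop above is a pairwise-consistency check: the first partition that
-    carries a partition spec fixes the reference schema strings, and every
-    later partition must match them case-insensitively. A sketch with plain
-    property strings in place of the metastore properties:
-
-        public class PartitionSchemaCheckSketch {
-          public static void main(String[] args) {
-            String[] partitionColumnLists = {"a,b,c", "A,B,C", "a,b,d"};
-            String reference = null;
-            for (String cols : partitionColumnLists) {
-              if (reference == null) {
-                reference = cols;                        // first partition sets the baseline
-              } else if (!reference.equalsIgnoreCase(cols)) {
-                System.out.println("mismatch: " + cols); // mismatch: a,b,d
-                return;                                  // the real code fails validation here
-              }
-            }
-          }
-        }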
-
- private boolean validateMapJoinOperator(MapJoinOperator op) {
- MapJoinDesc desc = op.getConf();
- return validateMapJoinDesc(desc);
- }
-
- private boolean validateMapJoinDesc(MapJoinDesc desc) {
- byte posBigTable = (byte) desc.getPosBigTable();
- List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable);
- if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) {
- LOG.info("Cannot vectorize map work filter expression");
- return false;
- }
- List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable);
- if (!validateExprNodeDesc(keyExprs)) {
- LOG.info("Cannot vectorize map work key expression");
- return false;
- }
- List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable);
- if (!validateExprNodeDesc(valueExprs)) {
- LOG.info("Cannot vectorize map work value expression");
- return false;
- }
- return true;
- }
-
- private boolean validateSparkHashTableSinkOperator(SparkHashTableSinkOperator op) {
- SparkHashTableSinkDesc desc = op.getConf();
- byte tag = desc.getTag();
- // it's essentially a MapJoinDesc
- List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
- List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
- List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
- return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) &&
- validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs);
- }
-
- private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
- List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
- List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
- List<ExprNodeDesc> valueDesc = op.getConf().getValueCols();
- return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) &&
- validateExprNodeDesc(valueDesc);
- }
-
- private boolean validateSelectOperator(SelectOperator op) {
- List<ExprNodeDesc> descList = op.getConf().getColList();
- for (ExprNodeDesc desc : descList) {
- boolean ret = validateExprNodeDesc(desc);
- if (!ret) {
- LOG.info("Cannot vectorize select expression: " + desc.toString());
- return false;
- }
- }
- return true;
- }
-
- private boolean validateFilterOperator(FilterOperator op) {
- ExprNodeDesc desc = op.getConf().getPredicate();
- return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
- }
-
- private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
- GroupByDesc desc = op.getConf();
- VectorGroupByDesc vectorDesc = desc.getVectorDesc();
-
- if (desc.isGroupingSetsPresent()) {
- LOG.info("Grouping sets not supported in vector mode");
- return false;
- }
- if (desc.pruneGroupingSetId()) {
- LOG.info("Pruning grouping set id not supported in vector mode");
- return false;
- }
- boolean ret = validateExprNodeDesc(desc.getKeys());
- if (!ret) {
- LOG.info("Cannot vectorize groupby key expression");
- return false;
- }
-
- if (!isReduce) {
-
- // MapWork
-
- ret = validateHashAggregationDesc(desc.getAggregators());
- if (!ret) {
- return false;
- }
- } else {
-
- // ReduceWork
-
- boolean isComplete = desc.getMode() == GroupByDesc.Mode.COMPLETE;
- if (desc.getMode() != GroupByDesc.Mode.HASH) {
-
- // Reduce Merge-Partial GROUP BY.
-
- // A merge-partial GROUP BY is fed by grouping by keys from reduce-shuffle. It is the
- // first (or root) operator for its reduce task.
- // TODO: Technically, we should also handle FINAL, PARTIAL1, PARTIAL2 and PARTIALS
- // that are not hash or complete, but aren't merge-partial, somehow.
-
- if (desc.isDistinct()) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY does not support DISTINCT");
- return false;
- }
-
- boolean hasKeys = (desc.getKeys().size() > 0);
-
- // Do we support merge-partial aggregation AND the output is primitive?
- ret = validateReduceMergePartialAggregationDesc(desc.getAggregators(), hasKeys);
- if (!ret) {
- return false;
- }
-
- if (hasKeys) {
- if (op.getParentOperators().size() > 0 && !isComplete) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle a key group when it is fed by reduce-shuffle");
- return false;
- }
-
- LOG.info("Vectorized Reduce MergePartial GROUP BY will process key groups");
-
- // Primitive output validation above means we can output VectorizedRowBatch to the
- // children operators.
- vectorDesc.setVectorOutput(true);
- } else {
- LOG.info("Vectorized Reduce MergePartial GROUP BY will do global aggregation");
- }
- if (!isComplete) {
- vectorDesc.setIsReduceMergePartial(true);
- } else {
- vectorDesc.setIsReduceStreaming(true);
- }
- } else {
-
- // Reduce Hash GROUP BY or global aggregation.
-
- ret = validateHashAggregationDesc(desc.getAggregators());
- if (!ret) {
- return false;
- }
- }
- }
-
- return true;
- }
-
- private boolean validateFileSinkOperator(FileSinkOperator op) {
- return true;
- }
-
- private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
- return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
- }
-
- private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
- VectorExpressionDescriptor.Mode mode) {
- for (ExprNodeDesc d : descs) {
- boolean ret = validateExprNodeDesc(d, mode);
- if (!ret) {
- return false;
- }
- }
- return true;
- }
-
-
- private boolean validateHashAggregationDesc(List<AggregationDesc> descs) {
- return validateAggregationDesc(descs, /* isReduceMergePartial */ false, false);
- }
-
- private boolean validateReduceMergePartialAggregationDesc(List<AggregationDesc> descs, boolean hasKeys) {
- return validateAggregationDesc(descs, /* isReduceMergePartial */ true, hasKeys);
- }
-
- private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduceMergePartial, boolean hasKeys) {
- for (AggregationDesc d : descs) {
- boolean ret = validateAggregationDesc(d, isReduceMergePartial, hasKeys);
- if (!ret) {
- return false;
- }
- }
- return true;
- }
-
- private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
- if (desc instanceof ExprNodeColumnDesc) {
- ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc;
- // Currently, we do not support vectorized virtual columns (see HIVE-5570).
- if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) {
- LOG.info("Cannot vectorize virtual column " + c.getColumn());
- return false;
- }
- }
- String typeName = desc.getTypeInfo().getTypeName();
- boolean ret = validateDataType(typeName, mode);
- if (!ret) {
- LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
- return false;
- }
- if (desc instanceof ExprNodeGenericFuncDesc) {
- ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
- boolean r = validateGenericUdf(d);
- if (!r) {
- LOG.info("Cannot vectorize UDF " + d);
- return false;
- }
- }
- if (desc.getChildren() != null) {
- for (ExprNodeDesc d: desc.getChildren()) {
- // Don't restrict child expressions for projection. Always use looser FILTER mode.
- boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
- if (!r) {
- return false;
- }
- }
- }
- return true;
- }
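-
-    One subtlety above: whatever mode the root expression is validated in,
-    every child is re-validated in the looser FILTER mode, since only the
-    root's output must be materializable as a projection. A reduced sketch of
-    that recursion (the Expr type and the "void" rule here are stand-ins for
-    the real descriptor machinery; requires Java 16+ for records):
-
-        public class ExprModeSketch {
-          enum Mode { PROJECTION, FILTER }
-
-          record Expr(String type, Expr... children) {}
-
-          static boolean validate(Expr e, Mode mode) {
-            // "void" is usable in FILTER context but has no projectable output.
-            if (e.type().equals("void") && mode == Mode.PROJECTION) {
-              return false;
-            }
-            for (Expr child : e.children()) {
-              if (!validate(child, Mode.FILTER)) {  // children always use the looser mode
-                return false;
-              }
-            }
-            return true;
-          }
-
-          public static void main(String[] args) {
-            Expr isNull = new Expr("boolean", new Expr("void"));
-            System.out.println(validate(isNull, Mode.PROJECTION));           // true: void child passes under FILTER
-            System.out.println(validate(new Expr("void"), Mode.PROJECTION)); // false: void cannot be projected
-          }
-        }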
-
- private boolean validateExprNodeDesc(ExprNodeDesc desc) {
- return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION);
- }
-
- boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
- if (!validateExprNodeDescRecursive(desc, mode)) {
- return false;
- }
- try {
- VectorizationContext vc = new ValidatorVectorizationContext();
- if (vc.getVectorExpression(desc, mode) == null) {
- // TODO: this cannot happen - VectorizationContext throws in such cases.
- LOG.info("getVectorExpression returned null");
- return false;
- }
- } catch (Exception e) {
- LOG.info("Failed to vectorize", e);
- return false;
- }
- return true;
- }
-
- private boolean validateGenericUdf(ExprNodeGenericFuncDesc genericUDFExpr) {
- if (VectorizationContext.isCustomUDF(genericUDFExpr)) {
- return true;
- }
- GenericUDF genericUDF = genericUDFExpr.getGenericUDF();
- if (genericUDF instanceof GenericUDFBridge) {
- Class<? extends UDF> udf = ((GenericUDFBridge) genericUDF).getUdfClass();
- return supportedGenericUDFs.contains(udf);
- } else {
- return supportedGenericUDFs.contains(genericUDF.getClass());
- }
- }
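-
-    GenericUDFBridge wraps legacy UDF classes, so the whitelist check above
-    must look through the bridge at the wrapped class rather than at the
-    bridge itself. A reduced sketch (the types are stand-ins for Hive's;
-    requires Java 16+ for pattern-matching instanceof):
-
-        import java.util.HashSet;
-        import java.util.Set;
-
-        public class UdfWhitelistSketch {
-          static class LegacyYear {}                                  // stand-in for a UDF subclass, e.g. UDFYear
-          static class Bridge { Class<?> udfClass = LegacyYear.class; } // stand-in for GenericUDFBridge
-
-          public static void main(String[] args) {
-            Set<Class<?>> supported = new HashSet<>();
-            supported.add(LegacyYear.class);
-
-            Object udf = new Bridge();
-            // Unwrap the bridge before consulting the whitelist.
-            Class<?> toCheck = (udf instanceof Bridge b) ? b.udfClass : udf.getClass();
-            System.out.println(supported.contains(toCheck)); // true: the wrapped class is whitelisted
-          }
-        }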
-
- private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) {
- ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
- return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE);
- }
-
- private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduceMergePartial,
- boolean hasKeys) {
-
- String udfName = aggDesc.getGenericUDAFName().toLowerCase();
- if (!supportedAggregationUdfs.contains(udfName)) {
- LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported");
- return false;
- }
- if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
- LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported");
- return false;
- }
-
- // See if we can vectorize the aggregation.
- VectorizationContext vc = new ValidatorVectorizationContext();
- VectorAggregateExpression vectorAggrExpr;
- try {
- vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduceMergePartial);
- } catch (Exception e) {
- // We should have already attempted to vectorize in validateAggregationDesc.
- LOG.info("Vectorization of aggreation should have succeeded ", e);
- return false;
- }
-
- if (isReduceMergePartial && hasKeys && !validateAggregationIsPrimitive(vectorAggrExpr)) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
- return false;
- }
-
- return true;
- }
-
- private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
- type = type.toLowerCase();
- boolean result = supportedDataTypesPattern.matcher(type).matches();
- if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
- return false;
- }
- return result;
- }
-
- private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
- Map<Integer, String> typeNameMap) {
-
- VectorizationContext vContext = new VectorizationContext(contextName);
-
- // Add all non-virtual columns to make a vectorization context for
- // the TableScan operator.
- int i = 0;
- for (ColumnInfo c : rowSchema.getSignature()) {
- // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
- if (!isVirtualColumn(c)) {
- vContext.addInitialColumn(c.getInternalName());
- typeNameMap.put(i, c.getTypeName());
- i++;
- }
- }
- vContext.finishedAddingInitialColumns();
-
- return vContext;
- }
-
- private void fixupParentChildOperators(Operator<? extends OperatorDesc> op,
- Operator<? extends OperatorDesc> vectorOp) {
- if (op.getParentOperators() != null) {
- vectorOp.setParentOperators(op.getParentOperators());
- for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
- p.replaceChild(op, vectorOp);
- }
- }
- if (op.getChildOperators() != null) {
- vectorOp.setChildOperators(op.getChildOperators());
- for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
- c.replaceParent(op, vectorOp);
- }
- }
- }
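-
-    fixupParentChildOperators splices the vectorized operator into the DAG in
-    both directions: it inherits the old operator's parent and child lists,
-    and each neighbor rewrites its edge to point at the new node. A sketch of
-    the splice with adjacency lists of names:
-
-        import java.util.ArrayList;
-        import java.util.Collections;
-        import java.util.List;
-
-        public class SpliceSketch {
-          public static void main(String[] args) {
-            List<String> parentsOfOld = new ArrayList<>(List.of("TS"));
-            List<String> childrenOfTS = new ArrayList<>(List.of("SEL"));
-
-            // Replace "SEL" with "VectorSEL": the new node takes over the old edges...
-            List<String> parentsOfNew = parentsOfOld;
-            // ...and every neighbor rewrites its reference to the old node.
-            Collections.replaceAll(childrenOfTS, "SEL", "VectorSEL");
-
-            System.out.println(parentsOfNew); // [TS]
-            System.out.println(childrenOfTS); // [VectorSEL]
-          }
-        }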
-
- private boolean isBigTableOnlyResults(MapJoinDesc desc) {
- Byte[] order = desc.getTagOrder();
- byte posBigTable = (byte) desc.getPosBigTable();
- Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
-
- int[] smallTableIndices;
- int smallTableIndicesSize;
- if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
- smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
- LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices " + Arrays.toString(smallTableIndices));
- smallTableIndicesSize = smallTableIndices.length;
- } else {
- smallTableIndices = null;
- LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices EMPTY");
- smallTableIndicesSize = 0;
- }
-
- List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
- LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainList " + smallTableRetainList);
- int smallTableRetainSize = smallTableRetainList.size();
-
- if (smallTableIndicesSize > 0) {
- // Small table indices have priority over retain.
- for (int i = 0; i < smallTableIndicesSize; i++) {
- if (smallTableIndices[i] < 0) {
- // Negative numbers indicate a column to be (deserialize) read from the small table's
- // LazyBinary value row.
- LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
- return false;
- }
- }
- } else if (smallTableRetainSize > 0) {
- LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
- return false;
- }
-
- LOG.info("Vectorizer isBigTableOnlyResults returning true");
- return true;
- }
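-
-    The sign convention above: a non-negative smallTableIndices entry refers
-    to a big-table column that can be projected directly, while a negative
-    entry means a column must be deserialized from the small table's
-    LazyBinary value row, which the big-table-only fast path cannot do. A
-    sketch of the scan:
-
-        public class SmallTableIndicesSketch {
-          static boolean bigTableOnly(int[] smallTableIndices, int retainCount) {
-            if (smallTableIndices != null && smallTableIndices.length > 0) {
-              for (int idx : smallTableIndices) {
-                if (idx < 0) {
-                  return false; // needs a value deserialized from the small table
-                }
-              }
-              return true;      // indices take priority over the retain list
-            }
-            return retainCount == 0; // any retained small-table column also disqualifies
-          }
-
-          public static void main(String[] args) {
-            System.out.println(bigTableOnly(new int[] {0, 2}, 5));  // true
-            System.out.println(bigTableOnly(new int[] {0, -1}, 0)); // false
-            System.out.println(bigTableOnly(null, 1));              // false
-          }
-        }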
-
- Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
- VectorizationContext vContext, MapJoinDesc desc) throws HiveException {
- Operator<? extends OperatorDesc> vectorOp = null;
- Class<? extends Operator<?>> opClass = null;
-
- VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
- VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE;
- VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
-
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
- hashTableImplementationType = HashTableImplementationType.FAST;
- } else {
- // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or
- // HybridHashTableContainer.
- hashTableImplementationType = HashTableImplementationType.OPTIMIZED;
- }
-
- int joinType = desc.getConds()[0].getType();
-
- boolean isInnerBigOnly = false;
- if (joinType == JoinDesc.INNER_JOIN && isBigTableOnlyResults(desc)) {
- isInnerBigOnly = true;
- }
-
- // By default, we can always use the multi-key class.
- hashTableKeyType = HashTableKeyType.MULTI_KEY;
-
- if (!HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MULTIKEY_ONLY_ENABLED)) {
-
- // Look for single column optimization.
- byte posBigTable = (byte) desc.getPosBigTable();
- Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
- List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
- if (bigTableKeyExprs.size() == 1) {
- String typeName = bigTableKeyExprs.get(0).getTypeString();
- LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName);
- if (typeName.equals("boolean")) {
- hashTableKeyType = HashTableKeyType.BOOLEAN;
- } else if (typeName.equals("tinyint")) {
- hashTableKeyType = HashTableKeyType.BYTE;
- } else if (typeName.equals("smallint")) {
- hashTableKeyType = HashTableKeyType.SHORT;
- } else if (typeName.equals("int")) {
- hashTableKeyType = HashTableKeyType.INT;
- } else if (typeName.equals("bigint") || typeName.equals("long")) {
- hashTableKeyType = HashTableKeyType.LONG;
- } else if (VectorizationContext.isStringFamily(typeName)) {
- hashTableKeyType = HashTableKeyType.STRING;
- }
- }
- }
-
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- hashTableKind = HashTableKind.HASH_MAP;
- } else {
- hashTableKind = HashTableKind.HASH_MULTISET;
- }
- break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- hashTableKind = HashTableKind.HASH_MAP;
- break;
- case JoinDesc.LEFT_SEMI_JOIN:
- hashTableKind = HashTableKind.HASH_SET;
- break;
- default:
- throw new HiveException("Unknown join type " + joinType);
- }
-
- LOG.info("Vectorizer vectorizeOperator map join hashTableKind " + hashTableKind.name() + " hashTableKeyType " + hashTableKeyType.name());
-
- switch (hashTableKeyType) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerLongOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
- }
- break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterLongOperator.class;
- break;
- case JoinDesc.LEFT_SEMI_JOIN:
- opClass = VectorMapJoinLeftSemiLongOperator.class;
- break;
- default:
- throw new HiveException("Unknown join type " + joinType);
- }
- break;
- case STRING:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerStringOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
- }
- break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterStringOperator.class;
- break;
- case JoinDesc.LEFT_SEMI_JOIN:
- opClass = VectorMapJoinLeftSemiStringOperator.class;
- break;
- default:
- throw new HiveException("Unknown join type " + joinType);
- }
- break;
- case MULTI_KEY:
- switch (joinType) {
- case JoinDesc.INNER_JOIN:
- if (!isInnerBigOnly) {
- opClass = VectorMapJoinInnerMultiKeyOperator.class;
- } else {
- opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
- }
- break;
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- opClass = VectorMapJoinOuterMultiKeyOperator.class;
- break;
- case JoinDesc.LEFT_SEMI_JOIN:
- opClass = VectorMapJoinLeftSemiMultiKeyOperator.class;
- break;
- default:
- throw new HiveException("Unknown join type " + joinType);
- }
- break;
- }
-
- vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
- LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
-
- boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED);
-
- VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
- vectorDesc.setHashTableImplementationType(hashTableImplementationType);
- vectorDesc.setHashTableKind(hashTableKind);
- vectorDesc.setHashTableKeyType(hashTableKeyType);
- vectorDesc.setMinMaxEnabled(minMaxEnabled);
- return vectorOp;
- }
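
[Editor's illustration, not part of the patch] The single-column key optimization above maps a Hive type name to a hash table key type, falling back to MULTI_KEY. A condensed sketch of that mapping, with a local enum standing in for VectorMapJoinDesc.HashTableKeyType:

    public class KeyTypeMapping {
      enum HashTableKeyType { BOOLEAN, BYTE, SHORT, INT, LONG, STRING, MULTI_KEY }

      static HashTableKeyType mapSingleKeyType(String typeName) {
        switch (typeName) {
          case "boolean":  return HashTableKeyType.BOOLEAN;
          case "tinyint":  return HashTableKeyType.BYTE;
          case "smallint": return HashTableKeyType.SHORT;
          case "int":      return HashTableKeyType.INT;
          case "bigint":
          case "long":     return HashTableKeyType.LONG;
          case "string":   return HashTableKeyType.STRING; // real code accepts the whole string family
          default:         return HashTableKeyType.MULTI_KEY; // generic multi-key fallback
        }
      }

      public static void main(String[] args) {
        System.out.println(mapSingleKeyType("smallint"));      // SHORT
        System.out.println(mapSingleKeyType("decimal(10,2)")); // MULTI_KEY
      }
    }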
-
- private boolean onExpressionHasNullSafes(MapJoinDesc desc) {
- boolean[] nullSafes = desc.getNullSafes();
- for (boolean nullSafe : nullSafes) {
- if (nullSafe) {
- return true;
- }
- }
- return false;
- }
-
- private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
- boolean isTez) {
-
- boolean specialize = false;
-
- if (op instanceof MapJoinOperator &&
- HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
-
- // Currently, native vector map join is supported only under Tez and for non-N-way joins.
- if (isTez && desc.getConds().length == 1 && !onExpressionHasNullSafes(desc)) {
-
- // Ok, all basic restrictions satisfied so far...
- specialize = true;
-
- if (!HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
-
- // When we are using the optimized hash table, we have further
- // restrictions (the optimized-table setting and supported key types).
-
- if (!HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) {
- specialize = false;
- } else {
- byte posBigTable = (byte) desc.getPosBigTable();
- Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
- List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
- for (ExprNodeDesc exprNodeDesc : bigTableKeyExprs) {
- String typeName = exprNodeDesc.getTypeString();
- if (!MapJoinKey.isSupportedField(typeName)) {
- specialize = false;
- break;
- }
- }
- }
- } else {
-
- // With the fast hash table implementation, we currently do not support
- // Hybrid Grace Hash Join.
-
- if (HiveConf.getBoolVar(hiveConf,
- HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
- specialize = false;
- }
- }
- }
- }
- return specialize;
- }
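
[Editor's illustration, not part of the patch] The gates checked by canSpecializeMapJoin can be summarized in one method. A sketch using the same ConfVars referenced above; it assumes a HiveConf is at hand and omits the per-key-type check on the big-table key expressions:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class NativeMapJoinGates {
      static boolean nativeMapJoinAllowed(HiveConf conf, boolean isTez,
          int condCount, boolean hasNullSafes) {
        if (!HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
          return false; // native vector map join switched off
        }
        if (!isTez || condCount != 1 || hasNullSafes) {
          return false; // Tez only, single join condition, no null-safe equality
        }
        if (HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
          // Fast hash table path: incompatible with Hybrid Grace Hash Join.
          return !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
        }
        // Optimized hash table path: requires the optimized table container
        // (key-type checks on the big-table key expressions omitted here).
        return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
      }
    }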
-
- Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
- VectorizationContext vContext, boolean isTez) throws HiveException {
- Operator<? extends OperatorDesc> vectorOp = null;
-
- switch (op.getType()) {
- case MAPJOIN:
- {
- MapJoinDesc desc = (MapJoinDesc) op.getConf();
- boolean specialize = canSpecializeMapJoin(op, desc, isTez);
-
- if (!specialize) {
-
- Class<? extends Operator<?>> opClass = null;
- if (op instanceof MapJoinOperator) {
-
- // *NON-NATIVE* vector map join: a filtered LEFT OUTER JOIN needs a different operator class...
-
- List<ExprNodeDesc> bigTableFilters = desc.getFilters().get((byte) desc.getPosBigTable());
- boolean isOuterAndFiltered = (!desc.isNoOuterJoin() && bigTableFilters.size() > 0);
- if (!isOuterAndFiltered) {
- opClass = VectorMapJoinOperator.class;
- } else {
- opClass = VectorMapJoinOuterFilteredOperator.class;
- }
- } else if (op instanceof SMBMapJoinOperator) {
- opClass = VectorSMBMapJoinOperator.class;
- }
-
- vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
-
- } else {
-
- // TEMPORARY Until Native Vector Map Join with Hybrid passes tests...
- // HiveConf.setBoolVar(physicalContext.getConf(),
- // HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN, false);
-
- vectorOp = specializeMapJoinOperator(op, vContext, desc);
- }
- }
- break;
- case GROUPBY:
- case FILTER:
- case SELECT:
- case FILESINK:
- case REDUCESINK:
- case LIMIT:
- case EXTRACT:
- case EVENT:
- case HASHTABLESINK:
- vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
- break;
- default:
- vectorOp = op;
- break;
- }
-
- LOG.info("vectorizeOperator " + (vectorOp == null ? "NULL" : vectorOp.getClass().getName()));
- LOG.info("vectorizeOperator " + (vectorOp == null || vectorOp.getConf() == null ? "NULL" : vectorOp.getConf().getClass().getName()));
-
- if (vectorOp != op) {
- fixupParentChildOperators(op, vectorOp);
- ((AbstractOperatorDesc) vectorOp.getConf()).setVectorMode(true);
- }
- return vectorOp;
- }
-
- private boolean isVirtualColumn(ColumnInfo column) {
-
- // Not using method column.getIsVirtualCol() because partitioning columns are also
- // treated as virtual columns in ColumnInfo.
- if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(column.getInternalName())) {
- return true;
- }
- return false;
- }
-
- public void debugDisplayAllMaps(BaseWork work) {
-
- Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
- Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
- Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
-
- LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
- LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
- LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index ed896e4..727f842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -28,8 +28,10 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
@@ -61,9 +63,7 @@ public abstract class BaseWork extends AbstractOperatorDesc {
// Vectorization.
- protected Map<String, Integer> vectorColumnNameMap;
- protected Map<Integer, String> vectorColumnTypeMap;
- protected Map<Integer, String> vectorScratchColumnTypeMap;
+ protected VectorizedRowBatchCtx vectorizedRowBatchCtx;
public void setGatheringStats(boolean gatherStats) {
this.gatheringStats = gatherStats;
@@ -145,29 +145,17 @@ public abstract class BaseWork extends AbstractOperatorDesc {
return returnSet;
}
- public Map<String, Integer> getVectorColumnNameMap() {
- return vectorColumnNameMap;
- }
-
- public void setVectorColumnNameMap(Map<String, Integer> vectorColumnNameMap) {
- this.vectorColumnNameMap = vectorColumnNameMap;
- }
+ // -----------------------------------------------------------------------------------------------
- public Map<Integer, String> getVectorColumnTypeMap() {
- return vectorColumnTypeMap;
+ public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
+ return vectorizedRowBatchCtx;
}
- public void setVectorColumnTypeMap(Map<Integer, String> vectorColumnTypeMap) {
- this.vectorColumnTypeMap = vectorColumnTypeMap;
+ public void setVectorizedRowBatchCtx(VectorizedRowBatchCtx vectorizedRowBatchCtx) {
+ this.vectorizedRowBatchCtx = vectorizedRowBatchCtx;
}
- public Map<Integer, String> getVectorScratchColumnTypeMap() {
- return vectorScratchColumnTypeMap;
- }
-
- public void setVectorScratchColumnTypeMap(Map<Integer, String> vectorScratchColumnTypeMap) {
- this.vectorScratchColumnTypeMap = vectorScratchColumnTypeMap;
- }
+ // -----------------------------------------------------------------------------------------------
/**
* @return the mapredLocalWork
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 864301c..b032349 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -68,11 +68,13 @@ public class PartitionDesc implements Serializable, Cloneable {
private String baseFileName;
+ private VectorPartitionDesc vectorPartitionDesc;
+
public void setBaseFileName(String baseFileName) {
this.baseFileName = baseFileName;
}
- public PartitionDesc() {
+ public PartitionDesc() {
}
public PartitionDesc(final TableDesc table, final LinkedHashMap<String, String> partSpec) {
@@ -271,6 +273,9 @@ public class PartitionDesc implements Serializable, Cloneable {
ret.partSpec = new java.util.LinkedHashMap<String, String>();
ret.partSpec.putAll(partSpec);
}
+ if (vectorPartitionDesc != null) {
+ ret.vectorPartitionDesc = vectorPartitionDesc.clone();
+ }
return ret;
}
@@ -300,4 +305,12 @@ public class PartitionDesc implements Serializable, Cloneable {
public void intern(Interner<TableDesc> interner) {
this.tableDesc = interner.intern(tableDesc);
}
+
+ public void setVectorPartitionDesc(VectorPartitionDesc vectorPartitionDesc) {
+ this.vectorPartitionDesc = vectorPartitionDesc;
+ }
+
+ public VectorPartitionDesc getVectorPartitionDesc() {
+ return vectorPartitionDesc;
+ }
}
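
[Editor's illustration, not part of the patch] A sketch of how the new PartitionDesc accessors are expected to be used, assuming PartitionDesc.clone() is public as the hunk above implies:

    import org.apache.hadoop.hive.ql.plan.PartitionDesc;
    import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;

    public class PartitionDescExample {
      public static void main(String[] args) {
        PartitionDesc partDesc = new PartitionDesc();
        partDesc.setVectorPartitionDesc(
            VectorPartitionDesc.createVectorizedInputFileFormat());

        // clone() deep-copies vectorPartitionDesc when it is set, so the copy
        // carries an independent VectorPartitionDesc instance.
        PartitionDesc copy = partDesc.clone();
        System.out.println(copy.getVectorPartitionDesc().getVectorMapOperatorReadType());
      }
    }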
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionConversion.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionConversion.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionConversion.java
new file mode 100644
index 0000000..8fe298d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionConversion.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * VectorPartitionConversion.
+ *
+ */
+public class VectorPartitionConversion {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean validConversion;
+ private boolean[] resultConversionFlags;
+
+ private TypeInfo invalidFromTypeInfo;
+ private TypeInfo invalidToTypeInfo;
+
+ public boolean getValidConversion() {
+ return validConversion;
+ }
+
+ public boolean[] getResultConversionFlags() {
+ return resultConversionFlags;
+ }
+
+ public TypeInfo getInvalidFromTypeInfo() {
+ return invalidFromTypeInfo;
+ }
+
+ public TypeInfo getInvalidToTypeInfo() {
+ return invalidToTypeInfo;
+ }
+
+ // Currently, we only support these lossless widening (promotion) data type conversions:
+ //
+ // Short -> Int IMPLICIT WITH VECTORIZATION
+ // Short -> BigInt IMPLICIT WITH VECTORIZATION
+ // Int -> BigInt IMPLICIT WITH VECTORIZATION
+ //
+ // CONSIDER ADDING:
+ // Float -> Double IMPLICIT WITH VECTORIZATION
+ // (Char | VarChar) -> String IMPLICIT WITH VECTORIZATION
+ //
+ private static HashMap<PrimitiveCategory, PrimitiveCategory[]> validFromPrimitiveMap =
+ new HashMap<PrimitiveCategory, PrimitiveCategory[]>();
+ static {
+ validFromPrimitiveMap.put(
+ PrimitiveCategory.SHORT,
+ new PrimitiveCategory[] { PrimitiveCategory.INT, PrimitiveCategory.LONG });
+ validFromPrimitiveMap.put(
+ PrimitiveCategory.INT,
+ new PrimitiveCategory[] { PrimitiveCategory.LONG });
+ }
+
+ private boolean validateOne(TypeInfo fromTypeInfo, TypeInfo toTypeInfo) {
+
+ if (fromTypeInfo.equals(toTypeInfo)) {
+ return false;
+ }
+
+ if (fromTypeInfo.getCategory() == Category.PRIMITIVE &&
+ toTypeInfo.getCategory() == Category.PRIMITIVE) {
+
+ PrimitiveCategory fromPrimitiveCategory = ((PrimitiveTypeInfo) fromTypeInfo).getPrimitiveCategory();
+ PrimitiveCategory toPrimitiveCategory = ((PrimitiveTypeInfo) toTypeInfo).getPrimitiveCategory();
+
+ PrimitiveCategory[] toPrimitiveCategories =
+ validFromPrimitiveMap.get(fromPrimitiveCategory);
+ if (toPrimitiveCategories == null ||
+ !ArrayUtils.contains(toPrimitiveCategories, toPrimitiveCategory)) {
+ invalidFromTypeInfo = fromTypeInfo;
+ invalidToTypeInfo = toTypeInfo;
+
+ // Tell caller a bad one was found.
+ validConversion = false;
+ return false;
+ }
+ } else {
+ // Ignore checking complex types. Assume they will not be included in the query.
+ }
+
+ return true;
+ }
+
+ public void validateConversion(List<TypeInfo> fromTypeInfoList,
+ List<TypeInfo> toTypeInfoList) {
+
+ final int columnCount = fromTypeInfoList.size();
+ resultConversionFlags = new boolean[columnCount];
+
+ // The method validateOne will turn this off when an invalid conversion is found.
+ validConversion = true;
+
+ boolean atLeastOneConversion = false;
+ for (int i = 0; i < columnCount; i++) {
+ TypeInfo fromTypeInfo = fromTypeInfoList.get(i);
+ TypeInfo toTypeInfo = toTypeInfoList.get(i);
+
+ resultConversionFlags[i] = validateOne(fromTypeInfo, toTypeInfo);
+ if (!validConversion) {
+ return;
+ }
+ // Track whether any column actually needs conversion (matches the array overload below).
+ if (resultConversionFlags[i]) {
+ atLeastOneConversion = true;
+ }
+ }
+
+ if (atLeastOneConversion) {
+ // Leave resultConversionFlags set.
+ } else {
+ resultConversionFlags = null;
+ }
+ }
+
+ public void validateConversion(TypeInfo[] fromTypeInfos, TypeInfo[] toTypeInfos) {
+
+ final int columnCount = fromTypeInfos.length;
+ resultConversionFlags = new boolean[columnCount];
+
+ // The method validateOne will turn this off when an invalid conversion is found.
+ validConversion = true;
+
+ boolean atLeastOneConversion = false;
+ for (int i = 0; i < columnCount; i++) {
+ TypeInfo fromTypeInfo = fromTypeInfos[i];
+ TypeInfo toTypeInfo = toTypeInfos[i];
+
+ resultConversionFlags[i] = validateOne(fromTypeInfo, toTypeInfo);
+ if (!validConversion) {
+ return;
+ }
+ if (resultConversionFlags[i]) {
+ atLeastOneConversion = true;
+ }
+ }
+
+ if (atLeastOneConversion) {
+ // Leave resultConversionFlags set.
+ } else {
+ resultConversionFlags = null;
+ }
+ }
+}
\ No newline at end of file
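
[Editor's illustration, not part of the patch] A sketch of driving VectorPartitionConversion with TypeInfoFactory constants from serde2, here checking a partition schema (smallint, int) read against a table schema (int, bigint):

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class ConversionCheckExample {
      public static void main(String[] args) {
        TypeInfo[] from = { TypeInfoFactory.shortTypeInfo, TypeInfoFactory.intTypeInfo };
        TypeInfo[] to   = { TypeInfoFactory.intTypeInfo, TypeInfoFactory.longTypeInfo };

        VectorPartitionConversion conversion = new VectorPartitionConversion();
        conversion.validateConversion(from, to);

        // Both columns widen losslessly, so the conversion is valid and each
        // column is flagged as needing conversion.
        System.out.println(conversion.getValidConversion());                         // true
        System.out.println(Arrays.toString(conversion.getResultConversionFlags())); // [true, true]
      }
    }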
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
new file mode 100644
index 0000000..45151f2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * VectorPartitionDesc.
+ *
+ * Extra vector information just for the PartitionDesc.
+ *
+ */
+public class VectorPartitionDesc {
+
+ private static final long serialVersionUID = 1L;
+
+ // Is a data type conversion needed?
+ //
+ // VECTORIZED_INPUT_FILE_FORMAT:
+ // No data type conversion check is done here; we assume the ALTER TABLE
+ // restrictions prevented conversions that a vectorized input file format
+ // cannot handle...
+ //
+
+ public enum VectorMapOperatorReadType {
+ NONE,
+ VECTORIZED_INPUT_FILE_FORMAT
+ }
+
+
+ private final VectorMapOperatorReadType vectorMapOperatorReadType;
+
+ private final boolean needsDataTypeConversionCheck;
+
+ private boolean[] conversionFlags;
+
+ private TypeInfo[] typeInfos;
+
+ private VectorPartitionDesc(VectorMapOperatorReadType vectorMapOperatorReadType,
+ boolean needsDataTypeConversionCheck) {
+ this.vectorMapOperatorReadType = vectorMapOperatorReadType;
+ this.needsDataTypeConversionCheck = needsDataTypeConversionCheck;
+
+ conversionFlags = null;
+ typeInfos = null;
+ }
+
+ public static VectorPartitionDesc createVectorizedInputFileFormat() {
+ return new VectorPartitionDesc(VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT, true);
+ }
+
+
+ @Override
+ public VectorPartitionDesc clone() {
+ VectorPartitionDesc result =
+ new VectorPartitionDesc(vectorMapOperatorReadType,
+ needsDataTypeConversionCheck);
+ result.conversionFlags =
+ (conversionFlags == null ? null :
+ Arrays.copyOf(conversionFlags, conversionFlags.length));
+ result.typeInfos =
+ (typeInfos == null ? null :
+ Arrays.copyOf(typeInfos, typeInfos.length));
+ return result;
+ }
+
+ public VectorMapOperatorReadType getVectorMapOperatorReadType() {
+ return vectorMapOperatorReadType;
+ }
+
+ public boolean getNeedsDataTypeConversionCheck() {
+ return needsDataTypeConversionCheck;
+ }
+
+ public void setConversionFlags(boolean[] conversionFlags) {
+ this.conversionFlags = conversionFlags;
+ }
+
+ public boolean[] getConversionFlags() {
+ return conversionFlags;
+ }
+
+ public TypeInfo[] getTypeInfos() {
+ return typeInfos;
+ }
+
+ public void setTypeInfos(List<TypeInfo> typeInfoList) {
+ typeInfos = typeInfoList.toArray(new TypeInfo[0]);
+ }
+
+ public int getNonPartColumnCount() {
+ return typeInfos.length;
+ }
+}
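
[Editor's illustration, not part of the patch] And a sketch of constructing the new VectorPartitionDesc and reading it back:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class VectorPartitionDescExample {
      public static void main(String[] args) {
        VectorPartitionDesc vpd = VectorPartitionDesc.createVectorizedInputFileFormat();
        vpd.setTypeInfos(Arrays.asList(
            (TypeInfo) TypeInfoFactory.intTypeInfo, TypeInfoFactory.stringTypeInfo));

        System.out.println(vpd.getVectorMapOperatorReadType()); // VECTORIZED_INPUT_FILE_FORMAT
        System.out.println(vpd.getNonPartColumnCount());        // 2
      }
    }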
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java
index 0f8712e..c076e6c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java
@@ -24,6 +24,7 @@ import java.util.Random;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import junit.framework.TestCase;
@@ -50,13 +51,13 @@ public class TestVectorRowObject extends TestCase {
void testVectorRowObject(int caseNum, Random r) throws HiveException {
- Map<Integer, String> emptyScratchMap = new HashMap<Integer, String>();
+ String[] emptyScratchTypeNames = new String[0];
RandomRowObjectSource source = new RandomRowObjectSource();
source.init(r);
VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
- batchContext.init(emptyScratchMap, source.rowStructObjectInspector());
+ batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames);
VectorizedRowBatch batch = batchContext.createVectorizedRowBatch();
VectorAssignRowSameBatch vectorAssignRow = new VectorAssignRowSameBatch();