You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2014/03/28 06:53:13 UTC
svn commit: r1582613 [2/4] - in /hive/branches/branch-0.13:
common/src/java/org/apache/hadoop/hive/conf/ itests/qtest/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/io/...
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Bucket-join processing context that carries two extra values on top of
+ * BucketJoinProcCtx: an isSubQuery flag and a bucket count.
+ */
+public class TezBucketJoinProcCtx extends BucketJoinProcCtx {
+  // bucket count; remains -1 until setNumBuckets is called
+  int numBuckets = -1;
+  // determines if we need to use custom edge or one-to-one edge
+  boolean isSubQuery = false;
+
+  public TezBucketJoinProcCtx(HiveConf conf) {
+    super(conf);
+  }
+
+  /** @return the stored bucket count as a boxed value (-1 if never set) */
+  public Integer getNumBuckets() {
+    return Integer.valueOf(numBuckets);
+  }
+
+  /** @param numBuckets the bucket count to remember */
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
+
+  /** @return the current value of the isSubQuery flag */
+  public boolean isSubQuery() {
+    return isSubQuery;
+  }
+
+  /** @param isSubQuery the new value of the isSubQuery flag */
+  public void setIsSubQuery(boolean isSubQuery) {
+    this.isSubQuery = isSubQuery;
+  }
+}
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * Context object handed to the operator-trait annotation processors. It holds
+ * the ParseContext being annotated and the HiveConf obtained from it.
+ */
+public class AnnotateOpTraitsProcCtx implements NodeProcessorCtx {
+
+  ParseContext parseContext;
+  HiveConf conf;
+
+  /**
+   * @param parseContext the parse context to expose to the processors; may be
+   *          null, in which case the configuration is null as well
+   */
+  public AnnotateOpTraitsProcCtx(ParseContext parseContext) {
+    // Assign the fields directly instead of going through the public setters:
+    // an overriding setter would otherwise run before the subclass is initialized.
+    this.parseContext = parseContext;
+    this.conf = (parseContext == null) ? null : parseContext.getConf();
+  }
+
+  /** @return the configuration, or null if none is set */
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  /** @param conf the configuration to store; replaces the current one */
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  /** @return the parse context, or null if none is set */
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  /**
+   * @param parseContext the parse context to store; the stored configuration is
+   *          not refreshed by this call
+   */
+  public void setParseContext(ParseContext parseContext) {
+    this.parseContext = parseContext;
+  }
+
+}
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,78 @@
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MuxOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.OpTraitsRulesProcFactory;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/*
+ * Optimizer transform that walks the operator tree and lets the processors of
+ * OpTraitsRulesProcFactory attach an OpTraits object to the operators. The
+ * OpTraits class specifies the traits that are populated for each operator.
+ */
+public class AnnotateWithOpTraits implements Transform {
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    AnnotateOpTraitsProcCtx traitsCtx = new AnnotateOpTraitsProcCtx(pctx);
+    Map<Rule, NodeProcessor> rules = buildRules();
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher dispatcher = new DefaultRuleDispatcher(
+        OpTraitsRulesProcFactory.getDefaultRule(), rules, traitsCtx);
+    GraphWalker walker = new PreOrderWalker(dispatcher);
+
+    // The walk starts from all top operators of the parse context
+    ArrayList<Node> startNodes = new ArrayList<Node>(pctx.getTopOps().values());
+    walker.startWalking(startNodes, null);
+
+    return pctx;
+  }
+
+  /*
+   * Builds the rule table, one entry per operator kind that has a dedicated
+   * processor. Insertion order is kept by the LinkedHashMap.
+   */
+  private static Map<Rule, NodeProcessor> buildRules() {
+    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+    addRule(rules, "TS", TableScanOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getTableScanRule());
+    addRule(rules, "RS", ReduceSinkOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getReduceSinkRule());
+    addRule(rules, "JOIN", JoinOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getJoinRule());
+    addRule(rules, "MAPJOIN", MapJoinOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    addRule(rules, "SMB", SMBMapJoinOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    addRule(rules, "MUX", MuxOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    addRule(rules, "DEMUX", DemuxOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    addRule(rules, "UNION", UnionOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    addRule(rules, "GBY", GroupByOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getGroupByRule());
+    addRule(rules, "SEL", SelectOperator.getOperatorName(),
+        OpTraitsRulesProcFactory.getSelectRule());
+    return rules;
+  }
+
+  /* Registers the processor under the pattern "<operatorName>%". */
+  private static void addRule(Map<Rule, NodeProcessor> rules, String ruleName,
+      String operatorName, NodeProcessor processor) {
+    rules.put(new RuleRegExp(ruleName, operatorName + "%"), processor);
+  }
+
+}
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.AbstractBucketJoinProc;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * This class populates the following operator traits for the entire operator tree:
+ * 1. Bucketing columns.
+ * 2. Table
+ * 3. Pruned partitions
+ *
+ * Bucketing columns refer not to the bucketing columns from the table object but instead
+ * to the dynamic 'bucketing' done by operators such as reduce sinks and group-bys.
+ * All the operators have a translation from their input names to the output names corresponding
+ * to the bucketing column. The colExprMap that is a part of every operator is used in this
+ * transformation.
+ *
+ * The table object is used for the base-case in map-reduce when deciding to perform a bucket
+ * map join. This object is used in the BucketMapJoinProc to find if number of files for the
+ * table correspond to the number of buckets specified in the meta data.
+ *
+ * The pruned partition information has the same purpose as the table object at the moment.
+ *
+ * The traits of sorted-ness etc. can be populated as well for future optimizations to make use of.
+ */
+
+public class OpTraitsRulesProcFactory {
+
+ /*
+  * Processor that gives the operator the OpTraits object of its first
+  * parent, passed along as-is.
+  */
+ public static class DefaultRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     @SuppressWarnings("unchecked")
+     Operator<? extends OperatorDesc> current = (Operator<? extends OperatorDesc>) nd;
+     // only the first parent is consulted
+     Operator<? extends OperatorDesc> firstParent = current.getParentOperators().get(0);
+     current.setOpTraits(firstParent.getOpTraits());
+     return null;
+   }
+
+ }
+
+ /*
+  * Reduce sink operator is the de-facto operator for determining keyCols
+  * (emit keys of a map phase). Its traits hold a single list of bucketing
+  * column names and a bucket count of -1.
+  */
+ public static class ReduceSinkRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     ReduceSinkOperator rs = (ReduceSinkOperator) nd;
+
+     List<List<String>> bucketColsList = new ArrayList<List<String>>();
+     bucketColsList.add(keyColumnNames(rs));
+     rs.setOpTraits(new OpTraits(bucketColsList, -1));
+     return null;
+   }
+
+   /*
+    * Returns every name in the column expression map whose expression is the
+    * same as one of the key columns; empty when the operator has no such map.
+    */
+   private static List<String> keyColumnNames(ReduceSinkOperator rs) {
+     List<String> names = new ArrayList<String>();
+     if (rs.getColumnExprMap() == null) {
+       return names;
+     }
+     for (ExprNodeDesc keyExpr : rs.getConf().getKeyCols()) {
+       for (Entry<String, ExprNodeDesc> mapping : rs.getColumnExprMap().entrySet()) {
+         if (keyExpr.isSame(mapping.getValue())) {
+           names.add(mapping.getKey());
+         }
+       }
+     }
+     return names;
+   }
+ }
+
+ /*
+  * Table scan has the table object and pruned partitions that have information such as
+  * bucketing, sorting, etc. that is used later for optimization.
+  */
+ public static class TableScanRule implements NodeProcessor {
+
+   /*
+    * Checks that the files found for the table (or for each of its pruned
+    * partitions) agree with the declared bucket count. A location without any
+    * files is accepted; a location with a different, non-zero number of files
+    * makes the check fail.
+    *
+    * prunedParts may be null when the partition list could not be obtained; a
+    * partitioned table is then reported as not bucketed instead of failing
+    * with a NullPointerException.
+    */
+   public boolean checkBucketedTable(Table tbl,
+       ParseContext pGraphContext,
+       PrunedPartitionList prunedParts) throws SemanticException {
+
+     if (tbl.isPartitioned()) {
+       if (prunedParts == null) {
+         // nothing to verify against, so do not claim the table is bucketed
+         return false;
+       }
+       for (Partition p : prunedParts.getNotDeniedPartns()) {
+         List<String> fileNames =
+             AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext);
+         // The number of files for the partition should be same as number of buckets.
+         if (!filesMatchBuckets(fileNames, p.getBucketCount())) {
+           return false;
+         }
+       }
+       return true;
+     }
+
+     List<String> fileNames =
+         AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext);
+     // The number of files for the table should be same as number of buckets.
+     return filesMatchBuckets(fileNames, tbl.getNumBuckets());
+   }
+
+   /* True when there are no files at all or exactly one file per bucket. */
+   private static boolean filesMatchBuckets(List<String> fileNames, int bucketCount) {
+     return fileNames.isEmpty() || fileNames.size() == bucketCount;
+   }
+
+   /*
+    * Sets the traits of a table scan: the table's bucketing columns and bucket
+    * count when checkBucketedTable succeeds, otherwise no bucketing columns
+    * and a bucket count of -1.
+    */
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     TableScanOperator ts = (TableScanOperator) nd;
+     AnnotateOpTraitsProcCtx opTraitsCtx = (AnnotateOpTraitsProcCtx) procCtx;
+     ParseContext parseCtx = opTraitsCtx.getParseContext();
+     Table table = parseCtx.getTopToTable().get(ts);
+
+     PrunedPartitionList prunedPartList = null;
+     try {
+       prunedPartList = parseCtx.getPrunedPartitions(ts.getConf().getAlias(), ts);
+     } catch (HiveException e) {
+       // Best effort: annotation must not fail the query. The list stays null
+       // and checkBucketedTable handles that case.
+       prunedPartList = null;
+     }
+
+     boolean bucketMapJoinConvertible = checkBucketedTable(table, parseCtx, prunedPartList);
+     List<List<String>> bucketCols = new ArrayList<List<String>>();
+     int numBuckets = -1;
+     if (bucketMapJoinConvertible) {
+       bucketCols.add(table.getBucketCols());
+       numBuckets = table.getNumBuckets();
+     }
+     ts.setOpTraits(new OpTraits(bucketCols, numBuckets));
+     return null;
+   }
+ }
+
+ /*
+  * Group-by re-orders the keys emitted hence, the keyCols would change.
+  * The traits hold a single list with the names of the group-by keys and a
+  * bucket count of -1.
+  */
+ public static class GroupByRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     GroupByOperator gbyOp = (GroupByOperator) nd;
+     List<String> gbyKeys = new ArrayList<String>();
+     // Same guard as ReduceSinkRule: without a column expression map there is
+     // nothing to translate, so the key list simply stays empty.
+     if (gbyOp.getColumnExprMap() != null) {
+       for (ExprNodeDesc exprDesc : gbyOp.getConf().getKeys()) {
+         for (Entry<String, ExprNodeDesc> entry : gbyOp.getColumnExprMap().entrySet()) {
+           if (exprDesc.isSame(entry.getValue())) {
+             gbyKeys.add(entry.getKey());
+           }
+         }
+       }
+     }
+
+     List<List<String>> listBucketCols = new ArrayList<List<String>>();
+     listBucketCols.add(gbyKeys);
+     OpTraits opTraits = new OpTraits(listBucketCols, -1);
+     gbyOp.setOpTraits(opTraits);
+     return null;
+   }
+ }
+
+ /*
+  * Translates the bucketing columns of the first parent into the output names
+  * of the select through its column expression map, and carries the parent's
+  * bucket count over.
+  */
+ public static class SelectRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     SelectOperator selOp = (SelectOperator) nd;
+
+     // Read the parent's traits once and tolerate their absence everywhere;
+     // without traits there are no bucketing columns and the count is -1.
+     OpTraits parentTraits = selOp.getParentOperators().get(0).getOpTraits();
+     List<List<String>> parentBucketColNames = null;
+     int numBuckets = -1;
+     if (parentTraits != null) {
+       parentBucketColNames = parentTraits.getBucketColNames();
+       numBuckets = parentTraits.getNumBuckets();
+     }
+
+     List<List<String>> listBucketCols = new ArrayList<List<String>>();
+     if (selOp.getColumnExprMap() != null && parentBucketColNames != null) {
+       for (List<String> colNames : parentBucketColNames) {
+         List<String> bucketColNames = new ArrayList<String>();
+         for (String colName : colNames) {
+           for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+             // only plain column references can be mapped back to a parent column
+             if (entry.getValue() instanceof ExprNodeColumnDesc
+                 && ((ExprNodeColumnDesc) entry.getValue()).getColumn().equals(colName)) {
+               bucketColNames.add(entry.getKey());
+             }
+           }
+         }
+         listBucketCols.add(bucketColNames);
+       }
+     }
+
+     selOp.setOpTraits(new OpTraits(listBucketCols, numBuckets));
+     return null;
+   }
+ }
+
+ /*
+  * Builds the traits of a join from its reduce-sink parents: for each parent,
+  * in order, the parent's bucketing columns are translated into the join's
+  * output column names. The bucket count is always -1.
+  */
+ public static class JoinRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     JoinOperator joinOp = (JoinOperator) nd;
+     List<List<String>> perParentCols = new ArrayList<List<String>>();
+     byte parentPos = 0;
+     for (Operator<? extends OperatorDesc> parent : joinOp.getParentOperators()) {
+       if (!(parent instanceof ReduceSinkOperator)) {
+         // can be mux operator; the remaining parents are not examined
+         break;
+       }
+       ReduceSinkOperator parentRs = (ReduceSinkOperator) parent;
+       if (parentRs.getOpTraits() == null) {
+         // the parent carries no traits yet, so compute them right here
+         new ReduceSinkRule().process(parentRs, stack, procCtx, nodeOutputs);
+       }
+       perParentCols.add(getOutputColNames(joinOp, parentRs, parentPos));
+       parentPos++;
+     }
+
+     joinOp.setOpTraits(new OpTraits(perParentCols, -1));
+     return null;
+   }
+
+   /*
+    * Maps the bucketing columns of the given reduce sink to the join's output
+    * names, using the join expressions registered for position pos. Returns
+    * null when the reduce sink has no bucketing column names.
+    */
+   private List<String> getOutputColNames(JoinOperator joinOp,
+       ReduceSinkOperator rs, byte pos) {
+     List<List<String>> rsBucketCols = rs.getOpTraits().getBucketColNames();
+     if (rsBucketCols == null) {
+       // no col names in parent
+       return null;
+     }
+
+     // guaranteed that there is only 1 list within this list because
+     // a reduce sink always brings down the bucketing cols to a single list.
+     // may not be true with correlation operators (mux-demux)
+     List<String> translated = new ArrayList<String>();
+     for (String colName : rsBucketCols.get(0)) {
+       for (ExprNodeDesc candidate : joinOp.getConf().getExprs().get(pos)) {
+         boolean refersToColumn = candidate instanceof ExprNodeColumnDesc
+             && ((ExprNodeColumnDesc) candidate).getColumn().equals(colName);
+         if (!refersToColumn) {
+           // continue on to the next exprNode to find a match
+           continue;
+         }
+         // take the first output name whose expression is this join expression
+         for (Entry<String, ExprNodeDesc> mapping : joinOp.getColumnExprMap().entrySet()) {
+           if (mapping.getValue().isSame(candidate)) {
+             translated.add(mapping.getKey());
+             break;
+           }
+         }
+         // we have found the colName. No need to search more exprNodes.
+         break;
+       }
+     }
+     return translated;
+   }
+ }
+
+ /*
+  * For operators with multiple parents it is not clear which parent's traits
+  * should be propagated forward, so the operator gets traits without
+  * bucketing columns (null) and with a bucket count of -1.
+  */
+ public static class MultiParentRule implements NodeProcessor {
+
+   @Override
+   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+       Object... nodeOutputs) throws SemanticException {
+     @SuppressWarnings("unchecked")
+     Operator<? extends OperatorDesc> target = (Operator<? extends OperatorDesc>) nd;
+     target.setOpTraits(new OpTraits(null, -1));
+     return null;
+   }
+ }
+
+ /** Returns a new TableScanRule processor on every call. */
+ public static NodeProcessor getTableScanRule() {
+ return new TableScanRule();
+ }
+
+ /** Returns a new ReduceSinkRule processor on every call. */
+ public static NodeProcessor getReduceSinkRule() {
+ return new ReduceSinkRule();
+ }
+
+ /** Returns a new SelectRule processor on every call. */
+ public static NodeProcessor getSelectRule() {
+ return new SelectRule();
+ }
+
+ /** Returns a new DefaultRule processor on every call. */
+ public static NodeProcessor getDefaultRule() {
+ return new DefaultRule();
+ }
+
+ /** Returns a new MultiParentRule processor on every call. */
+ public static NodeProcessor getMultiParentRule() {
+ return new MultiParentRule();
+ }
+
+ /** Returns a new GroupByRule processor on every call. */
+ public static NodeProcessor getGroupByRule() {
+ return new GroupByRule();
+ }
+
+ /** Returns a new JoinRule processor on every call. */
+ public static NodeProcessor getJoinRule() {
+ return new JoinRule();
+ }
+}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Mar 28 05:53:12 2014
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
@@ -45,6 +47,8 @@ import org.apache.hadoop.hive.ql.plan.De
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -89,7 +93,7 @@ public class GenTezProcContext implement
// a map that keeps track of work that need to be linked while
// traversing an operator tree
- public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap;
+ public final Map<Operator<?>, Map<BaseWork,TezEdgeProperty>> linkOpWithWorkMap;
// a map to keep track of what reduce sinks have to be hooked up to
// map join work
@@ -144,7 +148,7 @@ public class GenTezProcContext implement
this.currentTask = (TezTask) TaskFactory.get(
new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
- this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
+ this.linkOpWithWorkMap = new HashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>();
this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
this.mapJoinWorkMap = new HashMap<MapJoinOperator, List<BaseWork>>();
this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>();
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Mar 28 05:53:12 2014
@@ -46,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezUtils is a collection of shared helper methods to produce
@@ -104,9 +105,10 @@ public class GenTezUtils {
setupReduceSink(context, reduceWork, reduceSink);
tezWork.add(reduceWork);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
tezWork.connect(
context.preceedingWork,
- reduceWork, EdgeType.SIMPLE_EDGE);
+ reduceWork, edgeProp);
context.connectedReduceSinks.add(reduceSink);
return reduceWork;
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Fri Mar 28 05:53:12 2014
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -45,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezWork separates the operator tree into tez tasks.
@@ -160,30 +162,34 @@ public class GenTezWork implements NodeP
* RS following the TS, we have already generated work for the TS-RS.
* We need to hook the current work to this generated work.
*/
- List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(mj);
- if (linkWorkList != null) {
- if (context.linkChildOpWithDummyOp.containsKey(mj)) {
- for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
- work.addDummyOp((HashTableDummyOperator) dummy);
+ if (context.linkOpWithWorkMap.containsKey(mj)) {
+ Map<BaseWork,TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj);
+ if (linkWorkMap != null) {
+ if (context.linkChildOpWithDummyOp.containsKey(mj)) {
+ for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
+ work.addDummyOp((HashTableDummyOperator) dummy);
+ }
}
- }
- for (BaseWork parentWork : linkWorkList) {
- LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
- tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
-
- // need to set up output name for reduce sink now that we know the name
- // of the downstream work
- for (ReduceSinkOperator r:
- context.linkWorkWithReduceSinkMap.get(parentWork)) {
- if (r.getConf().getOutputName() != null) {
- LOG.debug("Cloning reduce sink for multi-child broadcast edge");
- // we've already set this one up. Need to clone for the next work.
- r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
- (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
- context.clonedReduceSinks.add(r);
+ for (Entry<BaseWork,TezEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
+ BaseWork parentWork = parentWorkMap.getKey();
+ LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
+ TezEdgeProperty edgeProp = parentWorkMap.getValue();
+ tezWork.connect(parentWork, work, edgeProp);
+
+ // need to set up output name for reduce sink now that we know the name
+ // of the downstream work
+ for (ReduceSinkOperator r:
+ context.linkWorkWithReduceSinkMap.get(parentWork)) {
+ if (r.getConf().getOutputName() != null) {
+ LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+ // we've already set this one up. Need to clone for the next work.
+ r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+ (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+ context.clonedReduceSinks.add(r);
+ }
+ r.getConf().setOutputName(work.getName());
+ context.connectedReduceSinks.add(r);
}
- r.getConf().setOutputName(work.getName());
- context.connectedReduceSinks.add(r);
}
}
}
@@ -221,7 +227,8 @@ public class GenTezWork implements NodeP
// finally hook everything up
LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
- tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+ tezWork.connect(unionWork, work, edgeProp);
unionWork.addUnionOperators(context.currentUnionOperators);
context.currentUnionOperators.clear();
context.workWithUnionOperators.add(work);
@@ -261,7 +268,8 @@ public class GenTezWork implements NodeP
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
- tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ tezWork.connect(work, rWork, edgeProp);
context.connectedReduceSinks.add(rs);
}
} else {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Fri Mar 28 05:53:12 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.W
import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -117,7 +118,7 @@ public class TezCompiler extends TaskCom
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
- GraphWalker ogw = new TezWalker(disp);
+ GraphWalker ogw = new ForwardWalker(disp);
ogw.startWalking(topNodes, null);
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Fri Mar 28 05:53:12 2014
@@ -22,6 +22,7 @@ public class AbstractOperatorDesc implem
protected boolean vectorMode = false;
protected transient Statistics statistics;
+ protected transient OpTraits opTraits;
@Override
@Explain(skipHeader = true, displayName = "Statistics")
@@ -42,4 +43,12 @@ public class AbstractOperatorDesc implem
public void setVectorMode(boolean vm) {
this.vectorMode = vm;
}
+
+ /**
+  * Returns the operator traits currently attached to this descriptor.
+  *
+  * @return the stored {@link OpTraits}, or {@code null} if none have been set
+  */
+ @Override
+ public OpTraits getOpTraits() {
+   return opTraits;
+ }
+
+ /**
+  * Attaches operator traits to this descriptor, replacing any previous value.
+  *
+  * @param opTraits the traits to store; the reference is kept as-is
+  */
+ @Override
+ public void setOpTraits(OpTraits opTraits) {
+   this.opTraits = opTraits;
+ }
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Fri Mar 28 05:53:12 2014
@@ -49,6 +49,9 @@ public class MapJoinDesc extends JoinDes
// for tez. used to remember which position maps to which logical input
private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
+
+ // for tez. used to remember which type of a Bucket Map Join this is.
+ private boolean customBucketMapJoin;
// table alias (small) --> input file name (big) --> target file names (small)
private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
@@ -81,6 +84,7 @@ public class MapJoinDesc extends JoinDes
this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping;
this.dumpFilePrefix = clone.dumpFilePrefix;
this.parentToInput = clone.parentToInput;
+ this.customBucketMapJoin = clone.customBucketMapJoin;
}
public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -280,4 +284,12 @@ public class MapJoinDesc extends JoinDes
public float getHashTableMemoryUsage() {
return hashtableMemoryUsage;
}
+
+ /**
+ * Stores the custom-bucket-map-join flag. Per the field's comment this is
+ * used for tez, to remember which type of bucket map join this is.
+ *
+ * @param customBucketMapJoin the flag value to record
+ */
+ public void setCustomBucketMapJoin(boolean customBucketMapJoin) {
+ this.customBucketMapJoin = customBucketMapJoin;
+ }
+
+ /**
+ * Returns the custom-bucket-map-join flag.
+ *
+ * @return the value last stored via setCustomBucketMapJoin (or copied by the
+ * clone constructor); {@code false} if it was never set
+ */
+ public boolean getCustomBucketMapJoin() {
+ return this.customBucketMapJoin;
+ }
}
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+
+/**
+ * OpTraits. Holds the bucketing information attached to an operator
+ * descriptor: the bucket column names and the number of buckets.
+ */
+public class OpTraits {
+
+  // Private so that every read and write goes through the accessors below.
+  private List<List<String>> bucketColNames;
+  private int numBuckets;
+
+  /**
+   * Creates a traits object from the given bucketing information.
+   *
+   * @param bucketColNames the bucket column names; stored by reference, not copied
+   * @param numBuckets the number of buckets
+   */
+  public OpTraits(List<List<String>> bucketColNames, int numBuckets) {
+    this.bucketColNames = bucketColNames;
+    this.numBuckets = numBuckets;
+  }
+
+  /**
+   * @return the bucket column names exactly as last stored (the internal list
+   *         itself, not a copy)
+   */
+  public List<List<String>> getBucketColNames() {
+    return bucketColNames;
+  }
+
+  /**
+   * @return the number of buckets as last stored
+   */
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  /**
+   * Replaces the bucket column names.
+   *
+   * @param bucketColNames the new value; stored by reference, not copied
+   */
+  public void setBucketColNames(List<List<String>> bucketColNames) {
+    this.bucketColNames = bucketColNames;
+  }
+
+  /**
+   * Replaces the number of buckets.
+   *
+   * @param numBuckets the new value
+   */
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
+}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java Fri Mar 28 05:53:12 2014
@@ -24,4 +24,6 @@ public interface OperatorDesc extends Se
public Object clone() throws CloneNotSupportedException;
public Statistics getStatistics();
public void setStatistics(Statistics statistics);
+ /** Returns the operator traits attached to this descriptor. */
+ public OpTraits getOpTraits();
+ /** Attaches the given operator traits to this descriptor. */
+ public void setOpTraits(OpTraits opTraits);
}
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,45 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+
+/**
+ * TezEdgeProperty. Bundles what is recorded for one edge between two pieces
+ * of work: the edge type, an optional configuration and a bucket count.
+ */
+public class TezEdgeProperty {
+
+  /** The edge types a {@code TezEdgeProperty} can carry. */
+  public enum EdgeType {
+    SIMPLE_EDGE,
+    BROADCAST_EDGE,
+    CONTAINS,
+    CUSTOM_EDGE,
+    CUSTOM_SIMPLE_EDGE,
+  }
+
+  private HiveConf hiveConf;
+  private EdgeType edgeType;
+  private int numBuckets;
+
+  /**
+   * Creates an edge property that carries only a type: the configuration is
+   * {@code null} and the bucket count is -1.
+   *
+   * @param edgeType the type of the edge
+   */
+  public TezEdgeProperty(EdgeType edgeType) {
+    this(null, edgeType, -1);
+  }
+
+  /**
+   * Creates a fully specified edge property.
+   *
+   * @param hiveConf the configuration to keep with the edge; may be {@code null}
+   * @param edgeType the type of the edge
+   * @param buckets the bucket count to keep with the edge
+   */
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
+      int buckets) {
+    this.hiveConf = hiveConf;
+    this.edgeType = edgeType;
+    this.numBuckets = buckets;
+  }
+
+  /** @return the configuration given at construction, possibly {@code null} */
+  public HiveConf getHiveConf() {
+    return hiveConf;
+  }
+
+  /** @return the edge type given at construction */
+  public EdgeType getEdgeType() {
+    return edgeType;
+  }
+
+  /** @return the bucket count given at construction (-1 for the one-argument constructor) */
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Fri Mar 28 05:53:12 2014
@@ -32,6 +32,8 @@ import org.apache.commons.lang3.tuple.Im
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.tez.dag.api.EdgeProperty;
/**
* TezWork. This class encapsulates all the work objects that can be executed
@@ -43,12 +45,6 @@ import org.apache.commons.logging.LogFac
@Explain(displayName = "Tez")
public class TezWork extends AbstractOperatorDesc {
- public enum EdgeType {
- SIMPLE_EDGE,
- BROADCAST_EDGE,
- CONTAINS
- }
-
private static transient final Log LOG = LogFactory.getLog(TezWork.class);
private static int counter;
@@ -57,8 +53,8 @@ public class TezWork extends AbstractOpe
private final Set<BaseWork> leaves = new HashSet<BaseWork>();
private final Map<BaseWork, List<BaseWork>> workGraph = new HashMap<BaseWork, List<BaseWork>>();
private final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new HashMap<BaseWork, List<BaseWork>>();
- private final Map<Pair<BaseWork, BaseWork>, EdgeType> edgeProperties =
- new HashMap<Pair<BaseWork, BaseWork>, EdgeType>();
+ private final Map<Pair<BaseWork, BaseWork>, TezEdgeProperty> edgeProperties =
+ new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>();
public TezWork(String name) {
this.name = name + ":" + (++counter);
@@ -147,19 +143,6 @@ public class TezWork extends AbstractOpe
}
/**
- * connect adds an edge between a and b. Both nodes have
- * to be added prior to calling connect.
- */
- public void connect(BaseWork a, BaseWork b, EdgeType edgeType) {
- workGraph.get(a).add(b);
- invertedWorkGraph.get(b).add(a);
- roots.remove(b);
- leaves.remove(a);
- ImmutablePair workPair = new ImmutablePair(a, b);
- edgeProperties.put(workPair, edgeType);
- }
-
- /**
* disconnect removes an edge between a and b. Both a and
* b have to be in the graph. If there is no matching edge
* no change happens.
@@ -242,10 +225,14 @@ public class TezWork extends AbstractOpe
invertedWorkGraph.remove(work);
}
+ /**
+  * Returns the type of the edge connecting work a and b.
+  *
+  * @param a the work the edge starts from
+  * @param b the work the edge points to
+  * @return the edge's type, or {@code null} when no edge property is
+  *         registered for the pair (a, b)
+  */
+ public EdgeType getEdgeType(BaseWork a, BaseWork b) {
+   TezEdgeProperty edgeProp = getEdgeProperty(a, b);
+   // The lookup yields null for a pair that was never connected; hand that
+   // back to the caller instead of dereferencing it.
+   return edgeProp == null ? null : edgeProp.getEdgeType();
+ }
+
/**
* returns the edge type connecting work a and b
*/
- public EdgeType getEdgeProperty(BaseWork a, BaseWork b) {
+ public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
return edgeProperties.get(new ImmutablePair(a,b));
}
@@ -275,7 +262,7 @@ public class TezWork extends AbstractOpe
for (BaseWork d: entry.getValue()) {
Dependency dependency = new Dependency();
dependency.w = d;
- dependency.type = getEdgeProperty(d, entry.getKey());
+ dependency.type = getEdgeType(d, entry.getKey());
dependencies.add(dependency);
}
if (!dependencies.isEmpty()) {
@@ -284,4 +271,19 @@ public class TezWork extends AbstractOpe
}
return result;
}
+
+ /**
+  * connect adds an edge between a and b. Both nodes have
+  * to be added prior to calling connect.
+  *
+  * @param a the work the edge starts from; it is no longer a leaf afterwards
+  * @param b the work the edge points to; it is no longer a root afterwards
+  * @param edgeProp the property recorded for the edge (a, b)
+  */
+ public void connect(BaseWork a, BaseWork b,
+     TezEdgeProperty edgeProp) {
+   workGraph.get(a).add(b);
+   invertedWorkGraph.get(b).add(a);
+   roots.remove(b);
+   leaves.remove(a);
+   // Typed pair rather than a raw ImmutablePair, so the key matches the map's
+   // declared Pair<BaseWork, BaseWork> key type without an unchecked conversion.
+   Pair<BaseWork, BaseWork> workPair = new ImmutablePair<BaseWork, BaseWork>(a, b);
+   edgeProperties.put(workPair, edgeProp);
+ }
}
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Fri Mar 28 05:53:12 2014
@@ -47,8 +47,9 @@ import org.apache.hadoop.hive.ql.plan.Ba
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -92,7 +93,7 @@ public class TestTezTask {
when(path.getFileSystem(any(Configuration.class))).thenReturn(fs);
when(utils.getTezDir(any(Path.class))).thenReturn(path);
when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
- any(List.class), any(FileSystem.class), any(Context.class), anyBoolean())).thenAnswer(new Answer<Vertex>() {
+ any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() {
@Override
public Vertex answer(InvocationOnMock invocation) throws Throwable {
@@ -103,7 +104,7 @@ public class TestTezTask {
});
when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
- any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>() {
+ any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
@Override
public Edge answer(InvocationOnMock invocation) throws Throwable {
@@ -145,9 +146,10 @@ public class TestTezTask {
rws[0].setReducer(op);
rws[1].setReducer(op);
- work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE);
- work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE);
- work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ work.connect(mws[0], rws[0], edgeProp);
+ work.connect(mws[1], rws[0], edgeProp);
+ work.connect(rws[0], rws[1], edgeProp);
task = new TezTask(utils);
task.setWork(work);
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java Fri Mar 28 05:53:12 2014
@@ -22,7 +22,7 @@ import java.util.List;
import junit.framework.Assert;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.junit.Before;
import org.junit.Test;
@@ -62,7 +62,8 @@ public class TestTezWork {
BaseWork parent = nodes.get(0);
BaseWork child = nodes.get(1);
- work.connect(parent, child, EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ work.connect(parent, child, edgeProp);
Assert.assertEquals(work.getParents(child).size(), 1);
Assert.assertEquals(work.getChildren(parent).size(), 1);
@@ -78,7 +79,7 @@ public class TestTezWork {
Assert.assertEquals(work.getChildren(w).size(), 0);
}
- Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.SIMPLE_EDGE);
+ Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.SIMPLE_EDGE);
}
@Test
@@ -86,7 +87,8 @@ public class TestTezWork {
BaseWork parent = nodes.get(0);
BaseWork child = nodes.get(1);
- work.connect(parent, child, EdgeType.BROADCAST_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.BROADCAST_EDGE);
+ work.connect(parent, child, edgeProp);
Assert.assertEquals(work.getParents(child).size(), 1);
Assert.assertEquals(work.getChildren(parent).size(), 1);
@@ -102,7 +104,7 @@ public class TestTezWork {
Assert.assertEquals(work.getChildren(w).size(), 0);
}
- Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.BROADCAST_EDGE);
+ Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.BROADCAST_EDGE);
}
@Test
@@ -110,8 +112,9 @@ public class TestTezWork {
BaseWork parent = nodes.get(0);
BaseWork children[] = {nodes.get(1), nodes.get(2)};
- work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
- work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ work.connect(parent, children[0], edgeProp);
+ work.connect(parent, children[1], edgeProp);
work.disconnect(parent, children[0]);
@@ -128,8 +131,9 @@ public class TestTezWork {
BaseWork parent = nodes.get(0);
BaseWork children[] = {nodes.get(1), nodes.get(2)};
- work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
- work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ work.connect(parent, children[0], edgeProp);
+ work.connect(parent, children[1], edgeProp);
work.remove(parent);
@@ -142,8 +146,9 @@ public class TestTezWork {
@Test
public void testGetAllWork() throws Exception {
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
for (int i = 4; i > 0; --i) {
- work.connect(nodes.get(i), nodes.get(i-1), EdgeType.SIMPLE_EDGE);
+ work.connect(nodes.get(i), nodes.get(i-1), edgeProp);
}
List<BaseWork> sorted = work.getAllWork();
Added: hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q (added)
+++ hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q Fri Mar 28 05:53:12 2014
@@ -0,0 +1,85 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key;
+
+-- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table.
+-- In this case the sub-query is chosen as the big table.
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab on tab_part.key = tab.key GROUP BY tab.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y on x.key = y.key GROUP BY x.key) a
+join tab_part b on a.k1 = b.key;
+
+-- multi-way join
+explain
+select a.key, a.value, b.value
+from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key;
+
+explain
+select a.key, a.value, c.value
+from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a join tab c on a.key = c.key;
+
+-- in this case sub-query is the small table
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+set hive.map.aggr=false;
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+-- join on non-bucketed column results in broadcast join.
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.value = b.value;
+
+CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab1
+select key,value from srcbucket_mapjoin;
+
+explain
+select a.key, a.value, b.value
+from tab1 a join tab_part b on a.key = b.key;
+
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;
+
+
Added: hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q (added)
+++ hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q Fri Mar 28 05:53:12 2014
@@ -0,0 +1,50 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;
+
+CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab1
+select key,value from srcbucket_mapjoin;
+
+explain
+select a.key, a.value, b.value
+from tab1 a join src b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a left outer join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a right outer join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain select a.key, b.key from (select distinct key from tab) a join tab b on b.key = a.key;
+
+explain select a.value, b.value from (select distinct value from tab) a join tab b on b.key = a.value;