Posted to commits@hive.apache.org by vi...@apache.org on 2014/03/28 06:53:13 UTC

svn commit: r1582613 [2/4] - in /hive/branches/branch-0.13: common/src/java/org/apache/hadoop/hive/conf/ itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/io/...

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class TezBucketJoinProcCtx extends BucketJoinProcCtx {
+  // determines whether we need to use a custom edge or a one-to-one edge
+  boolean isSubQuery = false;
+  int numBuckets = -1;
+
+  public TezBucketJoinProcCtx(HiveConf conf) {
+    super(conf);
+  }
+
+  public void setIsSubQuery(boolean isSubQuery) {
+    this.isSubQuery = isSubQuery;
+  }
+
+  public boolean isSubQuery() {
+    return isSubQuery;
+  }
+
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+}

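As a minimal sketch of how this context might be consumed (illustrative only, not code from this commit; it assumes a HiveConf named conf is in scope, and the mapping from these flags to a concrete edge type is decided by the join-conversion logic, not by this class):

    TezBucketJoinProcCtx joinCtx = new TezBucketJoinProcCtx(conf);
    joinCtx.setNumBuckets(4);      // bucket count of the big table
    joinCtx.setIsSubQuery(false);  // big side is a table scan, not a sub-query

    // Illustrative mapping only: a sub-query side would need the one-to-one
    // style edge, while a bucketed scan can use the custom routed edge.
    TezEdgeProperty.EdgeType edgeType = joinCtx.isSubQuery()
        ? TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE
        : TezEdgeProperty.EdgeType.CUSTOM_EDGE;
    TezEdgeProperty edgeProp =
        new TezEdgeProperty(conf, edgeType, joinCtx.getNumBuckets());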
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+public class AnnotateOpTraitsProcCtx implements NodeProcessorCtx {
+
+  ParseContext parseContext;
+  HiveConf conf;
+  
+  public AnnotateOpTraitsProcCtx(ParseContext parseContext) {
+    this.setParseContext(parseContext);
+    if (parseContext != null) {
+      this.setConf(parseContext.getConf());
+    } else {
+      this.setConf(null);
+    }
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public void setParseContext(ParseContext parseContext) {
+    this.parseContext = parseContext;
+  }
+
+}

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MuxOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/*
+ * This class annotates each operator with its traits. The OpTraits class
+ * specifies the traits that are populated for each operator.
+ */
+public class AnnotateWithOpTraits implements Transform {
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    AnnotateOpTraitsProcCtx annotateCtx = new AnnotateOpTraitsProcCtx(pctx);
+
+    // create a walker which walks the tree in a DFS manner while maintaining the
+    // operator stack. The dispatcher annotates each operator with its traits
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getTableScanRule());
+    opRules.put(new RuleRegExp("RS", ReduceSinkOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getReduceSinkRule());
+    opRules.put(new RuleRegExp("JOIN", JoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getJoinRule());
+    opRules.put(new RuleRegExp("MAPJOIN", MapJoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("SMB", SMBMapJoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("MUX", MuxOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("DEMUX", DemuxOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("UNION", UnionOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("GBY", GroupByOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getGroupByRule());
+    opRules.put(new RuleRegExp("SEL", SelectOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getSelectRule());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(OpTraitsRulesProcFactory.getDefaultRule(), opRules,
+        annotateCtx);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of top operator (root) nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+}

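A minimal sketch of invoking this transform on its own (assuming a populated ParseContext named pctx; in the compiler it would run alongside the other Transform passes, inside a method that throws SemanticException):

    Transform annotate = new AnnotateWithOpTraits();
    ParseContext annotated = annotate.transform(pctx);
    // every operator reachable from pctx.getTopOps() now carries OpTraits,
    // e.g. op.getOpTraits().getBucketColNames()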
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.AbstractBucketJoinProc;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * This class populates the following operator traits for the entire operator tree:
+ * 1. Bucketing columns.
+ * 2. Table object.
+ * 3. Pruned partitions.
+ *
+ * Bucketing columns refer not to the bucketing columns from the table object but
+ * to the dynamic 'bucketing' done by operators such as reduce sinks and group-bys.
+ * Every operator carries a translation from its input names to the output names
+ * corresponding to the bucketing columns. The colExprMap that is a part of every
+ * operator is used in this transformation.
+ *
+ * The table object is used for the base case in map-reduce when deciding to perform
+ * a bucket map join. It is used in the BucketMapJoinProc to check whether the number of
+ * files for the table corresponds to the number of buckets specified in the metadata.
+ *
+ * The pruned partition information serves the same purpose as the table object at the moment.
+ *
+ * Traits such as sortedness can be populated as well for future optimizations to make use of.
+ */
+
+public class OpTraitsRulesProcFactory {
+
+  public static class DefaultRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      @SuppressWarnings("unchecked")
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)nd;
+      op.setOpTraits(op.getParentOperators().get(0).getOpTraits());
+      return null;
+    }
+
+  }
+
+  /*
+   * The reduce sink operator is the de facto operator
+   * for determining the keyCols (the keys emitted by a map phase).
+   */
+  public static class ReduceSinkRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      ReduceSinkOperator rs = (ReduceSinkOperator)nd;
+      List<String> bucketCols = new ArrayList<String>();
+      if (rs.getColumnExprMap() != null) {
+        for (ExprNodeDesc exprDesc : rs.getConf().getKeyCols()) {
+          for (Entry<String, ExprNodeDesc> entry : rs.getColumnExprMap().entrySet()) {
+            if (exprDesc.isSame(entry.getValue())) {
+              bucketCols.add(entry.getKey());
+            }
+          }
+        }
+      }
+
+      List<List<String>> listBucketCols = new ArrayList<List<String>>();
+      listBucketCols.add(bucketCols);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      rs.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  /*
+   * The table scan operator carries the table object and pruned partitions, which hold
+   * information such as bucketing and sorting that is used later for optimization.
+   */
+  public static class TableScanRule implements NodeProcessor {
+
+    public boolean checkBucketedTable(Table tbl, 
+        ParseContext pGraphContext,
+        PrunedPartitionList prunedParts) throws SemanticException {
+
+      if (tbl.isPartitioned()) {
+        List<Partition> partitions = prunedParts.getNotDeniedPartns();
+        // check that each partition's number of bucket files matches its declared bucket count
+        if (!partitions.isEmpty()) {
+          for (Partition p : partitions) {
+            List<String> fileNames =
+                AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext);
+            // The number of files for the partition should be the same as the number of buckets.
+            int bucketCount = p.getBucketCount();
+
+            if (fileNames.size() != 0 && fileNames.size() != bucketCount) {
+              return false;
+            }
+          }
+        }
+      } else {
+
+        List<String> fileNames =
+            AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext);
+        int num = tbl.getNumBuckets();
+
+        // The number of files for the table should be the same as the number of buckets.
+        if (fileNames.size() != 0 && fileNames.size() != num) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator ts = (TableScanOperator)nd;
+      AnnotateOpTraitsProcCtx opTraitsCtx = (AnnotateOpTraitsProcCtx)procCtx;
+      Table table = opTraitsCtx.getParseContext().getTopToTable().get(ts);
+      PrunedPartitionList prunedPartList = null;
+      try {
+        prunedPartList =
+            opTraitsCtx.getParseContext().getPrunedPartitions(ts.getConf().getAlias(), ts);
+      } catch (HiveException e) {
+        prunedPartList = null; // proceed without pruned partition info
+      }
+      boolean bucketMapJoinConvertible = checkBucketedTable(table, 
+          opTraitsCtx.getParseContext(), prunedPartList);
+      List<List<String>> bucketCols = new ArrayList<List<String>>();
+      int numBuckets = -1;
+      if (bucketMapJoinConvertible) {
+        bucketCols.add(table.getBucketCols());
+        numBuckets = table.getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+      ts.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  /*
+   * Group-by reorders the keys it emits and hence the keyCols change.
+   */
+  public static class GroupByRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      GroupByOperator gbyOp = (GroupByOperator)nd;
+      List<String> gbyKeys = new ArrayList<String>();
+      for (ExprNodeDesc exprDesc : gbyOp.getConf().getKeys()) {
+        for (Entry<String, ExprNodeDesc> entry : gbyOp.getColumnExprMap().entrySet()) {
+          if (exprDesc.isSame(entry.getValue())) {
+            gbyKeys.add(entry.getKey());
+          }
+        }
+      }
+
+      List<List<String>> listBucketCols = new ArrayList<List<String>>();
+      listBucketCols.add(gbyKeys);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      gbyOp.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  public static class SelectRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator selOp = (SelectOperator)nd;
+      List<List<String>> parentBucketColNames = 
+          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+      List<List<String>> listBucketCols = new ArrayList<List<String>>();
+      if (selOp.getColumnExprMap() != null) {
+        if (parentBucketColNames != null) {
+          for (List<String> colNames : parentBucketColNames) {
+            List<String> bucketColNames = new ArrayList<String>();
+            for (String colName : colNames) {
+              for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
+                if (entry.getValue() instanceof ExprNodeColumnDesc) {
+                  if (((ExprNodeColumnDesc) entry.getValue()).getColumn().equals(colName)) {
+                    bucketColNames.add(entry.getKey());
+                  }
+                }
+              }
+            }
+            listBucketCols.add(bucketColNames);
+          }
+        }
+      }
+
+      int numBuckets = -1;
+      if (selOp.getParentOperators().get(0).getOpTraits() != null) {
+        numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+      selOp.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  public static class JoinRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      JoinOperator joinOp = (JoinOperator)nd;
+      List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      byte pos = 0;
+      for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+        if (!(parentOp instanceof ReduceSinkOperator)) {
+          // the parent could be a mux operator
+          break;
+        }
+        ReduceSinkOperator rsOp = (ReduceSinkOperator)parentOp;
+        if (rsOp.getOpTraits() == null) {
+          ReduceSinkRule rsRule = new ReduceSinkRule();
+          rsRule.process(rsOp, stack, procCtx, nodeOutputs);
+        }
+        bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+        pos++;
+      }
+
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+      return null;
+    }
+
+    private List<String> getOutputColNames(JoinOperator joinOp,
+        ReduceSinkOperator rs, byte pos) {
+      List<List<String>> parentBucketColNames =
+          rs.getOpTraits().getBucketColNames();
+
+      if (parentBucketColNames != null) {
+        List<String> bucketColNames = new ArrayList<String>();
+
+        // It is guaranteed that there is only one list within this list because
+        // a reduce sink always reduces the bucketing cols to a single list.
+        // This may not hold with correlation operators (mux-demux).
+        List<String> colNames = parentBucketColNames.get(0);
+        for (String colName : colNames) {
+          for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
+            if (exprNode instanceof ExprNodeColumnDesc) {
+              if (((ExprNodeColumnDesc) exprNode).getColumn().equals(colName)) {
+                for (Entry<String, ExprNodeDesc> entry : joinOp.getColumnExprMap().entrySet()) {
+                  if (entry.getValue().isSame(exprNode)) {
+                    bucketColNames.add(entry.getKey());
+                    // we have found the colName
+                    break;
+                  }
+                }
+              } else {
+                // continue on to the next exprNode to find a match
+                continue;
+              }
+              // we have found the colName. No need to search more exprNodes.
+              break;
+            }
+          }
+        }
+
+        return bucketColNames;
+      }
+
+      // no col names in parent
+      return null;
+    }
+  }
+
+  /*
+   *  When we have operators that have multiple parents, it is not
+   *  clear which parent's traits we need to propagate forward.
+   */
+  public static class MultiParentRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      OpTraits opTraits = new OpTraits(null, -1);
+      @SuppressWarnings("unchecked")
+      Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
+      operator.setOpTraits(opTraits);
+      return null;
+    } 
+  }
+
+  public static NodeProcessor getTableScanRule() {
+    return new TableScanRule();
+  }
+
+  public static NodeProcessor getReduceSinkRule() {
+    return new ReduceSinkRule();
+  }
+  
+  public static NodeProcessor getSelectRule() {
+    return new SelectRule();
+  }
+
+  public static NodeProcessor getDefaultRule() {
+    return new DefaultRule();
+  }
+
+  public static NodeProcessor getMultiParentRule() {
+    return new MultiParentRule();
+  }
+
+  public static NodeProcessor getGroupByRule() {
+    return new GroupByRule();
+  }
+
+  public static NodeProcessor getJoinRule() {
+    return new JoinRule();
+  }
+}

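The reverse lookup through colExprMap that ReduceSinkRule and GroupByRule perform can be pictured with plain strings standing in for ExprNodeDesc (and equals() standing in for isSame()); a self-contained sketch, java.util imports assumed:

    // colExprMap maps output column names to the expressions that produce them,
    // so finding the output name for a key expression means scanning the entries.
    Map<String, String> colExprMap = new LinkedHashMap<String, String>();
    colExprMap.put("_col0", "key");   // output column name -> producing expression
    colExprMap.put("_col1", "value");

    List<String> bucketCols = new ArrayList<String>();
    for (String keyExpr : Arrays.asList("key")) {      // stand-in for getKeyCols()
      for (Map.Entry<String, String> e : colExprMap.entrySet()) {
        if (e.getValue().equals(keyExpr)) {            // stand-in for isSame()
          bucketCols.add(e.getKey());                  // yields [_col0]
        }
      }
    }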
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Mar 28 05:53:12 2014
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
@@ -45,6 +47,8 @@ import org.apache.hadoop.hive.ql.plan.De
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -89,7 +93,7 @@ public class GenTezProcContext implement
 
   // a map that keeps track of work that need to be linked while
   // traversing an operator tree
-  public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap;
+  public final Map<Operator<?>, Map<BaseWork,TezEdgeProperty>> linkOpWithWorkMap;
 
   // a map to keep track of what reduce sinks have to be hooked up to
   // map join work
@@ -144,7 +148,7 @@ public class GenTezProcContext implement
     this.currentTask = (TezTask) TaskFactory.get(
          new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
     this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
-    this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
+    this.linkOpWithWorkMap = new HashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
     this.mapJoinWorkMap = new HashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>();

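Under the new map layout, a caller records each parent work item together with the edge that should connect it; a sketch (not code from this commit, Java 6 idiom, assuming an Operator<?> op and a BaseWork parentWork in scope):

    Map<BaseWork, TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(op);
    if (linkWorkMap == null) {
      linkWorkMap = new HashMap<BaseWork, TezEdgeProperty>();
      context.linkOpWithWorkMap.put(op, linkWorkMap);
    }
    linkWorkMap.put(parentWork, new TezEdgeProperty(EdgeType.BROADCAST_EDGE));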
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Mar 28 05:53:12 2014
@@ -46,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 
 /**
  * GenTezUtils is a collection of shared helper methods to produce
@@ -104,9 +105,10 @@ public class GenTezUtils {
     setupReduceSink(context, reduceWork, reduceSink);
 
     tezWork.add(reduceWork);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
     tezWork.connect(
         context.preceedingWork,
-        reduceWork, EdgeType.SIMPLE_EDGE);
+        reduceWork, edgeProp);
     context.connectedReduceSinks.add(reduceSink);
 
     return reduceWork;

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Fri Mar 28 05:53:12 2014
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -45,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 
 /**
  * GenTezWork separates the operator tree into tez tasks.
@@ -160,30 +162,34 @@ public class GenTezWork implements NodeP
          * RS following the TS, we have already generated work for the TS-RS.
          * We need to hook the current work to this generated work.
          */
-        List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(mj);
-        if (linkWorkList != null) {
-          if (context.linkChildOpWithDummyOp.containsKey(mj)) {
-            for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
-              work.addDummyOp((HashTableDummyOperator) dummy);
+        if (context.linkOpWithWorkMap.containsKey(mj)) {
+          Map<BaseWork,TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj);
+          if (linkWorkMap != null) {
+            if (context.linkChildOpWithDummyOp.containsKey(mj)) {
+              for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
+                work.addDummyOp((HashTableDummyOperator) dummy);
+              }
             }
-          }
-          for (BaseWork parentWork : linkWorkList) {
-            LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
-            tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
-
-            // need to set up output name for reduce sink now that we know the name
-            // of the downstream work
-            for (ReduceSinkOperator r:
-                   context.linkWorkWithReduceSinkMap.get(parentWork)) {
-              if (r.getConf().getOutputName() != null) {
-                LOG.debug("Cloning reduce sink for multi-child broadcast edge");
-                // we've already set this one up. Need to clone for the next work.
-                r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
-                    (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
-                context.clonedReduceSinks.add(r);
+            for (Entry<BaseWork,TezEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
+              BaseWork parentWork = parentWorkMap.getKey();
+              LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
+              TezEdgeProperty edgeProp = parentWorkMap.getValue();
+              tezWork.connect(parentWork, work, edgeProp);
+              
+              // need to set up output name for reduce sink now that we know the name
+              // of the downstream work
+              for (ReduceSinkOperator r:
+                     context.linkWorkWithReduceSinkMap.get(parentWork)) {
+                if (r.getConf().getOutputName() != null) {
+                  LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+                  // we've already set this one up. Need to clone for the next work.
+                  r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+                      (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+                  context.clonedReduceSinks.add(r);
+                }
+                r.getConf().setOutputName(work.getName());
+                context.connectedReduceSinks.add(r);
               }
-              r.getConf().setOutputName(work.getName());
-              context.connectedReduceSinks.add(r);
             }
           }
         }
@@ -221,7 +227,8 @@ public class GenTezWork implements NodeP
 
       // finally hook everything up
       LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
-      tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+      TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+      tezWork.connect(unionWork, work, edgeProp);
       unionWork.addUnionOperators(context.currentUnionOperators);
       context.currentUnionOperators.clear();
       context.workWithUnionOperators.add(work);
@@ -261,7 +268,8 @@ public class GenTezWork implements NodeP
 
       if (!context.connectedReduceSinks.contains(rs)) {
         // add dependency between the two work items
-        tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+        TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+        tezWork.connect(work, rWork, edgeProp);
         context.connectedReduceSinks.add(rs);
       }
     } else {

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Fri Mar 28 05:53:12 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.W
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -117,7 +118,7 @@ public class TezCompiler extends TaskCom
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new TezWalker(disp);
+    GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
   }
 

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Fri Mar 28 05:53:12 2014
@@ -22,6 +22,7 @@ public class AbstractOperatorDesc implem
 
   protected boolean vectorMode = false;
   protected transient Statistics statistics;
+  protected transient OpTraits opTraits;
 
   @Override
   @Explain(skipHeader = true, displayName = "Statistics")
@@ -42,4 +43,12 @@ public class AbstractOperatorDesc implem
   public void setVectorMode(boolean vm) {
     this.vectorMode = vm;
   }
+  
+  public OpTraits getOpTraits() {
+    return opTraits;
+  }
+  
+  public void setOpTraits(OpTraits opTraits) {
+    this.opTraits = opTraits;
+  }
 }

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Fri Mar 28 05:53:12 2014
@@ -49,6 +49,9 @@ public class MapJoinDesc extends JoinDes
 
   // for tez. used to remember which position maps to which logical input
   private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
+  
+  // for tez. used to remember whether this is a custom bucket map join.
+  private boolean customBucketMapJoin;
 
   // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
@@ -81,6 +84,7 @@ public class MapJoinDesc extends JoinDes
     this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping;
     this.dumpFilePrefix = clone.dumpFilePrefix;
     this.parentToInput = clone.parentToInput;
+    this.customBucketMapJoin = clone.customBucketMapJoin;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -280,4 +284,12 @@ public class MapJoinDesc extends JoinDes
   public float getHashTableMemoryUsage() {
     return hashtableMemoryUsage;
   }
+  
+  public void setCustomBucketMapJoin(boolean customBucketMapJoin) {
+    this.customBucketMapJoin = customBucketMapJoin;
+  }
+  
+  public boolean getCustomBucketMapJoin() {
+    return this.customBucketMapJoin;
+  }
 }

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+
+public class OpTraits {
+  
+  List<List<String>> bucketColNames;
+  int numBuckets;
+  
+  public OpTraits(List<List<String>> bucketColNames, int numBuckets) {
+    this.bucketColNames = bucketColNames;
+    this.numBuckets = numBuckets;
+  }
+
+  public List<List<String>> getBucketColNames() {
+    return bucketColNames;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  public void setBucketColNames(List<List<String>> bucketColNames) {
+    this.bucketColNames = bucketColNames;    
+  }
+
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
+}

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java Fri Mar 28 05:53:12 2014
@@ -24,4 +24,6 @@ public interface OperatorDesc extends Se
   public Object clone() throws CloneNotSupportedException;
   public Statistics getStatistics();
   public void setStatistics(Statistics statistics);
+  public OpTraits getOpTraits();
+  public void setOpTraits(OpTraits opTraits);
 }

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class TezEdgeProperty {
+  
+  public enum EdgeType {
+    SIMPLE_EDGE,
+    BROADCAST_EDGE, 
+    CONTAINS,
+    CUSTOM_EDGE,
+    CUSTOM_SIMPLE_EDGE,
+  }
+
+  private HiveConf hiveConf;
+  private EdgeType edgeType;
+  private int numBuckets;
+
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, 
+      int buckets) {
+    this.hiveConf = hiveConf;
+    this.edgeType = edgeType;
+    this.numBuckets = buckets;
+  }
+
+  public TezEdgeProperty(EdgeType edgeType) {
+    this(null, edgeType, -1);
+  }
+
+  public EdgeType getEdgeType() {
+    return edgeType;
+  }
+
+  public HiveConf getHiveConf() {
+    return hiveConf;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+}

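The two constructors separate plain edges from bucket-routed ones; for illustration (hiveConf assumed in scope):

    // a plain shuffle edge needs no conf or bucket count
    TezEdgeProperty simple = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);

    // a custom (bucket map join) edge carries the conf and bucket count
    // that the DAG builder needs to set up the routing
    TezEdgeProperty custom = new TezEdgeProperty(hiveConf, EdgeType.CUSTOM_EDGE, 4);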
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Fri Mar 28 05:53:12 2014
@@ -32,6 +32,8 @@ import org.apache.commons.lang3.tuple.Im
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.tez.dag.api.EdgeProperty;
 
 /**
  * TezWork. This class encapsulates all the work objects that can be executed
@@ -43,12 +45,6 @@ import org.apache.commons.logging.LogFac
 @Explain(displayName = "Tez")
 public class TezWork extends AbstractOperatorDesc {
 
-  public enum EdgeType {
-    SIMPLE_EDGE,
-    BROADCAST_EDGE,
-    CONTAINS
-  }
-
   private static transient final Log LOG = LogFactory.getLog(TezWork.class);
 
   private static int counter;
@@ -57,8 +53,8 @@ public class TezWork extends AbstractOpe
   private final Set<BaseWork> leaves = new HashSet<BaseWork>();
   private final Map<BaseWork, List<BaseWork>> workGraph = new HashMap<BaseWork, List<BaseWork>>();
   private final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new HashMap<BaseWork, List<BaseWork>>();
-  private final Map<Pair<BaseWork, BaseWork>, EdgeType> edgeProperties =
-      new HashMap<Pair<BaseWork, BaseWork>, EdgeType>();
+  private final Map<Pair<BaseWork, BaseWork>, TezEdgeProperty> edgeProperties =
+      new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>();
 
   public TezWork(String name) {
     this.name = name + ":" + (++counter);
@@ -147,19 +143,6 @@ public class TezWork extends AbstractOpe
   }
 
   /**
-   * connect adds an edge between a and b. Both nodes have
-   * to be added prior to calling connect.
-   */
-  public void connect(BaseWork a, BaseWork b, EdgeType edgeType) {
-    workGraph.get(a).add(b);
-    invertedWorkGraph.get(b).add(a);
-    roots.remove(b);
-    leaves.remove(a);
-    ImmutablePair workPair = new ImmutablePair(a, b);
-    edgeProperties.put(workPair, edgeType);
-  }
-
-  /**
    * disconnect removes an edge between a and b. Both a and
    * b have to be in the graph. If there is no matching edge
    * no change happens.
@@ -242,10 +225,14 @@ public class TezWork extends AbstractOpe
     invertedWorkGraph.remove(work);
   }
 
+  public EdgeType getEdgeType(BaseWork a, BaseWork b) {
+    return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a, b)).getEdgeType();
+  }
+
   /**
    * returns the edge type connecting work a and b
    */
-  public EdgeType getEdgeProperty(BaseWork a, BaseWork b) {
+  public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
     return edgeProperties.get(new ImmutablePair(a,b));
   }
 
@@ -275,7 +262,7 @@ public class TezWork extends AbstractOpe
       for (BaseWork d: entry.getValue()) {
         Dependency dependency = new Dependency();
         dependency.w = d;
-        dependency.type = getEdgeProperty(d, entry.getKey());
+        dependency.type = getEdgeType(d, entry.getKey());
         dependencies.add(dependency);
       }
       if (!dependencies.isEmpty()) {
@@ -284,4 +271,19 @@ public class TezWork extends AbstractOpe
     }
     return result;
   }
+
+  /**
+   * connect adds an edge between a and b. Both nodes have
+   * to be added prior to calling connect.
+   * @param edgeProp the properties of the edge connecting a and b
+   */
+  public void connect(BaseWork a, BaseWork b,
+      TezEdgeProperty edgeProp) {
+    workGraph.get(a).add(b);
+    invertedWorkGraph.get(b).add(a);
+    roots.remove(b);
+    leaves.remove(a);
+    ImmutablePair<BaseWork, BaseWork> workPair = new ImmutablePair<BaseWork, BaseWork>(a, b);
+    edgeProperties.put(workPair, edgeProp);
+  }
 }

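Mirroring the updated tests below, connecting two work items now takes a TezEdgeProperty, and the edge type is read back through the new accessor; a sketch (mw and rw stand for already-created MapWork/ReduceWork instances):

    TezWork work = new TezWork("query-1");
    work.add(mw);
    work.add(rw);
    work.connect(mw, rw, new TezEdgeProperty(EdgeType.SIMPLE_EDGE));
    assert work.getEdgeType(mw, rw) == EdgeType.SIMPLE_EDGE;
    assert work.getEdgeProperty(mw, rw).getEdgeType() == EdgeType.SIMPLE_EDGE;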
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Fri Mar 28 05:53:12 2014
@@ -47,8 +47,9 @@ import org.apache.hadoop.hive.ql.plan.Ba
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -92,7 +93,7 @@ public class TestTezTask {
     when(path.getFileSystem(any(Configuration.class))).thenReturn(fs);
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
     when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
-        any(List.class), any(FileSystem.class), any(Context.class), anyBoolean())).thenAnswer(new Answer<Vertex>() {
+        any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() {
 
           @Override
           public Vertex answer(InvocationOnMock invocation) throws Throwable {
@@ -103,7 +104,7 @@ public class TestTezTask {
         });
 
     when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
-        any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>() {
+        any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
 
           @Override
           public Edge answer(InvocationOnMock invocation) throws Throwable {
@@ -145,9 +146,10 @@ public class TestTezTask {
     rws[0].setReducer(op);
     rws[1].setReducer(op);
 
-    work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE);
-    work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE);
-    work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    work.connect(mws[0], rws[0], edgeProp);
+    work.connect(mws[1], rws[0], edgeProp);
+    work.connect(rws[0], rws[1], edgeProp);
 
     task = new TezTask(utils);
     task.setWork(work);

Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java Fri Mar 28 05:53:12 2014
@@ -22,7 +22,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -62,7 +62,8 @@ public class TestTezWork {
     BaseWork parent = nodes.get(0);
     BaseWork child = nodes.get(1);
 
-    work.connect(parent, child, EdgeType.SIMPLE_EDGE);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    work.connect(parent, child, edgeProp);
 
     Assert.assertEquals(work.getParents(child).size(), 1);
     Assert.assertEquals(work.getChildren(parent).size(), 1);
@@ -78,7 +79,7 @@ public class TestTezWork {
       Assert.assertEquals(work.getChildren(w).size(), 0);
     }
 
-    Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.SIMPLE_EDGE);
+    Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.SIMPLE_EDGE);
   }
 
   @Test
@@ -86,7 +87,8 @@ public class TestTezWork {
     BaseWork parent = nodes.get(0);
     BaseWork child = nodes.get(1);
 
-    work.connect(parent, child, EdgeType.BROADCAST_EDGE);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.BROADCAST_EDGE);
+    work.connect(parent, child, edgeProp);
 
     Assert.assertEquals(work.getParents(child).size(), 1);
     Assert.assertEquals(work.getChildren(parent).size(), 1);
@@ -102,7 +104,7 @@ public class TestTezWork {
       Assert.assertEquals(work.getChildren(w).size(), 0);
     }
 
-    Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.BROADCAST_EDGE);
+    Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.BROADCAST_EDGE);
   }
 
   @Test
@@ -110,8 +112,9 @@ public class TestTezWork {
     BaseWork parent = nodes.get(0);
     BaseWork children[] = {nodes.get(1), nodes.get(2)};
 
-    work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
-    work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    work.connect(parent, children[0], edgeProp);
+    work.connect(parent, children[1], edgeProp);
 
     work.disconnect(parent, children[0]);
 
@@ -128,8 +131,9 @@ public class TestTezWork {
     BaseWork parent = nodes.get(0);
     BaseWork children[] = {nodes.get(1), nodes.get(2)};
 
-    work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
-    work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    work.connect(parent, children[0], edgeProp);
+    work.connect(parent, children[1], edgeProp);
 
     work.remove(parent);
 
@@ -142,8 +146,9 @@ public class TestTezWork {
 
   @Test
   public void testGetAllWork() throws Exception {
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
     for (int i = 4; i > 0; --i) {
-      work.connect(nodes.get(i), nodes.get(i-1), EdgeType.SIMPLE_EDGE);
+      work.connect(nodes.get(i), nodes.get(i-1), edgeProp);
     }
 
     List<BaseWork> sorted = work.getAllWork();

Added: hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q (added)
+++ hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q Fri Mar 28 05:53:12 2014
@@ -0,0 +1,85 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key;
+
+-- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table.
+-- In this case the sub-query is chosen as the big table.
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab on tab_part.key = tab.key GROUP BY tab.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y on x.key = y.key GROUP BY x.key) a
+join tab_part b on a.k1 = b.key;
+
+-- multi-way join
+explain
+select a.key, a.value, b.value
+from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key;
+
+explain
+select a.key, a.value, c.value
+from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a join tab c on a.key = c.key;
+
+-- in this case the sub-query is the small table
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+set hive.map.aggr=false;
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+-- a join on a non-bucketed column results in a broadcast join.
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.value = b.value;
+
+CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab1
+select key,value from srcbucket_mapjoin;
+
+explain
+select a.key, a.value, b.value
+from tab1 a join tab_part b on a.key = b.key;
+
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;
+
+

Added: hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q (added)
+++ hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q Fri Mar 28 05:53:12 2014
@@ -0,0 +1,50 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;
+
+CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab1
+select key,value from srcbucket_mapjoin;
+
+explain
+select a.key, a.value, b.value
+from tab1 a join src b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a left outer join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a right outer join (select key from tab_part where key > 2) b on a.key = b.key;
+
+explain select a.key, b.key from (select distinct key from tab) a join tab b on b.key = a.key;
+
+explain select a.value, b.value from (select distinct value from tab) a join tab b on b.key = a.value;