You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2017/10/24 20:16:14 UTC

[7/7] hive git commit: HIVE-14731: Use Tez cartesian product edge in Hive (unpartitioned case only) (Zhiyuan Yang via Gunther Hagleitner)

HIVE-14731: Use Tez cartesian product edge in Hive (unpartitioned case only) (Zhiyuan Yang via Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfbe6125
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfbe6125
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfbe6125

Branch: refs/heads/master
Commit: cfbe6125725223657dff1e2c9bc3131a5193ae51
Parents: a284df1
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Tue Oct 24 13:06:09 2017 -0700
Committer: Gunther Hagleitner <gu...@apache.org>
Committed: Tue Oct 24 13:06:09 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/jsonexplain/Vertex.java  |    2 +-
 .../common/jsonexplain/tez/TezJsonParser.java   |    2 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    2 +
 data/conf/llap/hive-site.xml                    |    5 +
 data/conf/tez/hive-site.xml                     |    5 +
 .../test/resources/testconfiguration.properties |    6 +
 .../hadoop/hive/ql/exec/tez/DagUtils.java       |   69 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |    5 +-
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |   74 +-
 .../optimizer/physical/CrossProductCheck.java   |  368 ---
 .../optimizer/physical/CrossProductHandler.java |  382 +++
 .../optimizer/physical/PhysicalOptimizer.java   |    2 +-
 .../physical/SparkCrossProductCheck.java        |   12 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |    4 +-
 .../hadoop/hive/ql/plan/TezEdgeProperty.java    |    4 +-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |    4 +-
 .../test/queries/clientpositive/cross_prod_1.q  |   34 +
 .../test/queries/clientpositive/cross_prod_3.q  |   13 +
 .../test/queries/clientpositive/cross_prod_4.q  |   10 +
 .../dynamic_partition_pruning_2.q               |    2 +-
 .../clientpositive/hybridgrace_hashjoin_1.q     |    1 +
 .../queries/clientpositive/subquery_multi.q     |    4 +-
 .../queries/clientpositive/subquery_notin.q     |    4 +-
 .../queries/clientpositive/subquery_select.q    |    4 +-
 .../clientpositive/llap/auto_join0.q.out        |   56 +-
 .../clientpositive/llap/auto_join_filters.q.out |    4 +-
 .../clientpositive/llap/auto_join_nulls.q.out   |    2 +-
 .../llap/auto_sortmerge_join_12.q.out           |   64 +-
 .../clientpositive/llap/cross_join.q.out        |   94 +-
 .../clientpositive/llap/cross_prod_1.q.out      | 2502 ++++++++++++++++++
 .../clientpositive/llap/cross_prod_3.q.out      |  133 +
 .../clientpositive/llap/cross_prod_4.q.out      |  195 ++
 .../llap/cross_product_check_1.q.out            |   12 +-
 .../llap/cross_product_check_2.q.out            |  305 ++-
 .../results/clientpositive/llap/cte_5.q.out     |   10 +-
 .../results/clientpositive/llap/cte_mat_1.q.out |   10 +-
 .../results/clientpositive/llap/cte_mat_2.q.out |   10 +-
 .../llap/dynamic_partition_pruning.q.out        |   81 +-
 .../llap/dynamic_partition_pruning_2.q.out      |   52 +-
 .../llap/dynamic_semijoin_reduction_sw.q.out    |    2 +-
 .../clientpositive/llap/explainuser_1.q.out     |   30 +-
 .../llap/hybridgrace_hashjoin_1.q.out           |  166 +-
 .../clientpositive/llap/jdbc_handler.q.out      |    2 +-
 .../results/clientpositive/llap/join0.q.out     |    2 +-
 .../clientpositive/llap/leftsemijoin.q.out      |    2 +-
 .../results/clientpositive/llap/mapjoin2.q.out  |    2 +-
 .../clientpositive/llap/mapjoin_hint.q.out      |   64 +-
 .../clientpositive/llap/subquery_exists.q.out   |    6 +-
 .../clientpositive/llap/subquery_in.q.out       |    2 +-
 .../clientpositive/llap/subquery_multi.q.out    |  106 +-
 .../clientpositive/llap/subquery_notin.q.out    |  107 +-
 .../clientpositive/llap/subquery_null_agg.q.out |    2 +-
 .../clientpositive/llap/subquery_scalar.q.out   |   48 +-
 .../clientpositive/llap/subquery_select.q.out   |  103 +-
 .../clientpositive/llap/tez_self_join.q.out     |    2 +-
 .../llap/vector_between_columns.q.out           |  155 +-
 .../llap/vector_complex_all.q.out               |   92 +-
 .../llap/vector_groupby_mapjoin.q.out           |  113 +-
 .../llap/vector_include_no_sel.q.out            |   99 +-
 .../llap/vector_join_filters.q.out              |    2 +-
 .../clientpositive/llap/vector_join_nulls.q.out |    2 +-
 .../vectorized_dynamic_partition_pruning.q.out  |   97 +-
 .../llap/vectorized_multi_output_select.q.out   |   58 +-
 .../clientpositive/spark/subquery_multi.q.out   |   80 +-
 .../clientpositive/spark/subquery_notin.q.out   |  106 +-
 .../clientpositive/spark/subquery_select.q.out  |   84 +-
 .../tez/hybridgrace_hashjoin_1.q.out            |  164 +-
 67 files changed, 4670 insertions(+), 1576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
index b7dc88c..a73893f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
@@ -74,7 +74,7 @@ public final class Vertex implements Comparable<Vertex>{
   public VertexType vertexType;
 
   public static enum EdgeType {
-    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, UNKNOWN
+    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, XPROD_EDGE, UNKNOWN
   };
   public String edgeType;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
index 69e5358..b6cca10 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
@@ -36,6 +36,8 @@ public class TezJsonParser extends DagJsonParser {
         return "MULTICAST";
       case "ONE_TO_ONE_EDGE":
         return "FORWARD";
+      case "XPROD_EDGE":
+        return "XPROD_EDGE";
       default:
         return "UNKNOWN";
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 62dcbd5..875e781 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3028,6 +3028,8 @@ public class HiveConf extends Configuration {
         0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"),
     TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction",
         -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"),
+    TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled",
+        false, "Use Tez cartesian product edge to speed up cross product"),
     // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
     LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb",

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index 870b584..8cd5144 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -338,4 +338,9 @@
   <value>true</value>
 </property>
 
+<property>
+  <name>hive.tez.cartesian-product.enabled</name>
+  <value>true</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml
index 35e8c99..f1dabf5 100644
--- a/data/conf/tez/hive-site.xml
+++ b/data/conf/tez/hive-site.xml
@@ -283,4 +283,9 @@
   <value>true</value>
 </property>
 
+<property>
+  <name>hive.tez.cartesian-product.enabled</name>
+  <value>true</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index a081638..c338826 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -139,6 +139,9 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   count_dist_rewrite.q,\
   create_merge_compressed.q,\
   cross_join.q,\
+  cross_prod_1.q,\
+  cross_prod_3.q,\
+  cross_prod_4.q,\
   cross_product_check_1.q,\
   cross_product_check_2.q,\
   ctas.q,\
@@ -508,6 +511,9 @@ minillaplocal.query.files=\
   correlationoptimizer4.q,\
   correlationoptimizer6.q,\
   disable_merge_for_bucketing.q,\
+  cross_prod_1.q,\
+  cross_prod_3.q,\
+  cross_prod_4.q,\
   dynamic_partition_pruning.q,\
   dynamic_semijoin_reduction.q,\
   dynamic_semijoin_reduction_2.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aae3480..5c338b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -46,6 +46,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -135,6 +138,7 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -264,7 +268,7 @@ public class DagUtils {
    */
   @SuppressWarnings("rawtypes")
   public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
-      TezEdgeProperty edgeProp, VertexType vertexType)
+      TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork)
     throws IOException {
 
     Class mergeInputClass;
@@ -279,7 +283,8 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
+      CustomVertexConfiguration vertexConf
+              = new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work));
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
@@ -299,6 +304,10 @@ public class DagUtils {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       break;
 
+    case XPROD_EDGE:
+      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+      break;
+
     case SIMPLE_EDGE:
       setupAutoReducerParallelism(edgeProp, w);
       // fall through
@@ -308,7 +317,7 @@ public class DagUtils {
       break;
     }
 
-    return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+    return GroupInputEdge.create(group, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork),
         InputDescriptor.create(mergeInputClass.getName()));
   }
 
@@ -322,13 +331,14 @@ public class DagUtils {
    * @return
    */
   public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
-      VertexType vertexType)
+      BaseWork work, TezWork tezWork)
     throws IOException {
 
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
+      CustomVertexConfiguration vertexConf =
+              new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work));
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
@@ -339,6 +349,9 @@ public class DagUtils {
       w.setVertexManagerPlugin(desc);
       break;
     }
+    case XPROD_EDGE:
+      break;
+
     case SIMPLE_EDGE: {
       setupAutoReducerParallelism(edgeProp, w);
       break;
@@ -352,14 +365,15 @@ public class DagUtils {
       // nothing
     }
 
-    return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
+    return Edge.create(v, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork));
   }
 
   /*
    * Helper function to create an edge property from an edge type.
    */
-  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
-      throws IOException {
+  private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp,
+                                          Configuration conf, BaseWork work, TezWork tezWork)
+          throws IOException {
     MRHelpers.translateMRConfToTez(conf);
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
@@ -412,7 +426,23 @@ public class DagUtils {
           .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
           .build();
       return et4Conf.createDefaultOneToOneEdgeProperty();
+    case XPROD_EDGE:
+      EdgeManagerPluginDescriptor edgeManagerDescriptor =
+        EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+      List<String> crossProductSources = new ArrayList<>();
+      for (BaseWork parentWork : tezWork.getParents(work)) {
+        if (EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) {
+          crossProductSources.add(parentWork.getName());
+        }
+      }
+      CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
+      edgeManagerDescriptor.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)));
+      UnorderedPartitionedKVEdgeConfig cpEdgeConf =
+        UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass,
+          ValueHashPartitioner.class.getName()).build();
+      return cpEdgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
     case SIMPLE_EDGE:
+      // fallthrough
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
@@ -427,6 +457,14 @@ public class DagUtils {
     }
   }
 
+  public static class ValueHashPartitioner implements Partitioner {
+
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      return (value.hashCode() & 2147483647) % numPartitions;
+    }
+  }
+
   /**
    * Utility method to create a stripped down configuration for the MR partitioner.
    *
@@ -1240,6 +1278,21 @@ public class DagUtils {
     } else if (work instanceof MergeJoinWork) {
       v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
               ctx, vertexType);
+      // set VertexManagerPlugin if whether it's a cross product destination vertex
+      List<String> crossProductSources = new ArrayList<>();
+      for (BaseWork parentWork : tezWork.getParents(work)) {
+        if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) {
+          crossProductSources.add(parentWork.getName());
+        }
+      }
+
+      if (!crossProductSources.isEmpty()) {
+        CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
+        v.setVertexManagerPlugin(
+          VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+            .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))));
+        // parallelism shouldn't be set for cartesian product vertex
+      }
     } else {
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index c3a2a2b..a1b7cfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -477,7 +477,7 @@ public class TezTask extends Task<TezWork> {
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-               workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
+               workToVertex.get(v), work.getEdgeProperty(w, v), v, work);
 
           dag.addEdge(e);
         }
@@ -506,8 +506,7 @@ public class TezTask extends Task<TezWork> {
           Edge e = null;
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
-
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
           dag.addEdge(e);
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 53d34bb..9175597 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -102,6 +102,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf);
     joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
 
+    // not use map join in case of cross product
+    boolean cartesianProductEdgeEnabled =
+      HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
+    if (cartesianProductEdgeEnabled && !hasOuterJoin(joinOp) && isCrossProduct(joinOp)) {
+      fallbackToMergeJoin(joinOp, context);
+      return null;
+    }
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &
             !context.parseContext.getDisableMapJoin();
@@ -614,6 +622,42 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
+  private boolean hasOuterJoin(JoinOperator joinOp) throws SemanticException {
+    boolean hasOuter = false;
+    for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+      switch (joinCondDesc.getType()) {
+        case JoinDesc.INNER_JOIN:
+        case JoinDesc.LEFT_SEMI_JOIN:
+        case JoinDesc.UNIQUE_JOIN:
+          hasOuter = false;
+          break;
+
+        case JoinDesc.FULL_OUTER_JOIN:
+        case JoinDesc.LEFT_OUTER_JOIN:
+        case JoinDesc.RIGHT_OUTER_JOIN:
+          hasOuter = true;
+          break;
+
+        default:
+          throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+      }
+    }
+    return hasOuter;
+  }
+
+  private boolean isCrossProduct(JoinOperator joinOp) {
+    ExprNodeDesc[][] joinExprs = joinOp.getConf().getJoinKeys();
+    if (joinExprs != null) {
+      for (ExprNodeDesc[] expr : joinExprs) {
+        if (expr != null && expr.length != 0) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
   /**
    * Obtain big table position for join.
    *
@@ -639,26 +683,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
        * case this for now.
        */
       if (joinOp.getConf().getConds().length > 1) {
-        boolean hasOuter = false;
-        for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
-          switch (joinCondDesc.getType()) {
-          case JoinDesc.INNER_JOIN:
-          case JoinDesc.LEFT_SEMI_JOIN:
-          case JoinDesc.UNIQUE_JOIN:
-            hasOuter = false;
-            break;
-
-          case JoinDesc.FULL_OUTER_JOIN:
-          case JoinDesc.LEFT_OUTER_JOIN:
-          case JoinDesc.RIGHT_OUTER_JOIN:
-            hasOuter = true;
-            break;
-
-          default:
-            throw new SemanticException("Unknown join type " + joinCondDesc.getType());
-          }
-        }
-        if (hasOuter) {
+        if (hasOuterJoin(joinOp)) {
           return -1;
         }
       }
@@ -1100,14 +1125,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
     }
 
+    // we are just converting to a common merge join operator. The shuffle
+    // join in map-reduce case.
+    fallbackToMergeJoin(joinOp, context);
+  }
+
+  private void fallbackToMergeJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+      throws SemanticException {
     int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
                   true, Long.MAX_VALUE, false);
     if (pos < 0) {
       LOG.info("Could not get a valid join position. Defaulting to position 0");
       pos = 0;
     }
-    // we are just converting to a common merge join operator. The shuffle
-    // join in map-reduce case.
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
deleted file mode 100644
index 4b35bb6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer.physical;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.GraphWalker;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
-/*
- * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product.
- * If yes, output a warning to the Session's console.
- * The Checks made are the following:
- * 1. MR, Shuffle Join:
- * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
- * this is a cross product.
- * The parent ReduceSinkOp is in the MapWork for the same Stage.
- * 2. MR, MapJoin:
- * If the keys expr list on the mapJoin Desc is an empty list for any input,
- * this implies a cross product.
- * 3. Tez, Shuffle Join:
- * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
- * this is a cross product.
- * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the
- * reduceWork that contains the JoinOp.
- * 4. Tez, Map Join:
- * If the keys expr list on the mapJoin Desc is an empty list for any input,
- * this implies a cross product.
- */
-public class CrossProductCheck implements PhysicalPlanResolver, Dispatcher {
-
-  protected static transient final Logger LOG = LoggerFactory
-      .getLogger(CrossProductCheck.class);
-
-  @Override
-  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
-    TaskGraphWalker ogw = new TaskGraphWalker(this);
-
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.getRootTasks());
-
-    ogw.startWalking(topNodes, null);
-    return pctx;
-  }
-
-  @Override
-  public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
-      throws SemanticException {
-    @SuppressWarnings("unchecked")
-    Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
-    if (currTask instanceof MapRedTask) {
-      MapRedTask mrTsk = (MapRedTask)currTask;
-      MapredWork mrWrk = mrTsk.getWork();
-      checkMapJoins(mrTsk);
-      checkMRReducer(currTask.toString(), mrWrk);
-    } else if (currTask instanceof ConditionalTask ) {
-      List<Task<? extends Serializable>> taskListInConditionalTask =
-          ((ConditionalTask) currTask).getListTasks();
-      for(Task<? extends Serializable> tsk: taskListInConditionalTask){
-        dispatch(tsk, stack, nodeOutputs);
-      }
-
-    } else if (currTask instanceof TezTask) {
-      TezTask tzTask = (TezTask) currTask;
-      TezWork tzWrk = tzTask.getWork();
-      checkMapJoins(tzWrk);
-      checkTezReducer(tzWrk);
-    }
-    return null;
-  }
-
-  private void warn(String msg) {
-    SessionState.getConsole().printInfo("Warning: " + msg, false);
-  }
-
-  private void checkMapJoins(MapRedTask mrTsk) throws SemanticException {
-    MapredWork mrWrk = mrTsk.getWork();
-    MapWork mapWork = mrWrk.getMapWork();
-    List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork);
-    if (!warnings.isEmpty()) {
-      for (String w : warnings) {
-        warn(w);
-      }
-    }
-    ReduceWork redWork = mrWrk.getReduceWork();
-    if (redWork != null) {
-      warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork);
-      if (!warnings.isEmpty()) {
-        for (String w : warnings) {
-          warn(w);
-        }
-      }
-    }
-  }
-
-  private void checkMapJoins(TezWork tzWrk) throws SemanticException {
-    for(BaseWork wrk : tzWrk.getAllWork() ) {
-
-      if ( wrk instanceof MergeJoinWork ) {
-        wrk = ((MergeJoinWork)wrk).getMainWork();
-      }
-
-      List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
-      if ( !warnings.isEmpty() ) {
-        for(String w : warnings) {
-          warn(w);
-        }
-      }
-    }
-  }
-
-  private void checkTezReducer(TezWork tzWrk) throws SemanticException {
-    for(BaseWork wrk : tzWrk.getAllWork() ) {
-
-      if ( wrk instanceof MergeJoinWork ) {
-        wrk = ((MergeJoinWork)wrk).getMainWork();
-      }
-
-      if ( !(wrk instanceof ReduceWork ) ) {
-        continue;
-      }
-      ReduceWork rWork = (ReduceWork) wrk;
-      Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
-      if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
-        Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>();
-        for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
-          rsInfo.putAll(getReducerInfo(tzWrk, rWork.getName(), e.getValue()));
-        }
-        checkForCrossProduct(rWork.getName(), reducer, rsInfo);
-      }
-    }
-  }
-
-  private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException {
-    ReduceWork rWrk = mrWrk.getReduceWork();
-    if ( rWrk == null) {
-      return;
-    }
-    Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
-    if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
-      BaseWork prntWork = mrWrk.getMapWork();
-      checkForCrossProduct(taskName, reducer,
-          new ExtractReduceSinkInfo(null).analyze(prntWork));
-    }
-  }
-
-  private void checkForCrossProduct(String taskName,
-      Operator<? extends OperatorDesc> reducer,
-      Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) {
-    if ( rsInfo.isEmpty() ) {
-      return;
-    }
-    Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
-    ExtractReduceSinkInfo.Info info = it.next();
-    if (info.keyCols.size() == 0) {
-      List<String> iAliases = new ArrayList<String>();
-      iAliases.addAll(info.inputAliases);
-      while (it.hasNext()) {
-        info = it.next();
-        iAliases.addAll(info.inputAliases);
-      }
-      String warning = String.format(
-          "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product",
-          reducer.toString(),
-          iAliases,
-          taskName);
-      warn(warning);
-    }
-  }
-
-  private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tzWrk, String vertex, String prntVertex)
-      throws SemanticException {
-    BaseWork prntWork = tzWrk.getWorkMap().get(prntVertex);
-    return new ExtractReduceSinkInfo(vertex).analyze(prntWork);
-  }
-
-  /*
-   * Given a Work descriptor and the TaskName for the work
-   * this is responsible to check each MapJoinOp for cross products.
-   * The analyze call returns the warnings list.
-   * <p>
-   * For MR the taskname is the StageName, for Tez it is the vertex name.
-   */
-  public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx {
-
-    final List<String> warnings;
-    final String taskName;
-
-    MapJoinCheck(String taskName) {
-      this.taskName = taskName;
-      warnings = new ArrayList<String>();
-    }
-
-    List<String> analyze(BaseWork work) throws SemanticException {
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName()
-          + "%"), this);
-      Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.addAll(work.getAllRootOperators());
-      ogw.startWalking(topNodes, null);
-      return warnings;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      @SuppressWarnings("unchecked")
-      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
-      MapJoinDesc mjDesc = mjOp.getConf();
-
-      String bigTablAlias = mjDesc.getBigTableAlias();
-      if ( bigTablAlias == null ) {
-        Operator<? extends OperatorDesc> parent = null;
-        for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) {
-          if ( op instanceof TableScanOperator ) {
-            parent = op;
-          }
-        }
-        if ( parent != null) {
-          TableScanDesc tDesc = ((TableScanOperator)parent).getConf();
-          bigTablAlias = tDesc.getAlias();
-        }
-      }
-      bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias;
-
-      List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next();
-
-      if ( joinExprs.size() == 0 ) {
-        warnings.add(
-            String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product",
-                mjOp.toString(), bigTablAlias, taskName));
-      }
-
-      return null;
-    }
-  }
-
-  /*
-   * for a given Work Descriptor, it extracts information about the ReduceSinkOps
-   * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output
-   * vertex.
-   */
-  public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx {
-
-    static class Info {
-      List<ExprNodeDesc> keyCols;
-      List<String> inputAliases;
-
-      Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) {
-        this.keyCols = keyCols;
-        this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases;
-      }
-
-      Info(List<ExprNodeDesc> keyCols, String[] inputAliases) {
-        this.keyCols = keyCols;
-        this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases);
-      }
-    }
-
-    final String outputTaskName;
-    final Map<Integer, Info> reduceSinkInfo;
-
-    ExtractReduceSinkInfo(String parentTaskName) {
-      this.outputTaskName = parentTaskName;
-      reduceSinkInfo = new HashMap<Integer, Info>();
-    }
-
-    Map<Integer, Info> analyze(BaseWork work) throws SemanticException {
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName()
-          + "%"), this);
-      Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.addAll(work.getAllRootOperators());
-      ogw.startWalking(topNodes, null);
-      return reduceSinkInfo;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      ReduceSinkOperator rsOp = (ReduceSinkOperator) nd;
-      ReduceSinkDesc rsDesc = rsOp.getConf();
-
-      if ( outputTaskName != null ) {
-        String rOutputName = rsDesc.getOutputName();
-        if ( rOutputName == null || !outputTaskName.equals(rOutputName)) {
-          return null;
-        }
-      }
-
-      reduceSinkInfo.put(rsDesc.getTag(),
-          new Info(rsDesc.getKeyCols(), rsOp.getInputAliases()));
-
-      return null;
-    }
-  }
-
-  static class NoopProcessor implements NodeProcessor {
-    @Override
-    public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-     return nd;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
new file mode 100644
index 0000000..1442378
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/*
+ * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product.
+ * If yes, output a warning to the Session's console.
+ * The Checks made are the following:
+ * 1. MR, Shuffle Join:
+ * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
+ * this is a cross product.
+ * The parent ReduceSinkOp is in the MapWork for the same Stage.
+ * 2. MR, MapJoin:
+ * If the keys expr list on the mapJoin Desc is an empty list for any input,
+ * this implies a cross product.
+ * 3. Tez, Shuffle Join:
+ * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
+ * this is a cross product.
+ * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the
+ * reduceWork that contains the JoinOp.
+ * 4. Tez, Map Join:
+ * If the keys expr list on the mapJoin Desc is an empty list for any input,
+ * this implies a cross product.
+ */
+public class CrossProductHandler implements PhysicalPlanResolver, Dispatcher {
+
+  protected static transient final Logger LOG = LoggerFactory
+      .getLogger(CrossProductHandler.class);
+  private Boolean cartesianProductEdgeEnabled = null;
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    cartesianProductEdgeEnabled =
+      HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
+    TaskGraphWalker ogw = new TaskGraphWalker(this);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  @Override
+  public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+      throws SemanticException {
+    @SuppressWarnings("unchecked")
+    Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+    if (currTask instanceof MapRedTask) {
+      MapRedTask mrTsk = (MapRedTask)currTask;
+      MapredWork mrWrk = mrTsk.getWork();
+      checkMapJoins(mrTsk);
+      checkMRReducer(currTask.toString(), mrWrk);
+    } else if (currTask instanceof ConditionalTask ) {
+      List<Task<? extends Serializable>> taskListInConditionalTask =
+          ((ConditionalTask) currTask).getListTasks();
+      for(Task<? extends Serializable> tsk: taskListInConditionalTask){
+        dispatch(tsk, stack, nodeOutputs);
+      }
+
+    } else if (currTask instanceof TezTask) {
+      TezTask tezTask = (TezTask) currTask;
+      TezWork tezWork = tezTask.getWork();
+      checkMapJoins(tezWork);
+      checkTezReducer(tezWork);
+    }
+    return null;
+  }
+
+  private void warn(String msg) {
+    SessionState.getConsole().printInfo("Warning: " + msg, false);
+  }
+
+  private void checkMapJoins(MapRedTask mrTsk) throws SemanticException {
+    MapredWork mrWrk = mrTsk.getWork();
+    MapWork mapWork = mrWrk.getMapWork();
+    List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork);
+    if (!warnings.isEmpty()) {
+      for (String w : warnings) {
+        warn(w);
+      }
+    }
+    ReduceWork redWork = mrWrk.getReduceWork();
+    if (redWork != null) {
+      warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork);
+      if (!warnings.isEmpty()) {
+        for (String w : warnings) {
+          warn(w);
+        }
+      }
+    }
+  }
+
+  private void checkMapJoins(TezWork tezWork) throws SemanticException {
+    for(BaseWork wrk : tezWork.getAllWork() ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
+      List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
+      if ( !warnings.isEmpty() ) {
+        for(String w : warnings) {
+          warn(w);
+        }
+      }
+    }
+  }
+
+  private void checkTezReducer(TezWork tezWork) throws SemanticException {
+    for(BaseWork wrk : tezWork.getAllWork() ) {
+      BaseWork origWrk = null;
+
+      if ( wrk instanceof MergeJoinWork ) {
+        origWrk = wrk;
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
+      if ( !(wrk instanceof ReduceWork ) ) {
+        continue;
+      }
+      ReduceWork rWork = (ReduceWork) wrk;
+      Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
+      if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
+        boolean noOuterJoin = ((JoinDesc)reducer.getConf()).isNoOuterJoin();
+        Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>();
+        for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
+          rsInfo.putAll(getReducerInfo(tezWork, rWork.getName(), e.getValue()));
+        }
+        if (checkForCrossProduct(rWork.getName(), reducer, rsInfo)
+                && cartesianProductEdgeEnabled && noOuterJoin) {
+          List<BaseWork> parents = tezWork.getParents(null == origWrk ? wrk : origWrk);
+          for (BaseWork p: parents) {
+            TezEdgeProperty prop = tezWork.getEdgeProperty(p, null == origWrk ? wrk : origWrk);
+            LOG.info("Edge Type: "+prop.getEdgeType());
+            if (prop.getEdgeType().equals(EdgeType.CUSTOM_SIMPLE_EDGE)
+                || prop.getEdgeType().equals(EdgeType.CUSTOM_EDGE)) {
+              prop.setEdgeType(EdgeType.XPROD_EDGE);
+              rWork.setNumReduceTasks(-1);
+              rWork.setMaxReduceTasks(-1);
+              rWork.setMinReduceTasks(-1);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException {
+    ReduceWork rWrk = mrWrk.getReduceWork();
+    if ( rWrk == null) {
+      return;
+    }
+    Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
+    if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
+      BaseWork parentWork = mrWrk.getMapWork();
+      checkForCrossProduct(taskName, reducer,
+          new ExtractReduceSinkInfo(null).analyze(parentWork));
+    }
+  }
+
+  private boolean checkForCrossProduct(String taskName,
+      Operator<? extends OperatorDesc> reducer,
+      Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) {
+    if ( rsInfo.isEmpty() ) {
+      return false;
+    }
+    Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
+    ExtractReduceSinkInfo.Info info = it.next();
+    if (info.keyCols.size() == 0) {
+      List<String> iAliases = new ArrayList<String>();
+      iAliases.addAll(info.inputAliases);
+      while (it.hasNext()) {
+        info = it.next();
+        iAliases.addAll(info.inputAliases);
+      }
+      String warning = String.format(
+          "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product",
+          reducer.toString(),
+          iAliases,
+          taskName);
+      warn(warning);
+      return true;
+    }
+    return false;
+  }
+
+  private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tezWork, String vertex, String prntVertex)
+    throws SemanticException {
+    BaseWork parentWork = tezWork.getWorkMap().get(prntVertex);
+    return new ExtractReduceSinkInfo(vertex).analyze(parentWork);
+  }
+
+  /*
+   * Given a Work descriptor and the TaskName for the work
+   * this is responsible to check each MapJoinOp for cross products.
+   * The analyze call returns the warnings list.
+   * <p>
+   * For MR the taskname is the StageName, for Tez it is the vertex name.
+   */
+  public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx {
+
+    final List<String> warnings;
+    final String taskName;
+
+    MapJoinCheck(String taskName) {
+      this.taskName = taskName;
+      warnings = new ArrayList<String>();
+    }
+
+    List<String> analyze(BaseWork work) throws SemanticException {
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName()
+          + "%"), this);
+      Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(work.getAllRootOperators());
+      ogw.startWalking(topNodes, null);
+      return warnings;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      @SuppressWarnings("unchecked")
+      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
+      MapJoinDesc mjDesc = mjOp.getConf();
+
+      String bigTablAlias = mjDesc.getBigTableAlias();
+      if ( bigTablAlias == null ) {
+        Operator<? extends OperatorDesc> parent = null;
+        for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) {
+          if ( op instanceof TableScanOperator ) {
+            parent = op;
+          }
+        }
+        if ( parent != null) {
+          TableScanDesc tDesc = ((TableScanOperator)parent).getConf();
+          bigTablAlias = tDesc.getAlias();
+        }
+      }
+      bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias;
+
+      List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next();
+
+      if ( joinExprs.size() == 0 ) {
+        warnings.add(
+            String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product",
+                mjOp.toString(), bigTablAlias, taskName));
+      }
+
+      return null;
+    }
+  }
+
+  /*
+   * for a given Work Descriptor, it extracts information about the ReduceSinkOps
+   * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output
+   * vertex.
+   */
+  public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx {
+
+    static class Info {
+      List<ExprNodeDesc> keyCols;
+      List<String> inputAliases;
+
+      Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) {
+        this.keyCols = keyCols;
+        this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases;
+      }
+
+      Info(List<ExprNodeDesc> keyCols, String[] inputAliases) {
+        this.keyCols = keyCols;
+        this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases);
+      }
+    }
+
+    final String outputTaskName;
+    final Map<Integer, Info> reduceSinkInfo;
+
+    ExtractReduceSinkInfo(String parentTaskName) {
+      this.outputTaskName = parentTaskName;
+      reduceSinkInfo = new HashMap<Integer, Info>();
+    }
+
+    Map<Integer, Info> analyze(BaseWork work) throws SemanticException {
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName()
+          + "%"), this);
+      Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(work.getAllRootOperators());
+      ogw.startWalking(topNodes, null);
+      return reduceSinkInfo;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) nd;
+      ReduceSinkDesc rsDesc = rsOp.getConf();
+
+      if ( outputTaskName != null ) {
+        String rOutputName = rsDesc.getOutputName();
+        if ( rOutputName == null || !outputTaskName.equals(rOutputName)) {
+          return null;
+        }
+      }
+
+      reduceSinkInfo.put(rsDesc.getTag(),
+          new Info(rsDesc.getKeyCols(), rsOp.getInputAliases()));
+
+      return null;
+    }
+  }
+
+  static class NoopProcessor implements NodeProcessor {
+    @Override
+    public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+     return nd;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 9377563..c040406 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -82,7 +82,7 @@ public class PhysicalOptimizer {
     }
 
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) {
-      resolvers.add(new CrossProductCheck());
+      resolvers.add(new CrossProductHandler());
     }
 
     // Vectorization should be the last optimization, because it doesn't modify the plan

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
index 7f3b1b3..9f14c66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
@@ -92,9 +92,9 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
     for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) {
       Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
       if (reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator) {
-        Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info>();
+        Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info>();
         for (BaseWork parent : sparkWork.getParents(reduceWork)) {
-          rsInfo.putAll(new CrossProductCheck.ExtractReduceSinkInfo(null).analyze(parent));
+          rsInfo.putAll(new CrossProductHandler.ExtractReduceSinkInfo(null).analyze(parent));
         }
         checkForCrossProduct(reduceWork.getName(), reducer, rsInfo);
       }
@@ -105,7 +105,7 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
     SparkWork sparkWork = sparkTask.getWork();
     for (BaseWork baseWork : sparkWork.getAllWork()) {
       List<String> warnings =
-          new CrossProductCheck.MapJoinCheck(sparkTask.toString()).analyze(baseWork);
+          new CrossProductHandler.MapJoinCheck(sparkTask.toString()).analyze(baseWork);
       for (String w : warnings) {
         warn(w);
       }
@@ -114,12 +114,12 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
 
   private void checkForCrossProduct(String workName,
       Operator<? extends OperatorDesc> reducer,
-      Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo) {
+      Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo) {
     if (rsInfo.isEmpty()) {
       return;
     }
-    Iterator<CrossProductCheck.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
-    CrossProductCheck.ExtractReduceSinkInfo.Info info = it.next();
+    Iterator<CrossProductHandler.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
+    CrossProductHandler.ExtractReduceSinkInfo.Info info = it.next();
     if (info.keyCols.size() == 0) {
       List<String> iAliases = new ArrayList<String>();
       iAliases.addAll(info.inputAliases);

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 15836ec..da30c3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductHandler;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass;
@@ -658,7 +658,7 @@ public class TezCompiler extends TaskCompiler {
     }
 
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) {
-      physicalCtx = new CrossProductCheck().resolve(physicalCtx);
+      physicalCtx = new CrossProductHandler().resolve(physicalCtx);
     } else {
       LOG.debug("Skipping cross product analysis");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index bbed9be..d43b81a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -28,7 +28,8 @@ public class TezEdgeProperty {
     CONTAINS,//used for union (all?)
     CUSTOM_EDGE,//CO_PARTITION_EDGE
     CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE
-    ONE_TO_ONE_EDGE
+    ONE_TO_ONE_EDGE,
+    XPROD_EDGE
   }
 
   private HiveConf hiveConf;
@@ -107,4 +108,5 @@ public class TezEdgeProperty {
   public void setEdgeType(EdgeType type) {
     this.edgeType = type;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2dc334d..47aa936 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -109,8 +109,8 @@ public class TestTezTask {
         });
 
     when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class),
-            any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() {
-
+            any(TezEdgeProperty.class), any(BaseWork.class), any(TezWork.class)))
+            .thenAnswer(new Answer<Edge>() {
           @Override
           public Edge answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_1.q b/ql/src/test/queries/clientpositive/cross_prod_1.q
new file mode 100644
index 0000000..b5a84ea
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_1.q
@@ -0,0 +1,34 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.tez.cartesian-product.enabled=true;
+
+create table X as
+select distinct * from src order by key limit 10;
+
+explain select * from X as A, X as B order by A.key, B.key;
+select * from X as A, X as B order by A.key, B.key;
+
+explain select * from X as A join X as B on A.key<B.key;
+select * from X as A join X as B on A.key<B.key order by A.key, B.key;
+
+explain select * from X as A join X as B on A.key between "103" and "105";
+select * from X as A join X as B on A.key between "103" and "105" order by A.key, B.key;
+
+explain select * from X as A, X as B, X as C;
+select * from X as A, X as B, X as C order by A.key, B.key, C.key;
+
+explain select * from X as A join X as B on A.key in ("103", "104", "105");
+select * from X as A join X as B on A.key in ("103", "104", "105") order by A.key, B.key;
+
+explain select A.key, count(*)  from X as A, X as B group by A.key;
+select A.key, count(*)  from X as A, X as B group by A.key order by A.key;
+
+explain select * from X as A left outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+explain select * from X as A right outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+explain select * from X as A full outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+
+explain select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B;
+select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B order by A.key, B.key;
+
+explain select * from (select * from X union all select * from X as y) a join X;
+select * from (select * from X union all select * from X as y) a join X order by a.key, X.key;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_3.q b/ql/src/test/queries/clientpositive/cross_prod_3.q
new file mode 100644
index 0000000..a233f17
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_3.q
@@ -0,0 +1,13 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.tez.cartesian-product.enabled=true;
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+create table X (key string, value string) clustered by (key) into 2 buckets;
+insert overwrite table X select distinct * from src order by key limit 10;
+
+create table Y as
+select * from src order by key limit 1;
+
+explain select * from Y, (select * from X as A join X as B on A.key=B.key) as C where Y.key=C.key;

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_4.q b/ql/src/test/queries/clientpositive/cross_prod_4.q
new file mode 100644
index 0000000..ea58e98
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_4.q
@@ -0,0 +1,10 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.auto.convert.join=true;
+set hive.tez.cartesian-product.enabled=true;
+
+create table X as
+select distinct * from src order by key limit 10;
+
+explain select * from X as A, X as B;
+select * from X as A, X as B order by A.key, B.key;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
index b1e289f..b8b04ee 100644
--- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
+++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
@@ -85,7 +85,7 @@ SELECT agg.amount
 FROM agg_01 agg,
 dim_shops d1
 WHERE agg.dim_shops_id = d1.id
-and agg.dim_shops_id = 1;
+and agg.dim_shops_id = 1 order by agg.amount;
 
 set hive.tez.dynamic.partition.pruning.max.event.size=1;
 set hive.tez.dynamic.partition.pruning.max.data.size=1000000;

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
index 9c19a86..e404dd0 100644
--- a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
+++ b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
@@ -1,5 +1,6 @@
 set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
+set tez.cartesian-product.max-parallelism=1;
 -- Hybrid Grace Hash Join
 -- Test basic functionalities:
 -- 1. Various cases when hash partitions spill

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_multi.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_multi.q b/ql/src/test/queries/clientpositive/subquery_multi.q
index c546d24..6ef198d 100644
--- a/ql/src/test/queries/clientpositive/subquery_multi.q
+++ b/ql/src/test/queries/clientpositive/subquery_multi.q
@@ -36,7 +36,7 @@ select * from part_null where p_name IN (select p_name from part_null) AND p_bra
 
 -- NOT IN is always true and IN is false for where p_name is NULL, hence should return all but one row
 explain select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null);
-select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null);
+select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null) order by part_null.p_partkey;
 
 -- NOT IN has one NULL value so this whole query should not return any row
 explain select * from part_null where p_brand IN (select p_brand from part_null) AND p_brand NOT IN (select p_name from part_null);
@@ -49,7 +49,7 @@ select * from part_null where p_name NOT IN (select c from tempty) AND p_brand I
 
 -- IN, EXISTS
 explain select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull);
-select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull);
+select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull) order by part_null.p_partkey;
 
 explain select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty);
 select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty);

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_notin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_notin.q b/ql/src/test/queries/clientpositive/subquery_notin.q
index e23eb2b..c509654 100644
--- a/ql/src/test/queries/clientpositive/subquery_notin.q
+++ b/ql/src/test/queries/clientpositive/subquery_notin.q
@@ -109,8 +109,8 @@ explain select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select
 select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select (p_size*p_size) from part p where p.p_type = part.p_type ) AND p_size <> 340;
 
 --lhs contains non-simple expression
-explain select * from part  where (p_size-1) NOT IN (select min(p_size) from part group by p_type);
-select * from part  where (p_size-1) NOT IN (select min(p_size) from part group by p_type);
+explain select * from part  where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey;
+select * from part  where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey;
 
 explain select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type);
 select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type);

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_select.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_select.q b/ql/src/test/queries/clientpositive/subquery_select.q
index 15377a4..c1766ff 100644
--- a/ql/src/test/queries/clientpositive/subquery_select.q
+++ b/ql/src/test/queries/clientpositive/subquery_select.q
@@ -155,8 +155,8 @@ SELECT p_size, (SELECT count(p_size) FROM part p
     WHERE p.p_type = part.p_type) IS NULL from part;
 
 -- scalar, non-corr, non agg
-explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part;
-select p_type, (select p_size from part order by p_size limit 1) = 1 from part;
+explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type;
+select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type;
 
 -- in corr, multiple
 EXPLAIN SELECT p_size, p_size IN (

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join0.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join0.q.out b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
index 7f0a878..29945ad 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join0.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
@@ -1,4 +1,4 @@
-Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product
 PREHOOK: query: explain 
 select sum(hash(a.k1,a.v1,a.k2, a.v2))
 from (
@@ -30,9 +30,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (BROADCAST_EDGE)
-        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (XPROD_EDGE), Reducer 5 (XPROD_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -64,28 +65,33 @@ STAGE PLANS:
                 expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
-                Map Join Operator
-                  condition map:
-                       Inner Join 0 to 1
-                  keys:
-                    0 
-                    1 
-                  outputColumnNames: _col0, _col1, _col2, _col3
-                  input vertices:
-                    1 Reducer 4
-                  Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: sum(hash(_col0,_col1,_col2,_col3))
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col0 (type: bigint)
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string)
         Reducer 3 
             Execution mode: llap
             Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 
+                  1 
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: sum(hash(_col0,_col1,_col2,_col3))
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col0 (type: bigint)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
                 mode: mergepartial
@@ -98,7 +104,7 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 4 
+        Reducer 5 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -116,7 +122,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product
 PREHOOK: query: select sum(hash(a.k1,a.v1,a.k2, a.v2))
 from (
 SELECT src1.key as k1, src1.value as v1, 

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
index d1d9408..079f047 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
@@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in3.txt' INTO TABLE my
 POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@myinput1
-Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
 PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value))  FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@myinput1
@@ -300,7 +300,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in2.txt' into table sm
 POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@smb_input2
-Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
 PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@myinput1

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
index 5984e8f..04da1f2 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
@@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in1.txt' INTO TABLE my
 POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@myinput1
-Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[14][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
 PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b
 PREHOOK: type: QUERY
 PREHOOK: Input: default@myinput1

http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
index 6ef1f34..3acbb20 100644
--- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
@@ -134,7 +134,7 @@ POSTHOOK: query: load data local inpath '../../data/files/smallsrcsortbucket3out
 POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@bucket_medium@ds=2008-04-08
-Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product
+Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
 PREHOOK: type: QUERY
 POSTHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
@@ -148,8 +148,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE), Map 5 (BROADCAST_EDGE)
-        Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE)
+        Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE)
+        Reducer 4 <- Map 3 (XPROD_EDGE), Map 6 (XPROD_EDGE)
+        Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -336,29 +337,12 @@ STAGE PLANS:
                           1 Map 2
                         Position of Big Table: 2
                         Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE
-                        Map Join Operator
-                          condition map:
-                               Inner Join 0 to 1
-                          Estimated key counts: Map 5 => 1
-                          keys:
-                            0 
-                            1 
-                          input vertices:
-                            1 Map 5
-                          Position of Big Table: 0
-                          Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE
-                          Group By Operator
-                            aggregations: count()
-                            mode: hash
-                            outputColumnNames: _col0
-                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                            Reduce Output Operator
-                              null sort order: 
-                              sort order: 
-                              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                              tag: -1
-                              value expressions: _col0 (type: bigint)
-                              auto parallelism: false
+                        Reduce Output Operator
+                          null sort order: 
+                          sort order: 
+                          Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE
+                          tag: 0
+                          auto parallelism: false
             Execution mode: llap
             LLAP IO: no inputs
             Path -> Alias:
@@ -465,7 +449,7 @@ STAGE PLANS:
             Truncated Path -> Alias:
               /bucket_big/ds=2008-04-08 [c]
               /bucket_big/ds=2008-04-09 [c]
-        Map 5 
+        Map 6 
             Map Operator Tree:
                 TableScan
                   alias: d
@@ -539,6 +523,30 @@ STAGE PLANS:
             Execution mode: llap
             Needs Tagging: false
             Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 
+                  1 
+                Position of Big Table: 0
+                Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    null sort order: 
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    tag: -1
+                    value expressions: _col0 (type: bigint)
+                    auto parallelism: false
+        Reducer 5 
+            Execution mode: llap
+            Needs Tagging: false
+            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
                 mode: mergepartial
@@ -573,7 +581,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product
+Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@bucket_big