You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/07/20 12:56:03 UTC

[1/3] hive git commit: HIVE-17037: Use 1-to-1 Tez edge to avoid unnecessary input data shuffle (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master a0df0ace2 -> a96d9f71d


http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out b/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out
index 4b1e92d..b4386c8 100644
--- a/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out
@@ -413,7 +413,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -547,7 +547,7 @@ STAGE PLANS:
                   sort order: +
                   Map-reduce partition columns: _col0 (type: int)
                   Reduce Sink Vectorization:
-                      className: VectorReduceSinkLongOperator
+                      className: VectorReduceSinkObjectHashOperator
                       native: true
                       nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
                   Statistics: Num rows: 5 Data size: 465 Basic stats: COMPLETE Column stats: NONE
@@ -604,7 +604,7 @@ STAGE PLANS:
                   sort order: +
                   Map-reduce partition columns: _col0 (type: int)
                   Reduce Sink Vectorization:
-                      className: VectorReduceSinkLongOperator
+                      className: VectorReduceSinkObjectHashOperator
                       native: true
                       nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
                   Statistics: Num rows: 5 Data size: 465 Basic stats: COMPLETE Column stats: NONE

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query10.q.out b/ql/src/test/results/clientpositive/perf/query10.q.out
index 73c474a..7793d52 100644
--- a/ql/src/test/results/clientpositive/perf/query10.q.out
+++ b/ql/src/test/results/clientpositive/perf/query10.q.out
@@ -125,8 +125,8 @@ Reducer 16 <- Map 13 (SIMPLE_EDGE), Map 19 (SIMPLE_EDGE)
 Reducer 17 <- Reducer 16 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
 Reducer 3 <- Map 9 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-Reducer 4 <- Reducer 12 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 17 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 4 <- Reducer 12 (ONE_TO_ONE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 17 (ONE_TO_ONE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
 
@@ -157,8 +157,8 @@ Stage-0
                           predicate:(_col16 is not null or _col18 is not null)
                           Merge Join Operator [MERGEJOIN_112] (rows=766650239 width=88)
                             Conds:RS_62._col0=RS_63._col0(Left Outer),Output:["_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col16","_col18"]
-                          <-Reducer 17 [SIMPLE_EDGE]
-                            SHUFFLE [RS_63]
+                          <-Reducer 17 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_63]
                               PartitionCols:_col0
                               Select Operator [SEL_61] (rows=158394413 width=135)
                                 Output:["_col0","_col1"]
@@ -196,8 +196,8 @@ Stage-0
                                 Output:["_col0","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col16"]
                                 Merge Join Operator [MERGEJOIN_111] (rows=696954748 width=88)
                                   Conds:RS_43._col0=RS_44._col0(Left Outer),RS_43._col0=RS_45._col0(Inner),Output:["_col0","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col15"]
-                                <-Reducer 12 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_44]
+                                <-Reducer 12 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_44]
                                     PartitionCols:_col0
                                     Select Operator [SEL_22] (rows=79201469 width=135)
                                       Output:["_col0","_col1"]
@@ -223,8 +223,8 @@ Stage-0
                                                     predicate:(ws_bill_customer_sk is not null and ws_sold_date_sk is not null)
                                                     TableScan [TS_9] (rows=144002668 width=135)
                                                       default@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_date_sk","ws_bill_customer_sk"]
-                                <-Reducer 15 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_45]
+                                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_45]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_35] (rows=316797606 width=88)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query14.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query14.q.out b/ql/src/test/results/clientpositive/perf/query14.q.out
index b15587c..74c7660 100644
--- a/ql/src/test/results/clientpositive/perf/query14.q.out
+++ b/ql/src/test/results/clientpositive/perf/query14.q.out
@@ -1,6 +1,6 @@
+Warning: Shuffle Join MERGEJOIN[890][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 5' is a cross product
 Warning: Shuffle Join MERGEJOIN[891][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 12' is a cross product
 Warning: Shuffle Join MERGEJOIN[892][tables = [$hdt$_2, $hdt$_3, $hdt$_1]] in Stage 'Reducer 16' is a cross product
-Warning: Shuffle Join MERGEJOIN[890][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 5' is a cross product
 PREHOOK: query: explain
 with  cross_items as
  (select i_item_sk ss_item_sk
@@ -222,9 +222,9 @@ Reducer 21 <- Map 17 (SIMPLE_EDGE), Map 97 (SIMPLE_EDGE)
 Reducer 22 <- Map 96 (SIMPLE_EDGE), Reducer 21 (SIMPLE_EDGE)
 Reducer 23 <- Reducer 22 (SIMPLE_EDGE), Union 24 (CONTAINS)
 Reducer 25 <- Union 24 (SIMPLE_EDGE)
-Reducer 26 <- Map 96 (SIMPLE_EDGE), Reducer 25 (SIMPLE_EDGE)
+Reducer 26 <- Map 96 (SIMPLE_EDGE), Reducer 25 (ONE_TO_ONE_EDGE)
 Reducer 27 <- Reducer 26 (SIMPLE_EDGE)
-Reducer 28 <- Map 96 (SIMPLE_EDGE), Reducer 27 (SIMPLE_EDGE), Reducer 92 (SIMPLE_EDGE)
+Reducer 28 <- Map 96 (SIMPLE_EDGE), Reducer 27 (ONE_TO_ONE_EDGE), Reducer 92 (SIMPLE_EDGE)
 Reducer 29 <- Reducer 28 (SIMPLE_EDGE)
 Reducer 30 <- Map 17 (SIMPLE_EDGE), Map 98 (SIMPLE_EDGE)
 Reducer 31 <- Map 96 (SIMPLE_EDGE), Reducer 30 (SIMPLE_EDGE)
@@ -239,9 +239,9 @@ Reducer 4 <- Union 3 (CUSTOM_SIMPLE_EDGE)
 Reducer 40 <- Map 96 (SIMPLE_EDGE), Reducer 39 (SIMPLE_EDGE)
 Reducer 41 <- Reducer 40 (SIMPLE_EDGE), Union 42 (CONTAINS)
 Reducer 43 <- Union 42 (SIMPLE_EDGE)
-Reducer 44 <- Map 96 (SIMPLE_EDGE), Reducer 43 (SIMPLE_EDGE)
+Reducer 44 <- Map 96 (SIMPLE_EDGE), Reducer 43 (ONE_TO_ONE_EDGE)
 Reducer 45 <- Reducer 44 (SIMPLE_EDGE)
-Reducer 46 <- Map 96 (SIMPLE_EDGE), Reducer 45 (SIMPLE_EDGE), Reducer 94 (SIMPLE_EDGE)
+Reducer 46 <- Map 96 (SIMPLE_EDGE), Reducer 45 (ONE_TO_ONE_EDGE), Reducer 94 (SIMPLE_EDGE)
 Reducer 47 <- Reducer 46 (SIMPLE_EDGE)
 Reducer 48 <- Map 17 (SIMPLE_EDGE), Map 98 (SIMPLE_EDGE)
 Reducer 49 <- Map 96 (SIMPLE_EDGE), Reducer 48 (SIMPLE_EDGE)
@@ -256,9 +256,9 @@ Reducer 57 <- Map 17 (SIMPLE_EDGE), Map 97 (SIMPLE_EDGE)
 Reducer 58 <- Map 96 (SIMPLE_EDGE), Reducer 57 (SIMPLE_EDGE)
 Reducer 59 <- Reducer 58 (SIMPLE_EDGE), Union 60 (CONTAINS)
 Reducer 61 <- Union 60 (SIMPLE_EDGE)
-Reducer 62 <- Map 96 (SIMPLE_EDGE), Reducer 61 (SIMPLE_EDGE)
+Reducer 62 <- Map 96 (SIMPLE_EDGE), Reducer 61 (ONE_TO_ONE_EDGE)
 Reducer 63 <- Reducer 62 (SIMPLE_EDGE)
-Reducer 64 <- Map 96 (SIMPLE_EDGE), Reducer 63 (SIMPLE_EDGE), Reducer 95 (SIMPLE_EDGE)
+Reducer 64 <- Map 96 (SIMPLE_EDGE), Reducer 63 (ONE_TO_ONE_EDGE), Reducer 95 (SIMPLE_EDGE)
 Reducer 65 <- Reducer 64 (SIMPLE_EDGE)
 Reducer 66 <- Map 17 (SIMPLE_EDGE), Map 98 (SIMPLE_EDGE)
 Reducer 67 <- Map 96 (SIMPLE_EDGE), Reducer 66 (SIMPLE_EDGE)
@@ -504,8 +504,8 @@ Stage-0
                                                 predicate:i_item_sk is not null
                                                 TableScan [TS_87] (rows=462000 width=1436)
                                                   default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_brand_id","i_class_id","i_category_id"]
-                                        <-Reducer 45 [SIMPLE_EDGE]
-                                          SHUFFLE [RS_362]
+                                        <-Reducer 45 [ONE_TO_ONE_EDGE]
+                                          FORWARD [RS_362]
                                             PartitionCols:_col0
                                             Group By Operator [GBY_355] (rows=254100 width=1436)
                                               Output:["_col0"],keys:KEY._col0
@@ -524,8 +524,8 @@ Stage-0
                                                         Filter Operator [FIL_810] (rows=462000 width=1436)
                                                           predicate:(i_brand_id is not null and i_class_id is not null and i_category_id is not null and i_item_sk is not null)
                                                            Please refer to the previous TableScan [TS_87]
-                                                  <-Reducer 43 [SIMPLE_EDGE]
-                                                    SHUFFLE [RS_350]
+                                                  <-Reducer 43 [ONE_TO_ONE_EDGE]
+                                                    FORWARD [RS_350]
                                                       PartitionCols:_col0, _col1, _col2
                                                       Select Operator [SEL_348] (rows=1 width=108)
                                                         Output:["_col0","_col1","_col2"]
@@ -833,8 +833,8 @@ Stage-0
                                               Filter Operator [FIL_834] (rows=462000 width=1436)
                                                 predicate:i_item_sk is not null
                                                  Please refer to the previous TableScan [TS_87]
-                                        <-Reducer 63 [SIMPLE_EDGE]
-                                          SHUFFLE [RS_551]
+                                        <-Reducer 63 [ONE_TO_ONE_EDGE]
+                                          FORWARD [RS_551]
                                             PartitionCols:_col0
                                             Group By Operator [GBY_544] (rows=254100 width=1436)
                                               Output:["_col0"],keys:KEY._col0
@@ -853,8 +853,8 @@ Stage-0
                                                         Filter Operator [FIL_835] (rows=462000 width=1436)
                                                           predicate:(i_brand_id is not null and i_class_id is not null and i_category_id is not null and i_item_sk is not null)
                                                            Please refer to the previous TableScan [TS_87]
-                                                  <-Reducer 61 [SIMPLE_EDGE]
-                                                    SHUFFLE [RS_539]
+                                                  <-Reducer 61 [ONE_TO_ONE_EDGE]
+                                                    FORWARD [RS_539]
                                                       PartitionCols:_col0, _col1, _col2
                                                       Select Operator [SEL_537] (rows=1 width=108)
                                                         Output:["_col0","_col1","_col2"]
@@ -1077,8 +1077,8 @@ Stage-0
                                               Filter Operator [FIL_784] (rows=462000 width=1436)
                                                 predicate:i_item_sk is not null
                                                  Please refer to the previous TableScan [TS_87]
-                                        <-Reducer 27 [SIMPLE_EDGE]
-                                          SHUFFLE [RS_174]
+                                        <-Reducer 27 [ONE_TO_ONE_EDGE]
+                                          FORWARD [RS_174]
                                             PartitionCols:_col0
                                             Group By Operator [GBY_167] (rows=254100 width=1436)
                                               Output:["_col0"],keys:KEY._col0
@@ -1097,8 +1097,8 @@ Stage-0
                                                         Filter Operator [FIL_785] (rows=462000 width=1436)
                                                           predicate:(i_brand_id is not null and i_class_id is not null and i_category_id is not null and i_item_sk is not null)
                                                            Please refer to the previous TableScan [TS_87]
-                                                  <-Reducer 25 [SIMPLE_EDGE]
-                                                    SHUFFLE [RS_162]
+                                                  <-Reducer 25 [ONE_TO_ONE_EDGE]
+                                                    FORWARD [RS_162]
                                                       PartitionCols:_col0, _col1, _col2
                                                       Select Operator [SEL_160] (rows=1 width=108)
                                                         Output:["_col0","_col1","_col2"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query16.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query16.q.out b/ql/src/test/results/clientpositive/perf/query16.q.out
index 662bc97..5ae7680 100644
--- a/ql/src/test/results/clientpositive/perf/query16.q.out
+++ b/ql/src/test/results/clientpositive/perf/query16.q.out
@@ -66,7 +66,7 @@ Reducer 15 <- Map 14 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 3 <- Map 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Map 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 13 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 13 (ONE_TO_ONE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
@@ -106,8 +106,8 @@ Stage-0
                                   Output:["_col4","_col5","_col6","_col16"]
                                   Merge Join Operator [MERGEJOIN_82] (rows=843291907 width=135)
                                     Conds:RS_35._col4=RS_36._col0(Left Outer),RS_35._col4=RS_37._col1(Inner),Output:["_col3","_col4","_col5","_col6","_col14","_col15"],residual filter predicates:{(_col3 <> _col15)}
-                                  <-Reducer 13 [SIMPLE_EDGE]
-                                    SHUFFLE [RS_36]
+                                  <-Reducer 13 [ONE_TO_ONE_EDGE]
+                                    FORWARD [RS_36]
                                       PartitionCols:_col0
                                       Select Operator [SEL_18] (rows=14399440 width=106)
                                         Output:["_col0","_col1"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query2.q.out b/ql/src/test/results/clientpositive/perf/query2.q.out
index 1686f57..0ba703a 100644
--- a/ql/src/test/results/clientpositive/perf/query2.q.out
+++ b/ql/src/test/results/clientpositive/perf/query2.q.out
@@ -125,10 +125,10 @@ Map 16 <- Union 15 (CONTAINS)
 Map 8 <- Union 2 (CONTAINS)
 Reducer 10 <- Map 9 (SIMPLE_EDGE), Union 15 (SIMPLE_EDGE)
 Reducer 11 <- Reducer 10 (SIMPLE_EDGE)
-Reducer 12 <- Map 13 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE)
+Reducer 12 <- Map 13 (SIMPLE_EDGE), Reducer 11 (ONE_TO_ONE_EDGE)
 Reducer 3 <- Map 9 (SIMPLE_EDGE), Union 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Map 13 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Map 13 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
 Reducer 6 <- Reducer 12 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
 
@@ -160,8 +160,8 @@ Stage-0
                           predicate:((d_year = 2002) and d_week_seq is not null)
                           TableScan [TS_20] (rows=73049 width=1119)
                             default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_week_seq","d_year"]
-                  <-Reducer 11 [SIMPLE_EDGE]
-                    SHUFFLE [RS_50]
+                  <-Reducer 11 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_50]
                       PartitionCols:_col0
                       Group By Operator [GBY_45] (rows=237595882 width=135)
                         Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)","sum(VALUE._col2)","sum(VALUE._col3)","sum(VALUE._col4)","sum(VALUE._col5)","sum(VALUE._col6)"],keys:KEY._col0
@@ -215,8 +215,8 @@ Stage-0
                         Filter Operator [FIL_86] (rows=36524 width=1119)
                           predicate:((d_year = 2001) and d_week_seq is not null)
                            Please refer to the previous TableScan [TS_20]
-                  <-Reducer 4 [SIMPLE_EDGE]
-                    SHUFFLE [RS_23]
+                  <-Reducer 4 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_23]
                       PartitionCols:_col0
                       Group By Operator [GBY_18] (rows=237595882 width=135)
                         Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)","sum(VALUE._col2)","sum(VALUE._col3)","sum(VALUE._col4)","sum(VALUE._col5)","sum(VALUE._col6)"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query23.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query23.q.out b/ql/src/test/results/clientpositive/perf/query23.q.out
index 0e34b90..ebd2271 100644
--- a/ql/src/test/results/clientpositive/perf/query23.q.out
+++ b/ql/src/test/results/clientpositive/perf/query23.q.out
@@ -1,5 +1,5 @@
-Warning: Shuffle Join MERGEJOIN[369][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 30' is a cross product
 Warning: Shuffle Join MERGEJOIN[367][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 25' is a cross product
+Warning: Shuffle Join MERGEJOIN[369][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 30' is a cross product
 PREHOOK: query: explain
 with frequent_ss_items as 
  (select substr(i_item_desc,1,30) itemdesc,i_item_sk item_sk,d_date solddate,count(*) cnt

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query31.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query31.q.out b/ql/src/test/results/clientpositive/perf/query31.q.out
index 79b9e52..644d3e1 100644
--- a/ql/src/test/results/clientpositive/perf/query31.q.out
+++ b/ql/src/test/results/clientpositive/perf/query31.q.out
@@ -108,7 +108,7 @@ Reducer 11 <- Reducer 10 (SIMPLE_EDGE)
 Reducer 13 <- Map 12 (SIMPLE_EDGE), Map 24 (SIMPLE_EDGE)
 Reducer 14 <- Map 23 (SIMPLE_EDGE), Reducer 13 (SIMPLE_EDGE)
 Reducer 15 <- Reducer 14 (SIMPLE_EDGE)
-Reducer 16 <- Reducer 15 (SIMPLE_EDGE), Reducer 19 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
+Reducer 16 <- Reducer 15 (ONE_TO_ONE_EDGE), Reducer 19 (ONE_TO_ONE_EDGE), Reducer 22 (ONE_TO_ONE_EDGE)
 Reducer 17 <- Map 12 (SIMPLE_EDGE), Map 24 (SIMPLE_EDGE)
 Reducer 18 <- Map 23 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
 Reducer 19 <- Reducer 18 (SIMPLE_EDGE)
@@ -118,7 +118,7 @@ Reducer 21 <- Map 23 (SIMPLE_EDGE), Reducer 20 (SIMPLE_EDGE)
 Reducer 22 <- Reducer 21 (SIMPLE_EDGE)
 Reducer 3 <- Map 23 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 11 (SIMPLE_EDGE), Reducer 16 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 11 (ONE_TO_ONE_EDGE), Reducer 16 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
 Reducer 6 <- Map 1 (SIMPLE_EDGE), Map 12 (SIMPLE_EDGE)
 Reducer 7 <- Map 23 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
@@ -136,8 +136,8 @@ Stage-0
             predicate:(CASE WHEN ((_col1 > 0)) THEN (CASE WHEN ((_col9 > 0)) THEN (((_col11 / _col9) > (_col5 / _col1))) ELSE ((null > (_col5 / _col1))) END) ELSE (CASE WHEN ((_col9 > 0)) THEN (((_col11 / _col9) > null)) ELSE (null) END) END and CASE WHEN ((_col3 > 0)) THEN (CASE WHEN ((_col7 > 0)) THEN (((_col9 / _col7) > (_col1 / _col3))) ELSE ((null > (_col1 / _col3))) END) ELSE (CASE WHEN ((_col7 > 0)) THEN (((_col9 / _col7) > null)) ELSE (null) END) END)
             Merge Join Operator [MERGEJOIN_267] (rows=1149975359 width=88)
               Conds:RS_125._col0=RS_126._col0(Inner),RS_125._col0=RS_127._col0(Inner),RS_125._col0=RS_128._col0(Inner),Output:["_col0","_col1","_col3","_col5","_col7","_col9","_col11"]
-            <-Reducer 11 [SIMPLE_EDGE]
-              SHUFFLE [RS_127]
+            <-Reducer 11 [ONE_TO_ONE_EDGE]
+              FORWARD [RS_127]
                 PartitionCols:_col0
                 Group By Operator [GBY_58] (rows=348477374 width=88)
                   Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -185,8 +185,8 @@ Stage-0
                 PartitionCols:_col0
                 Merge Join Operator [MERGEJOIN_266] (rows=191667561 width=135)
                   Conds:RS_120._col0=RS_121._col0(Inner),RS_120._col0=RS_122._col0(Inner),Output:["_col0","_col1","_col3","_col5"]
-                <-Reducer 15 [SIMPLE_EDGE]
-                  SHUFFLE [RS_120]
+                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_120]
                     PartitionCols:_col0
                     Group By Operator [GBY_78] (rows=87121617 width=135)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -223,8 +223,8 @@ Stage-0
                                       predicate:(ws_sold_date_sk is not null and ws_bill_addr_sk is not null)
                                       TableScan [TS_60] (rows=144002668 width=135)
                                         default@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_date_sk","ws_bill_addr_sk","ws_ext_sales_price"]
-                <-Reducer 19 [SIMPLE_EDGE]
-                  SHUFFLE [RS_121]
+                <-Reducer 19 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_121]
                     PartitionCols:_col0
                     Group By Operator [GBY_98] (rows=87121617 width=135)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -256,8 +256,8 @@ Stage-0
                                 SHUFFLE [RS_89]
                                   PartitionCols:_col0
                                    Please refer to the previous Select Operator [SEL_62]
-                <-Reducer 22 [SIMPLE_EDGE]
-                  SHUFFLE [RS_122]
+                <-Reducer 22 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_122]
                     PartitionCols:_col0
                     Group By Operator [GBY_118] (rows=87121617 width=135)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -289,8 +289,8 @@ Stage-0
                                 SHUFFLE [RS_109]
                                   PartitionCols:_col0
                                    Please refer to the previous Select Operator [SEL_62]
-            <-Reducer 4 [SIMPLE_EDGE]
-              SHUFFLE [RS_125]
+            <-Reducer 4 [ONE_TO_ONE_EDGE]
+              FORWARD [RS_125]
                 PartitionCols:_col0
                 Group By Operator [GBY_18] (rows=348477374 width=88)
                   Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -322,8 +322,8 @@ Stage-0
                             SHUFFLE [RS_9]
                               PartitionCols:_col0
                                Please refer to the previous Select Operator [SEL_2]
-            <-Reducer 8 [SIMPLE_EDGE]
-              SHUFFLE [RS_126]
+            <-Reducer 8 [ONE_TO_ONE_EDGE]
+              FORWARD [RS_126]
                 PartitionCols:_col0
                 Group By Operator [GBY_38] (rows=348477374 width=88)
                   Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query32.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query32.q.out b/ql/src/test/results/clientpositive/perf/query32.q.out
index 10ad060..7b21854 100644
--- a/ql/src/test/results/clientpositive/perf/query32.q.out
+++ b/ql/src/test/results/clientpositive/perf/query32.q.out
@@ -60,7 +60,7 @@ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 Reducer 5 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
-Reducer 7 <- Map 9 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+Reducer 7 <- Map 9 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
 
 Stage-0
   Fetch Operator
@@ -119,8 +119,8 @@ Stage-0
                                 predicate:((i_manufact_id = 269) and i_item_sk is not null)
                                 TableScan [TS_20] (rows=462000 width=1436)
                                   default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_manufact_id"]
-                        <-Reducer 6 [SIMPLE_EDGE]
-                          SHUFFLE [RS_23]
+                        <-Reducer 6 [ONE_TO_ONE_EDGE]
+                          FORWARD [RS_23]
                             PartitionCols:_col1
                             Select Operator [SEL_19] (rows=158394413 width=135)
                               Output:["_col0","_col1"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query33.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query33.q.out b/ql/src/test/results/clientpositive/perf/query33.q.out
index aab3a19..9d47c8a 100644
--- a/ql/src/test/results/clientpositive/perf/query33.q.out
+++ b/ql/src/test/results/clientpositive/perf/query33.q.out
@@ -150,14 +150,14 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 10 <- Reducer 9 (SIMPLE_EDGE), Union 5 (CONTAINS)
-Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
+Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (ONE_TO_ONE_EDGE)
 Reducer 12 <- Reducer 11 (SIMPLE_EDGE), Reducer 25 (SIMPLE_EDGE)
 Reducer 13 <- Reducer 12 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 15 <- Map 14 (SIMPLE_EDGE)
 Reducer 16 <- Map 14 (SIMPLE_EDGE)
 Reducer 17 <- Map 14 (SIMPLE_EDGE)
 Reducer 19 <- Map 18 (SIMPLE_EDGE), Map 21 (SIMPLE_EDGE)
-Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE)
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE)
 Reducer 20 <- Map 26 (SIMPLE_EDGE), Reducer 19 (SIMPLE_EDGE)
 Reducer 22 <- Map 21 (SIMPLE_EDGE), Map 27 (SIMPLE_EDGE)
 Reducer 23 <- Map 26 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
@@ -167,7 +167,7 @@ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 20 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 6 <- Union 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
-Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (SIMPLE_EDGE)
+Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (ONE_TO_ONE_EDGE)
 Reducer 9 <- Reducer 23 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
 
 Stage-0
@@ -252,8 +252,8 @@ Stage-0
                                         predicate:(i_manufact_id is not null and i_item_sk is not null)
                                         TableScan [TS_0] (rows=462000 width=1436)
                                           default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_manufact_id"]
-                                <-Reducer 16 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_64]
+                                <-Reducer 16 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_64]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_45] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -291,8 +291,8 @@ Stage-0
                                   SHUFFLE [RS_101]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 17 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_102]
+                                <-Reducer 17 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_102]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_83] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -352,8 +352,8 @@ Stage-0
                                   SHUFFLE [RS_26]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 15 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_27]
+                                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_27]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_8] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query35.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query35.q.out b/ql/src/test/results/clientpositive/perf/query35.q.out
index 2815257..5d45b17 100644
--- a/ql/src/test/results/clientpositive/perf/query35.q.out
+++ b/ql/src/test/results/clientpositive/perf/query35.q.out
@@ -121,8 +121,8 @@ Reducer 16 <- Map 13 (SIMPLE_EDGE), Map 19 (SIMPLE_EDGE)
 Reducer 17 <- Reducer 16 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
 Reducer 3 <- Map 9 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-Reducer 4 <- Reducer 12 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 17 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 4 <- Reducer 12 (ONE_TO_ONE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 17 (ONE_TO_ONE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
 
@@ -153,8 +153,8 @@ Stage-0
                           predicate:(_col13 is not null or _col15 is not null)
                           Merge Join Operator [MERGEJOIN_113] (rows=766650239 width=88)
                             Conds:RS_62._col0=RS_63._col0(Left Outer),Output:["_col4","_col6","_col7","_col8","_col9","_col10","_col13","_col15"]
-                          <-Reducer 17 [SIMPLE_EDGE]
-                            SHUFFLE [RS_63]
+                          <-Reducer 17 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_63]
                               PartitionCols:_col0
                               Select Operator [SEL_61] (rows=158394413 width=135)
                                 Output:["_col0","_col1"]
@@ -192,8 +192,8 @@ Stage-0
                                 Output:["_col0","_col4","_col6","_col7","_col8","_col9","_col10","_col13"]
                                 Merge Join Operator [MERGEJOIN_112] (rows=696954748 width=88)
                                   Conds:RS_43._col0=RS_44._col0(Left Outer),RS_43._col0=RS_45._col0(Inner),Output:["_col0","_col4","_col6","_col7","_col8","_col9","_col10","_col12"]
-                                <-Reducer 12 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_44]
+                                <-Reducer 12 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_44]
                                     PartitionCols:_col0
                                     Select Operator [SEL_22] (rows=79201469 width=135)
                                       Output:["_col0","_col1"]
@@ -219,8 +219,8 @@ Stage-0
                                                     predicate:(ws_bill_customer_sk is not null and ws_sold_date_sk is not null)
                                                     TableScan [TS_9] (rows=144002668 width=135)
                                                       default@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_date_sk","ws_bill_customer_sk"]
-                                <-Reducer 15 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_45]
+                                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_45]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_35] (rows=316797606 width=88)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query45.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query45.q.out b/ql/src/test/results/clientpositive/perf/query45.q.out
index 19cf365..112b47b 100644
--- a/ql/src/test/results/clientpositive/perf/query45.q.out
+++ b/ql/src/test/results/clientpositive/perf/query45.q.out
@@ -49,7 +49,7 @@ Reducer 3 <- Reducer 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 12 (CUSTOM_SIMPLE_EDGE), Reducer 3 (CUSTOM_SIMPLE_EDGE)
 Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
-Reducer 9 <- Map 8 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE)
+Reducer 9 <- Map 8 (SIMPLE_EDGE), Reducer 11 (ONE_TO_ONE_EDGE)
 
 Stage-0
   Fetch Operator
@@ -137,8 +137,8 @@ Stage-0
                                             Filter Operator [FIL_77] (rows=462000 width=1436)
                                               predicate:i_item_sk is not null
                                                Please refer to the previous TableScan [TS_6]
-                                      <-Reducer 11 [SIMPLE_EDGE]
-                                        SHUFFLE [RS_27]
+                                      <-Reducer 11 [ONE_TO_ONE_EDGE]
+                                        FORWARD [RS_27]
                                           PartitionCols:_col0
                                           Select Operator [SEL_15] (rows=115500 width=1436)
                                             Output:["_col0","_col1"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query56.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query56.q.out b/ql/src/test/results/clientpositive/perf/query56.q.out
index 8f4606a..395a53b 100644
--- a/ql/src/test/results/clientpositive/perf/query56.q.out
+++ b/ql/src/test/results/clientpositive/perf/query56.q.out
@@ -136,14 +136,14 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 10 <- Reducer 9 (SIMPLE_EDGE), Union 5 (CONTAINS)
-Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
+Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (ONE_TO_ONE_EDGE)
 Reducer 12 <- Reducer 11 (SIMPLE_EDGE), Reducer 25 (SIMPLE_EDGE)
 Reducer 13 <- Reducer 12 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 15 <- Map 14 (SIMPLE_EDGE)
 Reducer 16 <- Map 14 (SIMPLE_EDGE)
 Reducer 17 <- Map 14 (SIMPLE_EDGE)
 Reducer 19 <- Map 18 (SIMPLE_EDGE), Map 21 (SIMPLE_EDGE)
-Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE)
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE)
 Reducer 20 <- Map 26 (SIMPLE_EDGE), Reducer 19 (SIMPLE_EDGE)
 Reducer 22 <- Map 21 (SIMPLE_EDGE), Map 27 (SIMPLE_EDGE)
 Reducer 23 <- Map 26 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
@@ -153,7 +153,7 @@ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 20 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 6 <- Union 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
-Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (SIMPLE_EDGE)
+Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (ONE_TO_ONE_EDGE)
 Reducer 9 <- Reducer 23 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
 
 Stage-0
@@ -238,8 +238,8 @@ Stage-0
                                         predicate:(i_item_id is not null and i_item_sk is not null)
                                         TableScan [TS_0] (rows=462000 width=1436)
                                           default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_item_id"]
-                                <-Reducer 16 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_64]
+                                <-Reducer 16 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_64]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_45] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -277,8 +277,8 @@ Stage-0
                                   SHUFFLE [RS_101]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 17 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_102]
+                                <-Reducer 17 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_102]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_83] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -338,8 +338,8 @@ Stage-0
                                   SHUFFLE [RS_26]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 15 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_27]
+                                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_27]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_8] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query58.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query58.q.out b/ql/src/test/results/clientpositive/perf/query58.q.out
index 5b37f36..7be1ff5 100644
--- a/ql/src/test/results/clientpositive/perf/query58.q.out
+++ b/ql/src/test/results/clientpositive/perf/query58.q.out
@@ -136,9 +136,9 @@ Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
 Reducer 11 <- Map 33 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 12 <- Reducer 11 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
 Reducer 13 <- Reducer 12 (SIMPLE_EDGE)
-Reducer 15 <- Map 14 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
-Reducer 16 <- Map 14 (SIMPLE_EDGE), Reducer 26 (SIMPLE_EDGE)
-Reducer 17 <- Map 14 (SIMPLE_EDGE), Reducer 30 (SIMPLE_EDGE)
+Reducer 15 <- Map 14 (SIMPLE_EDGE), Reducer 22 (ONE_TO_ONE_EDGE)
+Reducer 16 <- Map 14 (SIMPLE_EDGE), Reducer 26 (ONE_TO_ONE_EDGE)
+Reducer 17 <- Map 14 (SIMPLE_EDGE), Reducer 30 (ONE_TO_ONE_EDGE)
 Reducer 19 <- Map 18 (CUSTOM_SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 20 <- Map 31 (CUSTOM_SIMPLE_EDGE), Reducer 19 (CUSTOM_SIMPLE_EDGE)
@@ -154,7 +154,7 @@ Reducer 29 <- Map 31 (SIMPLE_EDGE), Reducer 28 (SIMPLE_EDGE)
 Reducer 3 <- Reducer 15 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 30 <- Reducer 29 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 10 (SIMPLE_EDGE), Reducer 13 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 13 (ONE_TO_ONE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 8 <- Map 32 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 9 <- Reducer 16 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
@@ -177,8 +177,8 @@ Stage-0
                   predicate:(_col5 BETWEEN (0.9 * _col1) AND (1.1 * _col1) and _col5 BETWEEN (0.9 * _col3) AND (1.1 * _col3) and _col1 BETWEEN (0.9 * _col5) AND (1.1 * _col5) and _col3 BETWEEN (0.9 * _col5) AND (1.1 * _col5) and _col1 BETWEEN (0.9 * _col3) AND (1.1 * _col3) and _col3 BETWEEN (0.9 * _col1) AND (1.1 * _col1))
                   Merge Join Operator [MERGEJOIN_279] (rows=766650239 width=88)
                     Conds:RS_150._col0=RS_151._col0(Inner),RS_150._col0=RS_152._col0(Inner),Output:["_col0","_col1","_col3","_col5"]
-                  <-Reducer 10 [SIMPLE_EDGE]
-                    SHUFFLE [RS_151]
+                  <-Reducer 10 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_151]
                       PartitionCols:_col0
                       Group By Operator [GBY_98] (rows=348477374 width=88)
                         Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -203,8 +203,8 @@ Stage-0
                                         predicate:(d_date is not null and d_date_sk is not null)
                                         TableScan [TS_6] (rows=73049 width=1119)
                                           default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_date"]
-                                <-Reducer 26 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_86]
+                                <-Reducer 26 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_86]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_83] (rows=40176 width=1119)
                                       Output:["_col0"],keys:KEY._col0
@@ -275,8 +275,8 @@ Stage-0
                                         predicate:(ss_item_sk is not null and ss_sold_date_sk is not null)
                                         TableScan [TS_50] (rows=575995635 width=88)
                                           default@store_sales,store_sales,Tbl:COMPLETE,Col:NONE,Output:["ss_sold_date_sk","ss_item_sk","ss_ext_sales_price"]
-                  <-Reducer 13 [SIMPLE_EDGE]
-                    SHUFFLE [RS_152]
+                  <-Reducer 13 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_152]
                       PartitionCols:_col0
                       Group By Operator [GBY_148] (rows=87121617 width=135)
                         Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -314,8 +314,8 @@ Stage-0
                                   SHUFFLE [RS_135]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_8]
-                                <-Reducer 30 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_136]
+                                <-Reducer 30 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_136]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_133] (rows=40176 width=1119)
                                       Output:["_col0"],keys:KEY._col0
@@ -356,8 +356,8 @@ Stage-0
                                                       <-Map 18 [CUSTOM_SIMPLE_EDGE]
                                                         PARTITION_ONLY_SHUFFLE [RS_113]
                                                            Please refer to the previous Group By Operator [GBY_12]
-                  <-Reducer 4 [SIMPLE_EDGE]
-                    SHUFFLE [RS_150]
+                  <-Reducer 4 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_150]
                       PartitionCols:_col0
                       Group By Operator [GBY_48] (rows=174233858 width=135)
                         Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -377,8 +377,8 @@ Stage-0
                                   SHUFFLE [RS_35]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_8]
-                                <-Reducer 22 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_36]
+                                <-Reducer 22 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_36]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_33] (rows=40176 width=1119)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query6.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query6.q.out b/ql/src/test/results/clientpositive/perf/query6.q.out
index 6ea20e9..1c685e2 100644
--- a/ql/src/test/results/clientpositive/perf/query6.q.out
+++ b/ql/src/test/results/clientpositive/perf/query6.q.out
@@ -58,7 +58,7 @@ Reducer 12 <- Reducer 11 (CUSTOM_SIMPLE_EDGE), Reducer 19 (CUSTOM_SIMPLE_EDGE)
 Reducer 13 <- Map 20 (SIMPLE_EDGE), Reducer 12 (SIMPLE_EDGE)
 Reducer 16 <- Map 15 (SIMPLE_EDGE), Map 17 (SIMPLE_EDGE)
 Reducer 19 <- Map 18 (SIMPLE_EDGE)
-Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
 Reducer 3 <- Map 14 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 16 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
 Reducer 5 <- Reducer 13 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
@@ -208,8 +208,8 @@ Stage-0
                                           predicate:(d_date_sk is not null and d_month_seq is not null)
                                           TableScan [TS_0] (rows=73049 width=1119)
                                             default@date_dim,d,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_month_seq"]
-                                  <-Reducer 9 [SIMPLE_EDGE]
-                                    SHUFFLE [RS_56]
+                                  <-Reducer 9 [ONE_TO_ONE_EDGE]
+                                    FORWARD [RS_56]
                                       PartitionCols:_col0
                                       Group By Operator [GBY_8] (rows=9131 width=1119)
                                         Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query60.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query60.q.out b/ql/src/test/results/clientpositive/perf/query60.q.out
index 7d63986..13974bc 100644
--- a/ql/src/test/results/clientpositive/perf/query60.q.out
+++ b/ql/src/test/results/clientpositive/perf/query60.q.out
@@ -156,14 +156,14 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 10 <- Reducer 9 (SIMPLE_EDGE), Union 5 (CONTAINS)
-Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
+Reducer 11 <- Map 1 (SIMPLE_EDGE), Reducer 17 (ONE_TO_ONE_EDGE)
 Reducer 12 <- Reducer 11 (SIMPLE_EDGE), Reducer 25 (SIMPLE_EDGE)
 Reducer 13 <- Reducer 12 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 15 <- Map 14 (SIMPLE_EDGE)
 Reducer 16 <- Map 14 (SIMPLE_EDGE)
 Reducer 17 <- Map 14 (SIMPLE_EDGE)
 Reducer 19 <- Map 18 (SIMPLE_EDGE), Map 21 (SIMPLE_EDGE)
-Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE)
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE)
 Reducer 20 <- Map 26 (SIMPLE_EDGE), Reducer 19 (SIMPLE_EDGE)
 Reducer 22 <- Map 21 (SIMPLE_EDGE), Map 27 (SIMPLE_EDGE)
 Reducer 23 <- Map 26 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
@@ -173,7 +173,7 @@ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 20 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Union 5 (CONTAINS)
 Reducer 6 <- Union 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
-Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (SIMPLE_EDGE)
+Reducer 8 <- Map 1 (SIMPLE_EDGE), Reducer 16 (ONE_TO_ONE_EDGE)
 Reducer 9 <- Reducer 23 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
 
 Stage-0
@@ -258,8 +258,8 @@ Stage-0
                                         predicate:(i_item_id is not null and i_item_sk is not null)
                                         TableScan [TS_0] (rows=462000 width=1436)
                                           default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_item_id"]
-                                <-Reducer 16 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_64]
+                                <-Reducer 16 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_64]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_45] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -297,8 +297,8 @@ Stage-0
                                   SHUFFLE [RS_101]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 17 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_102]
+                                <-Reducer 17 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_102]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_83] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0
@@ -358,8 +358,8 @@ Stage-0
                                   SHUFFLE [RS_26]
                                     PartitionCols:_col1
                                      Please refer to the previous Select Operator [SEL_2]
-                                <-Reducer 15 [SIMPLE_EDGE]
-                                  SHUFFLE [RS_27]
+                                <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                  FORWARD [RS_27]
                                     PartitionCols:_col0
                                     Group By Operator [GBY_8] (rows=115500 width=1436)
                                       Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query64.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query64.q.out b/ql/src/test/results/clientpositive/perf/query64.q.out
index ddd0614..b31fd7f 100644
--- a/ql/src/test/results/clientpositive/perf/query64.q.out
+++ b/ql/src/test/results/clientpositive/perf/query64.q.out
@@ -251,14 +251,14 @@ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 20 (SIMPLE_EDGE)
 Reducer 21 <- Map 20 (SIMPLE_EDGE), Reducer 43 (SIMPLE_EDGE)
 Reducer 22 <- Map 48 (SIMPLE_EDGE), Reducer 21 (SIMPLE_EDGE)
 Reducer 23 <- Reducer 22 (SIMPLE_EDGE), Reducer 35 (SIMPLE_EDGE)
-Reducer 24 <- Reducer 23 (SIMPLE_EDGE), Reducer 51 (SIMPLE_EDGE)
+Reducer 24 <- Reducer 23 (SIMPLE_EDGE), Reducer 51 (ONE_TO_ONE_EDGE)
 Reducer 25 <- Map 55 (SIMPLE_EDGE), Reducer 24 (SIMPLE_EDGE)
 Reducer 26 <- Map 39 (SIMPLE_EDGE), Reducer 25 (SIMPLE_EDGE)
 Reducer 27 <- Map 20 (SIMPLE_EDGE), Reducer 45 (SIMPLE_EDGE)
 Reducer 28 <- Map 48 (SIMPLE_EDGE), Reducer 27 (SIMPLE_EDGE)
 Reducer 29 <- Reducer 28 (SIMPLE_EDGE), Reducer 37 (SIMPLE_EDGE)
 Reducer 3 <- Map 20 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-Reducer 30 <- Reducer 29 (SIMPLE_EDGE), Reducer 53 (SIMPLE_EDGE)
+Reducer 30 <- Reducer 29 (SIMPLE_EDGE), Reducer 53 (ONE_TO_ONE_EDGE)
 Reducer 31 <- Map 55 (SIMPLE_EDGE), Reducer 30 (SIMPLE_EDGE)
 Reducer 32 <- Map 39 (SIMPLE_EDGE), Reducer 31 (SIMPLE_EDGE)
 Reducer 34 <- Map 33 (SIMPLE_EDGE), Map 38 (SIMPLE_EDGE)
@@ -529,8 +529,8 @@ Stage-0
                                                             SHUFFLE [RS_210]
                                                               PartitionCols:_col0
                                                                Please refer to the previous Select Operator [SEL_14]
-                                                  <-Reducer 53 [SIMPLE_EDGE]
-                                                    SHUFFLE [RS_216]
+                                                  <-Reducer 53 [ONE_TO_ONE_EDGE]
+                                                    FORWARD [RS_216]
                                                       PartitionCols:_col0
                                                       Select Operator [SEL_202] (rows=52798137 width=135)
                                                         Output:["_col0"]
@@ -681,8 +681,8 @@ Stage-0
                                                             SHUFFLE [RS_82]
                                                               PartitionCols:_col0
                                                                Please refer to the previous Select Operator [SEL_14]
-                                                  <-Reducer 51 [SIMPLE_EDGE]
-                                                    SHUFFLE [RS_88]
+                                                  <-Reducer 51 [ONE_TO_ONE_EDGE]
+                                                    FORWARD [RS_88]
                                                       PartitionCols:_col0
                                                       Select Operator [SEL_74] (rows=52798137 width=135)
                                                         Output:["_col0"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query69.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query69.q.out b/ql/src/test/results/clientpositive/perf/query69.q.out
index 2769bc5..1228894 100644
--- a/ql/src/test/results/clientpositive/perf/query69.q.out
+++ b/ql/src/test/results/clientpositive/perf/query69.q.out
@@ -101,8 +101,8 @@ Reducer 16 <- Map 13 (SIMPLE_EDGE), Map 19 (SIMPLE_EDGE)
 Reducer 17 <- Reducer 16 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
 Reducer 3 <- Map 9 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-Reducer 4 <- Reducer 12 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 17 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 4 <- Reducer 12 (ONE_TO_ONE_EDGE), Reducer 15 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 17 (ONE_TO_ONE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
 
@@ -133,8 +133,8 @@ Stage-0
                           predicate:_col15 is null
                           Merge Join Operator [MERGEJOIN_114] (rows=383325119 width=88)
                             Conds:RS_64._col0=RS_65._col0(Left Outer),Output:["_col6","_col7","_col8","_col9","_col10","_col15"]
-                          <-Reducer 17 [SIMPLE_EDGE]
-                            SHUFFLE [RS_65]
+                          <-Reducer 17 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_65]
                               PartitionCols:_col0
                               Select Operator [SEL_63] (rows=158394413 width=135)
                                 Output:["_col0","_col1"]
@@ -176,8 +176,8 @@ Stage-0
                                     Output:["_col0","_col6","_col7","_col8","_col9","_col10","_col13"]
                                     Merge Join Operator [MERGEJOIN_113] (rows=696954748 width=88)
                                       Conds:RS_43._col0=RS_44._col0(Left Outer),RS_43._col0=RS_45._col0(Inner),Output:["_col0","_col6","_col7","_col8","_col9","_col10","_col12"]
-                                    <-Reducer 12 [SIMPLE_EDGE]
-                                      SHUFFLE [RS_44]
+                                    <-Reducer 12 [ONE_TO_ONE_EDGE]
+                                      FORWARD [RS_44]
                                         PartitionCols:_col0
                                         Select Operator [SEL_22] (rows=79201469 width=135)
                                           Output:["_col0","_col1"]
@@ -203,8 +203,8 @@ Stage-0
                                                         predicate:(ws_bill_customer_sk is not null and ws_sold_date_sk is not null)
                                                         TableScan [TS_9] (rows=144002668 width=135)
                                                           default@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_date_sk","ws_bill_customer_sk"]
-                                    <-Reducer 15 [SIMPLE_EDGE]
-                                      SHUFFLE [RS_45]
+                                    <-Reducer 15 [ONE_TO_ONE_EDGE]
+                                      FORWARD [RS_45]
                                         PartitionCols:_col0
                                         Group By Operator [GBY_35] (rows=316797606 width=88)
                                           Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query77.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query77.q.out b/ql/src/test/results/clientpositive/perf/query77.q.out
index db42b18..d10e226 100644
--- a/ql/src/test/results/clientpositive/perf/query77.q.out
+++ b/ql/src/test/results/clientpositive/perf/query77.q.out
@@ -226,13 +226,13 @@ Reducer 18 <- Map 29 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 19 <- Map 30 (SIMPLE_EDGE), Reducer 18 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 20 <- Reducer 19 (SIMPLE_EDGE)
-Reducer 21 <- Reducer 20 (SIMPLE_EDGE), Reducer 24 (SIMPLE_EDGE), Union 6 (CONTAINS)
+Reducer 21 <- Reducer 20 (ONE_TO_ONE_EDGE), Reducer 24 (ONE_TO_ONE_EDGE), Union 6 (CONTAINS)
 Reducer 22 <- Map 31 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 23 <- Map 30 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
 Reducer 24 <- Reducer 23 (SIMPLE_EDGE)
 Reducer 3 <- Map 25 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 12 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Union 6 (CONTAINS)
+Reducer 5 <- Reducer 12 (ONE_TO_ONE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE), Union 6 (CONTAINS)
 Reducer 7 <- Union 6 (SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
 
@@ -323,8 +323,8 @@ Stage-0
                           Output:["_col0","_col1","_col2","_col3","_col4"]
                           Merge Join Operator [MERGEJOIN_189] (rows=95833780 width=135)
                             Conds:RS_117._col0=RS_118._col0(Left Outer),Output:["_col0","_col1","_col2","_col4","_col5"]
-                          <-Reducer 20 [SIMPLE_EDGE]
-                            SHUFFLE [RS_117]
+                          <-Reducer 20 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_117]
                               PartitionCols:_col0
                               Group By Operator [GBY_95] (rows=87121617 width=135)
                                 Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"],keys:KEY._col0
@@ -362,8 +362,8 @@ Stage-0
                                                 predicate:(ws_sold_date_sk is not null and ws_web_page_sk is not null)
                                                 TableScan [TS_77] (rows=144002668 width=135)
                                                   default@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_date_sk","ws_web_page_sk","ws_ext_sales_price","ws_net_profit"]
-                          <-Reducer 24 [SIMPLE_EDGE]
-                            SHUFFLE [RS_118]
+                          <-Reducer 24 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_118]
                               PartitionCols:_col0
                               Group By Operator [GBY_115] (rows=8711072 width=92)
                                 Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"],keys:KEY._col0
@@ -405,8 +405,8 @@ Stage-0
                           Output:["_col0","_col1","_col2","_col3","_col4"]
                           Merge Join Operator [MERGEJOIN_187] (rows=383325119 width=88)
                             Conds:RS_40._col0=RS_41._col0(Left Outer),Output:["_col0","_col1","_col2","_col4","_col5"]
-                          <-Reducer 12 [SIMPLE_EDGE]
-                            SHUFFLE [RS_41]
+                          <-Reducer 12 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_41]
                               PartitionCols:_col0
                               Group By Operator [GBY_38] (rows=34842647 width=77)
                                 Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"],keys:KEY._col0
@@ -444,8 +444,8 @@ Stage-0
                                                 predicate:(sr_returned_date_sk is not null and sr_store_sk is not null)
                                                 TableScan [TS_20] (rows=57591150 width=77)
                                                   default@store_returns,store_returns,Tbl:COMPLETE,Col:NONE,Output:["sr_returned_date_sk","sr_store_sk","sr_return_amt","sr_net_loss"]
-                          <-Reducer 4 [SIMPLE_EDGE]
-                            SHUFFLE [RS_40]
+                          <-Reducer 4 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_40]
                               PartitionCols:_col0
                               Group By Operator [GBY_18] (rows=348477374 width=88)
                                 Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query78.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query78.q.out b/ql/src/test/results/clientpositive/perf/query78.q.out
index 2fccac2..91c244f 100644
--- a/ql/src/test/results/clientpositive/perf/query78.q.out
+++ b/ql/src/test/results/clientpositive/perf/query78.q.out
@@ -121,7 +121,7 @@ Reducer 15 <- Map 14 (SIMPLE_EDGE), Map 16 (SIMPLE_EDGE)
 Reducer 18 <- Map 17 (SIMPLE_EDGE), Map 19 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 12 (SIMPLE_EDGE)
 Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
 Reducer 5 <- Reducer 10 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Map 1 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE)
@@ -203,8 +203,8 @@ Stage-0
                           predicate:(COALESCE(_col7,0) > 0)
                           Merge Join Operator [MERGEJOIN_112] (rows=191662559 width=88)
                             Conds:RS_42._col1, _col0=RS_43._col1, _col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col4","_col7","_col8","_col9"]
-                          <-Reducer 3 [SIMPLE_EDGE]
-                            SHUFFLE [RS_42]
+                          <-Reducer 3 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_42]
                               PartitionCols:_col1, _col0
                               Select Operator [SEL_20] (rows=174238687 width=88)
                                 Output:["_col0","_col1","_col2","_col3","_col4"]
@@ -246,8 +246,8 @@ Stage-0
                                                     Output:["_col0","_col1"]
                                                     TableScan [TS_6] (rows=57591150 width=77)
                                                       default@store_returns,store_returns,Tbl:COMPLETE,Col:NONE,Output:["sr_item_sk","sr_ticket_number"]
-                          <-Reducer 8 [SIMPLE_EDGE]
-                            SHUFFLE [RS_43]
+                          <-Reducer 8 [ONE_TO_ONE_EDGE]
+                            FORWARD [RS_43]
                               PartitionCols:_col1, _col0
                               Select Operator [SEL_41] (rows=43560808 width=135)
                                 Output:["_col0","_col1","_col2","_col3","_col4"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query83.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query83.q.out b/ql/src/test/results/clientpositive/perf/query83.q.out
index 4f14cc6..dfae770 100644
--- a/ql/src/test/results/clientpositive/perf/query83.q.out
+++ b/ql/src/test/results/clientpositive/perf/query83.q.out
@@ -137,9 +137,9 @@ Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
 Reducer 11 <- Map 27 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 12 <- Reducer 11 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE)
 Reducer 13 <- Reducer 12 (SIMPLE_EDGE)
-Reducer 15 <- Map 14 (SIMPLE_EDGE), Reducer 20 (SIMPLE_EDGE)
-Reducer 16 <- Map 14 (SIMPLE_EDGE), Reducer 22 (SIMPLE_EDGE)
-Reducer 17 <- Map 14 (SIMPLE_EDGE), Reducer 24 (SIMPLE_EDGE)
+Reducer 15 <- Map 14 (SIMPLE_EDGE), Reducer 20 (ONE_TO_ONE_EDGE)
+Reducer 16 <- Map 14 (SIMPLE_EDGE), Reducer 22 (ONE_TO_ONE_EDGE)
+Reducer 17 <- Map 14 (SIMPLE_EDGE), Reducer 24 (ONE_TO_ONE_EDGE)
 Reducer 19 <- Map 18 (SIMPLE_EDGE), Map 25 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 20 <- Reducer 19 (SIMPLE_EDGE)
@@ -149,7 +149,7 @@ Reducer 23 <- Map 18 (SIMPLE_EDGE), Map 25 (SIMPLE_EDGE)
 Reducer 24 <- Reducer 23 (SIMPLE_EDGE)
 Reducer 3 <- Reducer 15 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 10 (SIMPLE_EDGE), Reducer 13 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 13 (ONE_TO_ONE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 8 <- Map 26 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
 Reducer 9 <- Reducer 16 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
@@ -170,8 +170,8 @@ Stage-0
                 Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
                 Merge Join Operator [MERGEJOIN_222] (rows=76653825 width=77)
                   Conds:RS_120._col0=RS_121._col0(Inner),RS_120._col0=RS_122._col0(Inner),Output:["_col0","_col1","_col3","_col5"]
-                <-Reducer 10 [SIMPLE_EDGE]
-                  SHUFFLE [RS_121]
+                <-Reducer 10 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_121]
                     PartitionCols:_col0
                     Group By Operator [GBY_78] (rows=34842647 width=77)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -196,8 +196,8 @@ Stage-0
                                       predicate:(d_date is not null and d_date_sk is not null)
                                       TableScan [TS_6] (rows=73049 width=1119)
                                         default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_date"]
-                              <-Reducer 22 [SIMPLE_EDGE]
-                                SHUFFLE [RS_66]
+                              <-Reducer 22 [ONE_TO_ONE_EDGE]
+                                FORWARD [RS_66]
                                   PartitionCols:_col0
                                   Group By Operator [GBY_63] (rows=40176 width=1119)
                                     Output:["_col0"],keys:KEY._col0
@@ -251,8 +251,8 @@ Stage-0
                                       predicate:(sr_item_sk is not null and sr_returned_date_sk is not null)
                                       TableScan [TS_40] (rows=57591150 width=77)
                                         default@store_returns,store_returns,Tbl:COMPLETE,Col:NONE,Output:["sr_returned_date_sk","sr_item_sk","sr_return_quantity"]
-                <-Reducer 13 [SIMPLE_EDGE]
-                  SHUFFLE [RS_122]
+                <-Reducer 13 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_122]
                     PartitionCols:_col0
                     Group By Operator [GBY_118] (rows=8711072 width=92)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -290,8 +290,8 @@ Stage-0
                                 SHUFFLE [RS_105]
                                   PartitionCols:_col1
                                    Please refer to the previous Select Operator [SEL_8]
-                              <-Reducer 24 [SIMPLE_EDGE]
-                                SHUFFLE [RS_106]
+                              <-Reducer 24 [ONE_TO_ONE_EDGE]
+                                FORWARD [RS_106]
                                   PartitionCols:_col0
                                   Group By Operator [GBY_103] (rows=40176 width=1119)
                                     Output:["_col0"],keys:KEY._col0
@@ -310,8 +310,8 @@ Stage-0
                                           SHUFFLE [RS_98]
                                             PartitionCols:_col0
                                              Please refer to the previous Group By Operator [GBY_16]
-                <-Reducer 4 [SIMPLE_EDGE]
-                  SHUFFLE [RS_120]
+                <-Reducer 4 [ONE_TO_ONE_EDGE]
+                  FORWARD [RS_120]
                     PartitionCols:_col0
                     Group By Operator [GBY_38] (rows=17423323 width=106)
                       Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
@@ -331,8 +331,8 @@ Stage-0
                                 SHUFFLE [RS_25]
                                   PartitionCols:_col1
                                    Please refer to the previous Select Operator [SEL_8]
-                              <-Reducer 20 [SIMPLE_EDGE]
-                                SHUFFLE [RS_26]
+                              <-Reducer 20 [ONE_TO_ONE_EDGE]
+                                FORWARD [RS_26]
                                   PartitionCols:_col0
                                   Group By Operator [GBY_23] (rows=40176 width=1119)
                                     Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query92.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query92.q.out b/ql/src/test/results/clientpositive/perf/query92.q.out
index 761753f..adcbd833 100644
--- a/ql/src/test/results/clientpositive/perf/query92.q.out
+++ b/ql/src/test/results/clientpositive/perf/query92.q.out
@@ -65,7 +65,7 @@ Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
-Reducer 8 <- Map 10 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+Reducer 8 <- Map 10 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
 
 Stage-0
   Fetch Operator
@@ -130,8 +130,8 @@ Stage-0
                                       predicate:((i_manufact_id = 269) and i_item_sk is not null)
                                       TableScan [TS_20] (rows=462000 width=1436)
                                         default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_manufact_id"]
-                              <-Reducer 7 [SIMPLE_EDGE]
-                                SHUFFLE [RS_23]
+                              <-Reducer 7 [ONE_TO_ONE_EDGE]
+                                FORWARD [RS_23]
                                   PartitionCols:_col1
                                   Select Operator [SEL_19] (rows=79201469 width=135)
                                     Output:["_col0","_col1"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query94.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query94.q.out b/ql/src/test/results/clientpositive/perf/query94.q.out
index 77fba1b..b76fed3 100644
--- a/ql/src/test/results/clientpositive/perf/query94.q.out
+++ b/ql/src/test/results/clientpositive/perf/query94.q.out
@@ -62,7 +62,7 @@ Reducer 15 <- Map 14 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 3 <- Map 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Map 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 13 (SIMPLE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 13 (ONE_TO_ONE_EDGE), Reducer 15 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
@@ -102,8 +102,8 @@ Stage-0
                                   Output:["_col4","_col5","_col6","_col16"]
                                   Merge Join Operator [MERGEJOIN_82] (rows=421668645 width=135)
                                     Conds:RS_35._col4=RS_36._col0(Left Outer),RS_35._col4=RS_37._col1(Inner),Output:["_col3","_col4","_col5","_col6","_col14","_col15"],residual filter predicates:{(_col3 <> _col15)}
-                                  <-Reducer 13 [SIMPLE_EDGE]
-                                    SHUFFLE [RS_36]
+                                  <-Reducer 13 [ONE_TO_ONE_EDGE]
+                                    FORWARD [RS_36]
                                       PartitionCols:_col0
                                       Select Operator [SEL_18] (rows=7199233 width=92)
                                         Output:["_col0","_col1"]

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query95.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query95.q.out b/ql/src/test/results/clientpositive/perf/query95.q.out
index 28db5a6..f036a47 100644
--- a/ql/src/test/results/clientpositive/perf/query95.q.out
+++ b/ql/src/test/results/clientpositive/perf/query95.q.out
@@ -71,7 +71,7 @@ Reducer 17 <- Reducer 16 (SIMPLE_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 3 <- Map 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
 Reducer 4 <- Map 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-Reducer 5 <- Reducer 14 (SIMPLE_EDGE), Reducer 17 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 5 <- Reducer 14 (ONE_TO_ONE_EDGE), Reducer 17 (ONE_TO_ONE_EDGE), Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
 Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
@@ -105,8 +105,8 @@ Stage-0
                             Output:["_col0","_col2","_col3"],aggregations:["sum(_col4)","sum(_col5)"],keys:_col3
                             Merge Join Operator [MERGEJOIN_127] (rows=421668645 width=135)
                               Conds:RS_58._col3=RS_59._col0(Inner),RS_58._col3=RS_60._col0(Inner),Output:["_col3","_col4","_col5"]
-                            <-Reducer 14 [SIMPLE_EDGE]
-                              SHUFFLE [RS_59]
+                            <-Reducer 14 [ONE_TO_ONE_EDGE]
+                              FORWARD [RS_59]
                                 PartitionCols:_col0
                                 Group By Operator [GBY_25] (rows=79201469 width=135)
                                   Output:["_col0"],keys:KEY._col0
@@ -139,8 +139,8 @@ Stage-0
                                                   predicate:ws_order_number is not null
                                                   TableScan [TS_15] (rows=144002668 width=135)
                                                     default@web_sales,ws2,Tbl:COMPLETE,Col:NONE,Output:["ws_warehouse_sk","ws_order_number"]
-                            <-Reducer 17 [SIMPLE_EDGE]
-                              SHUFFLE [RS_60]
+                            <-Reducer 17 [ONE_TO_ONE_EDGE]
+                              FORWARD [RS_60]
                                 PartitionCols:_col0
                                 Group By Operator [GBY_47] (rows=87121617 width=135)
                                   Output:["_col0"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/perf/query97.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/perf/query97.q.out b/ql/src/test/results/clientpositive/perf/query97.q.out
index 51dd341..81589f7 100644
--- a/ql/src/test/results/clientpositive/perf/query97.q.out
+++ b/ql/src/test/results/clientpositive/perf/query97.q.out
@@ -51,7 +51,7 @@ Plan optimized by CBO.
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
 Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
 Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
 Reducer 7 <- Map 6 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
 Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
@@ -74,8 +74,8 @@ Stage-0
                   Output:["_col0","_col1","_col2"]
                   Merge Join Operator [MERGEJOIN_49] (rows=348477374 width=88)
                     Conds:RS_28._col0, _col1=RS_29._col0, _col1(Outer),Output:["_col0","_col2"]
-                  <-Reducer 3 [SIMPLE_EDGE]
-                    SHUFFLE [RS_28]
+                  <-Reducer 3 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_28]
                       PartitionCols:_col0, _col1
                       Group By Operator [GBY_12] (rows=316797606 width=88)
                         Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
@@ -104,8 +104,8 @@ Stage-0
                                     predicate:ss_sold_date_sk is not null
                                     TableScan [TS_0] (rows=575995635 width=88)
                                       default@store_sales,store_sales,Tbl:COMPLETE,Col:NONE,Output:["ss_sold_date_sk","ss_item_sk","ss_customer_sk"]
-                  <-Reducer 8 [SIMPLE_EDGE]
-                    SHUFFLE [RS_29]
+                  <-Reducer 8 [ONE_TO_ONE_EDGE]
+                    FORWARD [RS_29]
                       PartitionCols:_col0, _col1
                       Group By Operator [GBY_26] (rows=158394413 width=135)
                         Output:["_col0","_col1"],keys:KEY._col0, KEY._col1


[2/3] hive git commit: HIVE-17037: Use 1-to-1 Tez edge to avoid unnecessary input data shuffle (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Posted by jc...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
index 799de73..ecfa860 100644
--- a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
+++ b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
@@ -22,7 +22,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -190,7 +190,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -358,7 +358,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -524,7 +524,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -690,7 +690,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -856,7 +856,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1022,7 +1022,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1192,7 +1192,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1364,7 +1364,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
@@ -1552,7 +1552,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
@@ -1738,7 +1738,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1919,7 +1919,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/correlationoptimizer3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/correlationoptimizer3.q.out b/ql/src/test/results/clientpositive/llap/correlationoptimizer3.q.out
index 611ba06..33aa7ac 100644
--- a/ql/src/test/results/clientpositive/llap/correlationoptimizer3.q.out
+++ b/ql/src/test/results/clientpositive/llap/correlationoptimizer3.q.out
@@ -22,7 +22,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
@@ -239,7 +239,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
@@ -667,7 +667,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
@@ -884,7 +884,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/correlationoptimizer6.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/correlationoptimizer6.q.out b/ql/src/test/results/clientpositive/llap/correlationoptimizer6.q.out
index 177cc97..a47a7d3 100644
--- a/ql/src/test/results/clientpositive/llap/correlationoptimizer6.q.out
+++ b/ql/src/test/results/clientpositive/llap/correlationoptimizer6.q.out
@@ -25,7 +25,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -269,7 +269,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -733,7 +733,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -874,7 +874,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1015,7 +1015,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1196,7 +1196,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1379,7 +1379,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1582,7 +1582,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1785,7 +1785,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2003,7 +2003,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2223,7 +2223,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -2441,7 +2441,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -2660,9 +2660,9 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-        Reducer 5 <- Map 8 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 5 <- Map 8 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2914,9 +2914,9 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
-        Reducer 5 <- Map 8 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 5 <- Map 8 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -3167,7 +3167,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 9 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
         Reducer 8 <- Map 10 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Reducer 8 (SIMPLE_EDGE)
@@ -3424,7 +3424,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
index eb1268c..b36fe2a 100644
--- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
@@ -1737,7 +1737,7 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
 Reducer 4 <- Map 1 (SIMPLE_EDGE)
 
 Stage-0
@@ -1752,8 +1752,8 @@ Stage-0
             predicate:_col4 is null
             Merge Join Operator [MERGEJOIN_21] (rows=250 width=178)
               Conds:RS_13._col0, _col1=RS_14._col0, _col1(Left Outer),Output:["_col0","_col1","_col4"]
-            <-Reducer 2 [SIMPLE_EDGE] llap
-              SHUFFLE [RS_13]
+            <-Reducer 2 [ONE_TO_ONE_EDGE] llap
+              FORWARD [RS_13]
                 PartitionCols:_col0, _col1
                 Group By Operator [GBY_4] (rows=250 width=178)
                   Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
@@ -1766,8 +1766,8 @@ Stage-0
                         Output:["key","value"]
                         TableScan [TS_0] (rows=500 width=178)
                           default@src_cbo,b,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
-            <-Reducer 4 [SIMPLE_EDGE] llap
-              SHUFFLE [RS_14]
+            <-Reducer 4 [ONE_TO_ONE_EDGE] llap
+              FORWARD [RS_14]
                 PartitionCols:_col0, _col1
                 Select Operator [SEL_12] (rows=83 width=182)
                   Output:["_col0","_col1","_col2"]
@@ -1948,7 +1948,7 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
-Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
 Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
 
 Stage-0
@@ -1986,8 +1986,8 @@ Stage-0
                       predicate:((l_linenumber = 1) and l_partkey is not null)
                       TableScan [TS_7] (rows=100 width=16)
                         default@lineitem,li,Tbl:COMPLETE,Col:COMPLETE,Output:["l_orderkey","l_partkey","l_suppkey","l_linenumber"]
-              <-Reducer 2 [SIMPLE_EDGE] llap
-                SHUFFLE [RS_13]
+              <-Reducer 2 [ONE_TO_ONE_EDGE] llap
+                FORWARD [RS_13]
                   PartitionCols:_col0
                   Group By Operator [GBY_5] (rows=50 width=4)
                     Output:["_col0"],keys:KEY._col0
@@ -2165,7 +2165,7 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
 Reducer 7 <- Map 5 (SIMPLE_EDGE)
@@ -2209,8 +2209,8 @@ Stage-0
                                 predicate:(key > '2')
                                 TableScan [TS_2] (rows=500 width=87)
                                   default@src_cbo,s1,Tbl:COMPLETE,Col:COMPLETE,Output:["key"]
-                <-Reducer 7 [SIMPLE_EDGE] llap
-                  SHUFFLE [RS_20]
+                <-Reducer 7 [ONE_TO_ONE_EDGE] llap
+                  FORWARD [RS_20]
                     PartitionCols:_col0
                     Select Operator [SEL_15] (rows=83 width=91)
                       Output:["_col0","_col1"]
@@ -2244,8 +2244,8 @@ POSTHOOK: type: QUERY
 Plan optimized by CBO.
 
 Vertex dependency in root stage
-Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
+Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
 Reducer 4 <- Map 1 (SIMPLE_EDGE)
 Reducer 5 <- Map 1 (SIMPLE_EDGE)
 
@@ -2273,8 +2273,8 @@ Stage-0
                       Output:["_col0","_col1","_col2"]
                       TableScan [TS_0] (rows=26 width=223)
                         default@part,b,Tbl:COMPLETE,Col:COMPLETE,Output:["p_name","p_mfgr","p_size"]
-                <-Reducer 4 [SIMPLE_EDGE] llap
-                  SHUFFLE [RS_19]
+                <-Reducer 4 [ONE_TO_ONE_EDGE] llap
+                  FORWARD [RS_19]
                     PartitionCols:_col0
                     Group By Operator [GBY_7] (rows=2 width=114)
                       Output:["_col0","_col1","_col2"],aggregations:["count(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0
@@ -2288,8 +2288,8 @@ Stage-0
                             Filter Operator [FIL_29] (rows=8 width=223)
                               predicate:((p_size < 10) and p_mfgr is not null)
                                Please refer to the previous TableScan [TS_0]
-            <-Reducer 5 [SIMPLE_EDGE] llap
-              SHUFFLE [RS_22]
+            <-Reducer 5 [ONE_TO_ONE_EDGE] llap
+              FORWARD [RS_22]
                 PartitionCols:_col0, _col1
                 Select Operator [SEL_17] (rows=4 width=223)
                   Output:["_col0","_col1","_col2"]
@@ -2409,7 +2409,7 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
 Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 Reducer 6 <- Map 1 (SIMPLE_EDGE)
@@ -2451,8 +2451,8 @@ Stage-0
                                 Output:["p_mfgr","p_retailprice"]
                                 TableScan [TS_0] (rows=26 width=106)
                                   default@part,b,Tbl:COMPLETE,Col:COMPLETE,Output:["p_mfgr","p_retailprice"]
-                    <-Reducer 7 [SIMPLE_EDGE] llap
-                      SHUFFLE [RS_28]
+                    <-Reducer 7 [ONE_TO_ONE_EDGE] llap
+                      FORWARD [RS_28]
                         PartitionCols:_col0
                         Group By Operator [GBY_16] (rows=1 width=24)
                           Output:["_col0","_col1","_col2"],aggregations:["count(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/mrr.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mrr.q.out b/ql/src/test/results/clientpositive/llap/mrr.q.out
index c422021..bfa26e4 100644
--- a/ql/src/test/results/clientpositive/llap/mrr.q.out
+++ b/ql/src/test/results/clientpositive/llap/mrr.q.out
@@ -1293,7 +1293,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
         Reducer 6 <- Map 1 (SIMPLE_EDGE)

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/parallel.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/parallel.q.out b/ql/src/test/results/clientpositive/llap/parallel.q.out
index 7dba122..aea9417 100644
--- a/ql/src/test/results/clientpositive/llap/parallel.q.out
+++ b/ql/src/test/results/clientpositive/llap/parallel.q.out
@@ -38,7 +38,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/parallel_colstats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/parallel_colstats.q.out b/ql/src/test/results/clientpositive/llap/parallel_colstats.q.out
index 6c95f3a..57498a6 100644
--- a/ql/src/test/results/clientpositive/llap/parallel_colstats.q.out
+++ b/ql/src/test/results/clientpositive/llap/parallel_colstats.q.out
@@ -40,7 +40,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/partialdhj.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/partialdhj.q.out b/ql/src/test/results/clientpositive/llap/partialdhj.q.out
new file mode 100644
index 0000000..4e62c4f
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/partialdhj.q.out
@@ -0,0 +1,460 @@
+PREHOOK: query: EXPLAIN
+SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 4 (CUSTOM_SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string)
+                  1 KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0
+                input vertices:
+                  1 Map 4
+                Statistics: Num rows: 32 Data size: 2848 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                Group By Operator
+                  keys: _col0 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 6 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string)
+                  1 KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                input vertices:
+                  0 Reducer 3
+                Statistics: Num rows: 25 Data size: 6675 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 25 Data size: 6675 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src1
+#### A masked pattern was here ####
+val_311	311	val_311
+val_165	165	val_165
+val_409	409	val_409
+val_98	98	val_98
+val_484	484	val_484
+val_311	311	val_311
+val_311	311	val_311
+val_165	165	val_165
+val_98	98	val_98
+val_409	409	val_409
+val_409	409	val_409
+val_27	27	val_27
+val_146	146	val_146
+val_146	146	val_146
+val_238	238	val_238
+val_238	238	val_238
+val_193	193	val_193
+val_273	273	val_273
+val_193	193	val_193
+val_273	273	val_273
+val_193	193	val_193
+val_273	273	val_273
+val_406	406	val_406
+val_406	406	val_406
+val_406	406	val_406
+val_406	406	val_406
+val_66	66	val_66
+val_213	213	val_213
+val_213	213	val_213
+val_278	278	val_278
+val_401	401	val_401
+val_278	278	val_278
+val_401	401	val_401
+val_401	401	val_401
+val_401	401	val_401
+val_401	401	val_401
+val_150	150	val_150
+val_255	255	val_255
+val_265	265	val_265
+val_255	255	val_255
+val_265	265	val_265
+PREHOOK: query: EXPLAIN
+SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE), Map 6 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string)
+                  1 KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                input vertices:
+                  1 Reducer 5
+                Statistics: Num rows: 25 Data size: 6675 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 25 Data size: 6675 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string)
+                  1 KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0
+                input vertices:
+                  1 Map 6
+                Statistics: Num rows: 32 Data size: 2848 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                Group By Operator
+                  keys: _col0 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 5 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 16 Data size: 1424 Basic stats: COMPLETE Column stats: COMPLETE
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src1
+#### A masked pattern was here ####
+311	val_311	val_311
+165	val_165	val_165
+409	val_409	val_409
+98	val_98	val_98
+484	val_484	val_484
+311	val_311	val_311
+311	val_311	val_311
+165	val_165	val_165
+98	val_98	val_98
+409	val_409	val_409
+409	val_409	val_409
+27	val_27	val_27
+146	val_146	val_146
+146	val_146	val_146
+238	val_238	val_238
+238	val_238	val_238
+193	val_193	val_193
+273	val_273	val_273
+193	val_193	val_193
+273	val_273	val_273
+193	val_193	val_193
+273	val_273	val_273
+406	val_406	val_406
+406	val_406	val_406
+406	val_406	val_406
+406	val_406	val_406
+66	val_66	val_66
+213	val_213	val_213
+213	val_213	val_213
+278	val_278	val_278
+401	val_401	val_401
+278	val_278	val_278
+401	val_401	val_401
+401	val_401	val_401
+401	val_401	val_401
+401	val_401	val_401
+150	val_150	val_150
+255	val_255	val_255
+265	val_265	val_265
+255	val_255	val_255
+265	val_265	val_265

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_in.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_in.q.out b/ql/src/test/results/clientpositive/llap/subquery_in.q.out
index 6596b68..a831289 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_in.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_in.q.out
@@ -744,7 +744,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -901,7 +901,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 6 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1249,7 +1249,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1380,7 +1380,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1523,7 +1523,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1657,7 +1657,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1778,7 +1778,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2082,7 +2082,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2212,7 +2212,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2339,7 +2339,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2730,7 +2730,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
@@ -2955,7 +2955,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
@@ -3111,7 +3111,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
@@ -4115,7 +4115,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
@@ -4578,8 +4578,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 7 <- Map 5 (SIMPLE_EDGE)
@@ -4831,8 +4831,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 7 (SIMPLE_EDGE)
@@ -5591,7 +5591,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE), Reducer 8 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 9 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
index bbc8a5b..5673f0e 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
@@ -88,7 +88,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 6 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -261,9 +261,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 7 <- Map 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 8 <- Map 5 (SIMPLE_EDGE)
@@ -462,9 +462,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 8 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 9 <- Map 7 (SIMPLE_EDGE)
@@ -696,9 +696,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 8 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 9 <- Map 7 (SIMPLE_EDGE)
@@ -1134,7 +1134,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
@@ -1316,7 +1316,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
@@ -1472,7 +1472,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
@@ -1665,9 +1665,9 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 12 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 11 <- Reducer 10 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 11 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 12 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Reducer 8 (SIMPLE_EDGE)
@@ -1981,7 +1981,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 6 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2156,9 +2156,9 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 12 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 11 <- Reducer 10 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 11 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 12 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Reducer 8 (SIMPLE_EDGE)
@@ -2472,9 +2472,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 8 <- Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Map 7 (SIMPLE_EDGE)
@@ -2714,7 +2714,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2901,7 +2901,7 @@ STAGE PLANS:
         Reducer 10 <- Map 9 (CUSTOM_SIMPLE_EDGE)
         Reducer 11 <- Map 9 (CUSTOM_SIMPLE_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE), Reducer 10 (CUSTOM_SIMPLE_EDGE)
         Reducer 8 <- Reducer 11 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
@@ -3873,7 +3873,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -4036,7 +4036,7 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 10 (CUSTOM_SIMPLE_EDGE), Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_notin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_notin.q.out b/ql/src/test/results/clientpositive/llap/subquery_notin.q.out
index 79a4dda..3c6e95a 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_notin.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_notin.q.out
@@ -25,7 +25,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -317,8 +317,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 1 (SIMPLE_EDGE)
@@ -893,8 +893,8 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 5 (SIMPLE_EDGE)
         Reducer 11 <- Reducer 10 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
@@ -1478,7 +1478,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 4 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1640,8 +1640,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1841,7 +1841,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 4 (SIMPLE_EDGE)
@@ -2056,7 +2056,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 4 (SIMPLE_EDGE)
@@ -2279,7 +2279,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
@@ -2505,7 +2505,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 4 (SIMPLE_EDGE)
@@ -2705,8 +2705,8 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
         Reducer 11 <- Map 9 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
         Reducer 7 <- Map 4 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE)
@@ -2975,8 +2975,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -3140,8 +3140,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -3311,11 +3311,11 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
         Reducer 11 <- Map 9 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
-        Reducer 5 <- Map 4 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
+        Reducer 5 <- Map 4 (SIMPLE_EDGE), Reducer 10 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
-        Reducer 7 <- Map 4 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE)
+        Reducer 7 <- Map 4 (SIMPLE_EDGE), Reducer 11 (ONE_TO_ONE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -3589,7 +3589,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
@@ -3862,8 +3862,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
         Reducer 6 <- Map 1 (SIMPLE_EDGE)
@@ -4162,7 +4162,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
@@ -4390,7 +4390,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
         Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
@@ -4615,7 +4615,7 @@ STAGE PLANS:
         Map 8 <- Union 9 (CONTAINS)
         Reducer 10 <- Union 9 (SIMPLE_EDGE)
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 2 (SIMPLE_EDGE)
         Reducer 6 <- Union 5 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -5324,7 +5324,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
@@ -5581,7 +5581,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -5736,8 +5736,8 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 9 (SIMPLE_EDGE)
         Reducer 11 <- Map 9 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE)
         Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
         Reducer 7 <- Map 4 (SIMPLE_EDGE), Reducer 11 (SIMPLE_EDGE)
@@ -6036,8 +6036,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -6251,10 +6251,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
         Reducer 5 <- Map 3 (SIMPLE_EDGE)
-        Reducer 6 <- Reducer 5 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 6 <- Reducer 5 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 8 <- Map 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -6477,10 +6477,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE), Reducer 5 (SIMPLE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
-        Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -6667,8 +6667,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -6827,7 +6827,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -6982,7 +6982,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 4 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -7145,8 +7145,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 10 <- Map 8 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 9 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 10 (CUSTOM_SIMPLE_EDGE)

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
index a65147a..a62c494 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
@@ -1317,7 +1317,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1447,7 +1447,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -1574,7 +1574,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1691,7 +1691,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
@@ -1885,7 +1885,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
@@ -2083,7 +2083,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2263,7 +2263,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -3155,7 +3155,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -3336,7 +3336,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 8 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 9 <- Map 7 (SIMPLE_EDGE)
@@ -3572,8 +3572,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 7 <- Map 5 (SIMPLE_EDGE)
         Reducer 9 <- Map 8 (SIMPLE_EDGE)
@@ -3800,8 +3800,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4009,8 +4009,8 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 7 <- Map 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4211,7 +4211,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -4370,7 +4370,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4673,7 +4673,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4795,7 +4795,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4911,8 +4911,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -5094,7 +5094,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
@@ -5775,7 +5775,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_select.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_select.q.out b/ql/src/test/results/clientpositive/llap/subquery_select.q.out
index eafb77a..fd10898 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_select.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_select.q.out
@@ -200,7 +200,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
@@ -586,8 +586,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
         Reducer 6 <- Map 1 (SIMPLE_EDGE)
@@ -983,7 +983,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1254,7 +1254,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1391,7 +1391,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1529,7 +1529,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -1804,7 +1804,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
         Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
@@ -1979,7 +1979,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
-        Reducer 3 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 3 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2154,7 +2154,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2608,7 +2608,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -2743,7 +2743,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -3069,7 +3069,7 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 9 (CUSTOM_SIMPLE_EDGE)
         Reducer 11 <- Map 9 (CUSTOM_SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 7 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 10 (CUSTOM_SIMPLE_EDGE), Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 5 <- Reducer 11 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
@@ -3362,7 +3362,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
@@ -4812,7 +4812,7 @@ STAGE PLANS:
         Reducer 11 <- Map 10 (SIMPLE_EDGE)
         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
         Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 5 <- Reducer 11 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 11 (ONE_TO_ONE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 9 <- Map 8 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -5107,10 +5107,10 @@ STAGE PLANS:
       Edges:
         Reducer 10 <- Map 7 (SIMPLE_EDGE)
         Reducer 12 <- Map 11 (SIMPLE_EDGE), Map 13 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
-        Reducer 5 <- Reducer 10 (SIMPLE_EDGE), Reducer 12 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 12 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
         Reducer 8 <- Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Map 7 (SIMPLE_EDGE)
@@ -5478,7 +5478,7 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 3 (ONE_TO_ONE_EDGE)
         Reducer 3 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/subquery_views.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_views.q.out b/ql/src/test/results/clientpositive/llap/subquery_views.q.out
index a2b3fd2..094d6b2 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_views.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_views.q.out
@@ -124,15 +124,15 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 10 <- Reducer 13 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 10 <- Reducer 13 (ONE_TO_ONE_EDGE), Reducer 9 (SIMPLE_EDGE)
         Reducer 12 <- Map 11 (SIMPLE_EDGE)
         Reducer 13 <- Map 11 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (ONE_TO_ONE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 4 <- Map 1 (SIMPLE_EDGE)
         Reducer 5 <- Map 1 (SIMPLE_EDGE)
-        Reducer 6 <- Reducer 12 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
-        Reducer 7 <- Map 1 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 6 <- Reducer 12 (ONE_TO_ONE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 7 <- Map 1 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
         Reducer 8 <- Map 1 (SIMPLE_EDGE)
         Reducer 9 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/tez_union_group_by.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_union_group_by.q.out b/ql/src/test/results/clientpositive/llap/tez_union_group_by.q.out
index 6dcf53f..67cd110 100644
--- a/ql/src/test/results/clientpositive/llap/tez_union_group_by.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_union_group_by.q.out
@@ -151,7 +151,7 @@ STAGE PLANS:
         Map 5 <- Union 2 (CONTAINS)
         Map 6 <- Union 2 (CONTAINS)
         Reducer 3 <- Union 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (ONE_TO_ONE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
         Reducer 8 <- Map 10 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
         Reducer 9 <- Reducer 8 (SIMPLE_EDGE)
 #### A masked pattern was here ####


[3/3] hive git commit: HIVE-17037: Use 1-to-1 Tez edge to avoid unnecessary input data shuffle (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Posted by jc...@apache.org.
HIVE-17037: Use 1-to-1 Tez edge to avoid unnecessary input data shuffle (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a96d9f71
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a96d9f71
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a96d9f71

Branch: refs/heads/master
Commit: a96d9f71da093b73e451fd8d6b146cd4e919d1af
Parents: a0df0ac
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Jul 13 18:48:16 2017 +0200
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jul 20 14:55:48 2017 +0200

----------------------------------------------------------------------
 .../hadoop/hive/common/jsonexplain/Vertex.java  |   2 +-
 .../common/jsonexplain/tez/TezJsonParser.java   |   2 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../test/resources/testconfiguration.properties |   3 +-
 .../hadoop/hive/ql/exec/tez/DagUtils.java       |  16 +-
 .../hive/ql/optimizer/MapJoinProcessor.java     |   5 +-
 .../ql/optimizer/ReduceSinkMapJoinProc.java     |   6 +-
 .../correlation/CorrelationUtilities.java       |  51 +-
 .../correlation/ReduceSinkDeDuplication.java    | 476 +--------------
 .../ReduceSinkDeDuplicationUtils.java           | 584 +++++++++++++++++++
 .../ReduceSinkJoinDeDuplication.java            | 255 ++++++++
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   3 +
 .../hadoop/hive/ql/parse/TezCompiler.java       |  30 +
 .../hadoop/hive/ql/plan/ReduceSinkDesc.java     |   9 +
 .../hadoop/hive/ql/plan/TezEdgeProperty.java    |   5 +
 ql/src/test/queries/clientpositive/partialdhj.q |  54 ++
 .../llap/auto_smb_mapjoin_14.q.out              |   2 +-
 .../llap/correlationoptimizer2.q.out            |  24 +-
 .../llap/correlationoptimizer3.q.out            |   8 +-
 .../llap/correlationoptimizer6.q.out            |  36 +-
 .../clientpositive/llap/explainuser_1.q.out     |  40 +-
 .../test/results/clientpositive/llap/mrr.q.out  |   2 +-
 .../results/clientpositive/llap/parallel.q.out  |   2 +-
 .../clientpositive/llap/parallel_colstats.q.out |   2 +-
 .../clientpositive/llap/partialdhj.q.out        | 460 +++++++++++++++
 .../clientpositive/llap/subquery_in.q.out       |  38 +-
 .../clientpositive/llap/subquery_multi.q.out    |  48 +-
 .../clientpositive/llap/subquery_notin.q.out    |  88 +--
 .../clientpositive/llap/subquery_scalar.q.out   |  46 +-
 .../clientpositive/llap/subquery_select.q.out   |  36 +-
 .../clientpositive/llap/subquery_views.q.out    |   8 +-
 .../llap/tez_union_group_by.q.out               |   2 +-
 .../llap/vector_auto_smb_mapjoin_14.q.out       |   6 +-
 .../results/clientpositive/perf/query10.q.out   |  16 +-
 .../results/clientpositive/perf/query14.q.out   |  38 +-
 .../results/clientpositive/perf/query16.q.out   |   6 +-
 .../results/clientpositive/perf/query2.q.out    |  12 +-
 .../results/clientpositive/perf/query23.q.out   |   2 +-
 .../results/clientpositive/perf/query31.q.out   |  28 +-
 .../results/clientpositive/perf/query32.q.out   |   6 +-
 .../results/clientpositive/perf/query33.q.out   |  18 +-
 .../results/clientpositive/perf/query35.q.out   |  16 +-
 .../results/clientpositive/perf/query45.q.out   |   6 +-
 .../results/clientpositive/perf/query56.q.out   |  18 +-
 .../results/clientpositive/perf/query58.q.out   |  32 +-
 .../results/clientpositive/perf/query6.q.out    |   6 +-
 .../results/clientpositive/perf/query60.q.out   |  18 +-
 .../results/clientpositive/perf/query64.q.out   |  12 +-
 .../results/clientpositive/perf/query69.q.out   |  16 +-
 .../results/clientpositive/perf/query77.q.out   |  20 +-
 .../results/clientpositive/perf/query78.q.out   |  10 +-
 .../results/clientpositive/perf/query83.q.out   |  32 +-
 .../results/clientpositive/perf/query92.q.out   |   6 +-
 .../results/clientpositive/perf/query94.q.out   |   6 +-
 .../results/clientpositive/perf/query95.q.out   |  10 +-
 .../results/clientpositive/perf/query97.q.out   |  10 +-
 56 files changed, 1853 insertions(+), 844 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
index 799355a..b7dc88c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
@@ -74,7 +74,7 @@ public final class Vertex implements Comparable<Vertex>{
   public VertexType vertexType;
 
   public static enum EdgeType {
-    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN
+    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, UNKNOWN
   };
   public String edgeType;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
index 294dc6b..69e5358 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
@@ -34,6 +34,8 @@ public class TezJsonParser extends DagJsonParser {
         return "PARTITION_ONLY_SHUFFLE";
       case "CUSTOM_EDGE":
         return "MULTICAST";
+      case "ONE_TO_ONE_EDGE":
+        return "FORWARD";
       default:
         return "UNKNOWN";
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 69205bc..f360dfa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1601,6 +1601,10 @@ public class HiveConf extends Configuration {
         "Reduce deduplication merges two RSs by moving key/parts/reducer-num of the child RS to parent RS. \n" +
         "That means if reducer-num of the child RS is fixed (order by or forced bucketing) and small, it can make very slow, single MR.\n" +
         "The optimization will be automatically disabled if number of reducers would be less than specified value."),
+    HIVEOPTJOINREDUCEDEDUPLICATION("hive.optimize.joinreducededuplication", true,
+        "Remove extra shuffle/sorting operations after join algorithm selection has been executed. \n" +
+        "Currently it only works with Apache Tez. This should always be set to true. \n" +
+        "Since it is a new feature, it has been made configurable."),
 
     HIVEOPTSORTDYNAMICPARTITION("hive.optimize.sort.dynamic.partition", false,
         "When enabled dynamic partitioning column will be globally sorted.\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1cc0104..362a796 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -742,7 +742,8 @@ minillaplocal.query.files=acid_globallimit.q,\
   groupby_resolution.q,\
   windowing_windowspec2.q,\
   vectorized_join46.q,\
-  vectorized_multi_output_select.q
+  vectorized_multi_output_select.q,\
+  partialdhj.q
 
 encrypted.query.files=encryption_join_unencrypted_tbl.q,\
   encryption_insert_partition_static.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 51beaad..64e0d9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -293,6 +293,10 @@ public class DagUtils {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       break;
 
+    case ONE_TO_ONE_EDGE:
+      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+      break;
+
     case SIMPLE_EDGE:
       setupAutoReducerParallelism(edgeProp, w);
       // fall through
@@ -398,18 +402,26 @@ public class DagUtils {
           .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
           .build();
       return et3Conf.createDefaultEdgeProperty();
+    case ONE_TO_ONE_EDGE:
+      UnorderedKVEdgeConfig et4Conf = UnorderedKVEdgeConfig
+          .newBuilder(keyClass, valClass)
+          .setFromConfiguration(conf)
+          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et4Conf.createDefaultOneToOneEdgeProperty();
     case SIMPLE_EDGE:
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+      OrderedPartitionedKVEdgeConfig et5Conf = OrderedPartitionedKVEdgeConfig
           .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
           .setFromConfiguration(conf)
           .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
               TezBytesComparator.class.getName(), null)
           .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
           .build();
-      return et4Conf.createDefaultEdgeProperty();
+      return et5Conf.createDefaultEdgeProperty();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index d84a1e6..0538176 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -1073,8 +1073,11 @@ public class MapJoinProcessor extends Transform {
         ExprNodeDesc expr = colExprMap.get(column.getInternalName());
         int index = ExprNodeDescUtils.indexOf(expr, values);
         if (index >= 0) {
-          colExprMap.put(column.getInternalName(), newValues.get(index));
           schema.set(i, null);
+          if (adjustParentsChildren) {
+            // Since we remove reduce sink parents, replace original expressions
+            colExprMap.put(column.getInternalName(), newValues.get(index));
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index ac234d0..70919a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -270,7 +270,11 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
         }
       }
     } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
-      edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+      if (parentRS.getConf().isForwarding()) {
+        edgeType = EdgeType.ONE_TO_ONE_EDGE;
+      } else {
+        edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+      }
     }
     if (edgeType == EdgeType.CUSTOM_EDGE) {
       // disable auto parallelism for bucket map joins

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
index 388399c..c63c28d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -43,16 +43,15 @@ import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication.ReduceSinkDeduplicateProcCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc.Mode;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -271,6 +270,52 @@ public final class CorrelationUtilities {
     return result;
   }
 
+  protected static <T extends Operator<?>> T findFirstPossibleParent(
+      Operator<?> start, Class<T> target, boolean trustScript) throws SemanticException {
+    // Preserve only partitioning
+    return findFirstPossibleParent(start, target, trustScript, false);
+  }
+
+  protected static <T extends Operator<?>> T findFirstPossibleParentPreserveSortOrder(
+      Operator<?> start, Class<T> target, boolean trustScript) throws SemanticException {
+    // Preserve partitioning and ordering
+    return findFirstPossibleParent(start, target, trustScript, true);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T extends Operator<?>> T findFirstPossibleParent(
+      Operator<?> start, Class<T> target, boolean trustScript, boolean preserveSortOrder)
+          throws SemanticException {
+    Operator<?> cursor = CorrelationUtilities.getSingleParent(start);
+    for (; cursor != null; cursor = CorrelationUtilities.getSingleParent(cursor)) {
+      if (target.isAssignableFrom(cursor.getClass())) {
+        return (T) cursor;
+      }
+      if (cursor instanceof CommonJoinOperator) {
+        for (Operator<?> op : ((CommonJoinOperator<?>) cursor).getParentOperators()) {
+          if (target.isAssignableFrom(op.getClass())) {
+            return (T) op;
+          }
+        }
+        return null;
+      }
+      if (cursor instanceof ScriptOperator && !trustScript) {
+        return null;
+      }
+      if (!(cursor instanceof SelectOperator
+          || cursor instanceof FilterOperator
+          || cursor instanceof ForwardOperator
+          || cursor instanceof ScriptOperator
+          || (cursor instanceof GroupByOperator
+              && (!preserveSortOrder
+                  || ((GroupByOperator) cursor).getConf().getMode() != Mode.HASH)) // Not order preserving
+          || cursor instanceof ReduceSinkOperator)) {
+        return null;
+      }
+    }
+    return null;
+  }
+
   /**
    * Search the query plan tree from startPoint to the bottom. If there is no ReduceSinkOperator
    * between startPoint and the corresponding TableScanOperator, return the corresponding

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
index 701bde4..154c607 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
@@ -23,18 +23,14 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCON
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -49,13 +45,6 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -184,457 +173,6 @@ public class ReduceSinkDeDuplication extends Transform {
 
     protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
         ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException;
-
-    // for JOIN-RS case, it's not possible generally to merge if child has
-    // less key/partition columns than parents
-    protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
-        throws SemanticException {
-      List<Operator<?>> parents = pJoin.getParentOperators();
-      ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
-      ReduceSinkDesc cRSc = cRS.getConf();
-      for (ReduceSinkOperator pRSNs : pRSs) {
-        ReduceSinkDesc pRSNc = pRSNs.getConf();
-        if (cRSc.getKeyCols().size() != pRSNc.getKeyCols().size()) {
-          return false;
-        }
-        if (cRSc.getPartitionCols().size() != pRSNc.getPartitionCols().size()) {
-          return false;
-        }
-        Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRSNc.getNumReducers());
-        if (moveReducerNumTo == null ||
-            moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
-          return false;
-        }
-
-        Integer moveRSOrderTo = checkOrder(true, cRSc.getOrder(), pRSNc.getOrder(),
-                cRSc.getNullOrder(), pRSNc.getNullOrder());
-        if (moveRSOrderTo == null) {
-          return false;
-        }
-      }
-
-      boolean[] sorted = CorrelationUtilities.getSortedTags(pJoin);
-
-      int cKeySize = cRSc.getKeyCols().size();
-      for (int i = 0; i < cKeySize; i++) {
-        ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
-        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
-        for (int tag = 0; tag < pRSs.length; tag++) {
-          pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
-        }
-        int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
-        if (found != i) {
-          return false;
-        }
-      }
-      int cPartSize = cRSc.getPartitionCols().size();
-      for (int i = 0; i < cPartSize; i++) {
-        ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
-        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
-        for (int tag = 0; tag < pRSs.length; tag++) {
-          pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
-        }
-        int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
-        if (found != i) {
-          return false;
-        }
-      }
-
-      for (ReduceSinkOperator pRS : pRSs) {
-        pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
-      }
-
-      return true;
-    }
-
-    /**
-     * Current RSDedup remove/replace child RS. For key columns,
-     * sorting order, and the number of reducers, copy
-     * more specific part of configurations of child RS to that of parent RS.
-     * For partitioning columns, if both child RS and parent RS have been assigned
-     * partitioning columns, we will choose the more general partitioning columns.
-     * If parent RS has not been assigned any partitioning column, we will use
-     * partitioning columns (if exist) of child RS.
-     */
-    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
-        throws SemanticException {
-      int[] result = extractMergeDirections(cRS, pRS, minReducer);
-      if (result == null) {
-        return false;
-      }
-
-      if (result[0] > 0) {
-        // The sorting columns of the child RS are more specific than
-        // those of the parent RS. Assign sorting columns of the child RS
-        // to the parent RS.
-        List<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
-        pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
-      }
-
-      if (result[1] < 0) {
-        // The partitioning columns of the parent RS are more specific than
-        // those of the child RS.
-        List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
-        if (childPCs != null && !childPCs.isEmpty()) {
-          // If partitioning columns of the child RS are assigned,
-          // assign these to the partitioning columns of the parent RS.
-          pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
-        }
-      } else if (result[1] > 0) {
-        // The partitioning columns of the child RS are more specific than
-        // those of the parent RS.
-        List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
-        if (parentPCs == null || parentPCs.isEmpty()) {
-          // If partitioning columns of the parent RS are not assigned,
-          // assign partitioning columns of the child RS to the parent RS.
-          ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
-          pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
-        }
-      }
-
-      if (result[2] > 0) {
-        // The sorting order of the child RS is more specific than
-        // that of the parent RS. Assign the sorting order of the child RS
-        // to the parent RS.
-        if (result[0] <= 0) {
-          // Sorting columns of the parent RS are more specific than those of the
-          // child RS but Sorting order of the child RS is more specific than
-          // that of the parent RS.
-          throw new SemanticException("Sorting columns and order don't match. " +
-              "Try set " + HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION + "=false;");
-        }
-        pRS.getConf().setOrder(cRS.getConf().getOrder());
-        pRS.getConf().setNullOrder(cRS.getConf().getNullOrder());
-      } else {
-        // The sorting order of the parent RS is more specific or they are equal.
-        // We will copy the order from the child RS, and then fill in the order
-        // of the rest of columns with the one taken from parent RS.
-        StringBuilder order = new StringBuilder(cRS.getConf().getOrder());
-        StringBuilder orderNull = new StringBuilder(cRS.getConf().getNullOrder());
-        order.append(pRS.getConf().getOrder().substring(order.length()));
-        orderNull.append(pRS.getConf().getNullOrder().substring(orderNull.length()));
-        pRS.getConf().setOrder(order.toString());
-        pRS.getConf().setNullOrder(orderNull.toString());
-      }
-
-      if (result[3] > 0) {
-        // The number of reducers of the child RS is more specific than
-        // that of the parent RS. Assign the number of reducers of the child RS
-        // to the parent RS.
-        pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
-      }
-
-      if (result[4] > 0) {
-        // This case happens only when pRS key is empty in which case we can use
-        // number of distribution keys and key serialization info from cRS
-        if (pRS.getConf().getKeyCols() != null && pRS.getConf().getKeyCols().size() == 0
-            && cRS.getConf().getKeyCols() != null && cRS.getConf().getKeyCols().size() == 0) {
-          // As setNumDistributionKeys is a subset of keycols, the size should
-          // be 0 too. This condition maybe too strict. We may extend it in the
-          // future.
-          TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(new ArrayList<FieldSchema>(), pRS
-              .getConf().getOrder(), pRS.getConf().getNullOrder());
-          pRS.getConf().setKeySerializeInfo(keyTable);
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Returns merge directions between two RSs for criterias (ordering, number of reducers,
-     * reducer keys, partition keys). Returns null if any of categories is not mergeable.
-     *
-     * Values for each index can be -1, 0, 1
-     * 1. 0 means two configuration in the category is the same
-     * 2. for -1, configuration of parent RS is more specific than child RS
-     * 3. for 1, configuration of child RS is more specific than parent RS
-     */
-    private int[] extractMergeDirections(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
-        throws SemanticException {
-      ReduceSinkDesc cConf = cRS.getConf();
-      ReduceSinkDesc pConf = pRS.getConf();
-      // If there is a PTF between cRS and pRS we cannot ignore the order direction
-      final boolean checkStrictEquality = isStrictEqualityNeeded(cRS, pRS);
-      Integer moveRSOrderTo = checkOrder(checkStrictEquality, cConf.getOrder(), pConf.getOrder(),
-              cConf.getNullOrder(), pConf.getNullOrder());
-      if (moveRSOrderTo == null) {
-        return null;
-      }
-      // if cRS is being used for distinct - the two reduce sinks are incompatible
-      if (cConf.getDistinctColumnIndices().size() >= 2) {
-        return null;
-      }
-      Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers());
-      if (moveReducerNumTo == null ||
-          moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) {
-        return null;
-      }
-      List<ExprNodeDesc> ckeys = cConf.getKeyCols();
-      List<ExprNodeDesc> pkeys = pConf.getKeyCols();
-      Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS);
-      if (moveKeyColTo == null) {
-        return null;
-      }
-      List<ExprNodeDesc> cpars = cConf.getPartitionCols();
-      List<ExprNodeDesc> ppars = pConf.getPartitionCols();
-      Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS);
-      if (movePartitionColTo == null) {
-        return null;
-      }
-      Integer moveNumDistKeyTo = checkNumDistributionKey(cConf.getNumDistributionKeys(),
-          pConf.getNumDistributionKeys());
-      return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo,
-          moveReducerNumTo, moveNumDistKeyTo};
-    }
-
-    private boolean isStrictEqualityNeeded(ReduceSinkOperator cRS, ReduceSinkOperator pRS) {
-      Operator<? extends OperatorDesc> parent = cRS.getParentOperators().get(0);
-      while (parent != pRS) {
-        assert parent.getNumParent() == 1;
-        if (parent instanceof PTFOperator) {
-          return true;
-        }
-        parent = parent.getParentOperators().get(0);
-      }
-      return false;
-    }
-
-    private Integer checkNumDistributionKey(int cnd, int pnd) {
-      // number of distribution keys of cRS is chosen only when numDistKeys of pRS
-      // is 0 or less. In all other cases, distribution of the keys is based on
-      // the pRS which is more generic than cRS.
-      // Examples:
-      // case 1: if pRS sort key is (a, b) and cRS sort key is (a, b, c) and number of
-      // distribution keys are 2 and 3 resp. then after merge the sort keys will
-      // be (a, b, c) while the number of distribution keys will be 2.
-      // case 2: if pRS sort key is empty and number of distribution keys is 0
-      // and if cRS sort key is (a, b) and number of distribution keys is 2 then
-      // after merge new sort key will be (a, b) and number of distribution keys
-      // will be 2.
-      if (pnd <= 0) {
-        return 1;
-      }
-      return 0;
-    }
-
-    /**
-     * Overlapping part of keys should be the same between parent and child.
-     * And if child has more keys than parent, non-overlapping part of keys
-     * should be backtrackable to parent.
-     */
-    private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
-        ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
-      // If ckeys or pkeys have constant node expressions avoid the merge.
-      for (ExprNodeDesc ck : ckeys) {
-        if (ck instanceof ExprNodeConstantDesc) {
-          return null;
-        }
-      }
-      for (ExprNodeDesc pk : pkeys) {
-        if (pk instanceof ExprNodeConstantDesc) {
-          return null;
-        }
-      }
-
-      Integer moveKeyColTo = 0;
-      if (ckeys == null || ckeys.isEmpty()) {
-        if (pkeys != null && !pkeys.isEmpty()) {
-          moveKeyColTo = -1;
-        }
-      } else {
-        if (pkeys == null || pkeys.isEmpty()) {
-          for (ExprNodeDesc ckey : ckeys) {
-            if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
-              // cKey is not present in parent
-              return null;
-            }
-          }
-          moveKeyColTo = 1;
-        } else {
-          moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
-        }
-      }
-      return moveKeyColTo;
-    }
-
-    // backtrack key exprs of child to parent and compare it with parent's
-    protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
-        Operator<?> child, Operator<?> parent) throws SemanticException {
-      int common = Math.min(cexprs.size(), pexprs.size());
-      int limit = Math.max(cexprs.size(), pexprs.size());
-      int i = 0;
-      for (; i < common; i++) {
-        ExprNodeDesc pexpr = pexprs.get(i);
-        ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
-        if (cexpr == null || !pexpr.isSame(cexpr)) {
-          return null;
-        }
-      }
-      for (; i < limit; i++) {
-        if (cexprs.size() > pexprs.size()) {
-          if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
-            // cKey is not present in parent
-            return null;
-          }
-        }
-      }
-      return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
-    }
-
-    protected Integer checkOrder(boolean checkStrictEquality, String corder, String porder,
-            String cNullOrder, String pNullOrder) {
-      assert corder.length() == cNullOrder.length();
-      assert porder.length() == pNullOrder.length();
-      if (corder == null || corder.trim().equals("")) {
-        if (porder == null || porder.trim().equals("")) {
-          return 0;
-        }
-        return -1;
-      }
-      if (porder == null || porder.trim().equals("")) {
-        return 1;
-      }
-      corder = corder.trim();
-      porder = porder.trim();
-      if (checkStrictEquality) {
-        // order of overlapping keys should be exactly the same
-        cNullOrder = cNullOrder.trim();
-        pNullOrder = pNullOrder.trim();
-        int target = Math.min(corder.length(), porder.length());
-        if (!corder.substring(0, target).equals(porder.substring(0, target)) ||
-                !cNullOrder.substring(0, target).equals(pNullOrder.substring(0, target))) {
-          return null;
-        }
-      }
-      return Integer.valueOf(corder.length()).compareTo(porder.length());
-    }
-
-    /**
-     * If number of reducers for RS is -1, the RS can have any number of reducers.
-     * It's generally true except for order-by or forced bucketing cases.
-     * if both of num-reducers are not -1, those number should be the same.
-     */
-    protected Integer checkNumReducer(int creduce, int preduce) {
-      if (creduce < 0) {
-        if (preduce < 0) {
-          return 0;
-        }
-        return -1;
-      }
-      if (preduce < 0) {
-        return 1;
-      }
-      if (creduce != preduce) {
-        return null;
-      }
-      return 0;
-    }
-
-    protected boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS,
-            ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException {
-      assert cRS.getNumParent() == 1;
-
-      ReduceSinkDesc cConf = cRS.getConf();
-      ReduceSinkDesc pConf = pRS.getConf();
-      List<ExprNodeDesc> cKeys = cConf.getKeyCols();
-      List<ExprNodeDesc> pKeys = pConf.getKeyCols();
-
-      // Check that in the path between cRS and pRS, there are only Select operators
-      // i.e. the sequence must be pRS-SEL*-cRS
-      Operator<? extends OperatorDesc> parent = cRS.getParentOperators().get(0);
-      while (parent != pRS) {
-        assert parent.getNumParent() == 1;
-        if (!(parent instanceof SelectOperator)) {
-          return false;
-        }
-        parent = parent.getParentOperators().get(0);
-      }
-
-      // If child keys are null or empty, we bail out
-      if (cKeys == null || cKeys.isEmpty()) {
-        return false;
-      }
-      // If parent keys are null or empty, we bail out
-      if (pKeys == null || pKeys.isEmpty()) {
-        return false;
-      }
-
-      // Backtrack key columns of cRS to pRS
-      // If we cannot backtrack any of the columns, bail out
-      List<ExprNodeDesc> cKeysInParentRS = ExprNodeDescUtils.backtrack(cKeys, cRS, pRS);
-      for (int i = 0; i < cKeysInParentRS.size(); i++) {
-        ExprNodeDesc pexpr = cKeysInParentRS.get(i);
-        if (pexpr == null) {
-          // We cannot backtrack the expression, we bail out
-          return false;
-        }
-      }
-      cRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(cKeysInParentRS, cRS, pRS));
-
-      // Backtrack partition columns of cRS to pRS
-      // If we cannot backtrack any of the columns, bail out
-      List<ExprNodeDesc> cPartitionInParentRS = ExprNodeDescUtils.backtrack(
-              cConf.getPartitionCols(), cRS, pRS);
-      for (int i = 0; i < cPartitionInParentRS.size(); i++) {
-        ExprNodeDesc pexpr = cPartitionInParentRS.get(i);
-        if (pexpr == null) {
-          // We cannot backtrack the expression, we bail out
-          return false;
-        }
-      }
-      cRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(cPartitionInParentRS, cRS, pRS));
-
-      // Backtrack value columns of cRS to pRS
-      // If we cannot backtrack any of the columns, bail out
-      List<ExprNodeDesc> cValueInParentRS = ExprNodeDescUtils.backtrack(
-              cConf.getValueCols(), cRS, pRS);
-      for (int i = 0; i < cValueInParentRS.size(); i++) {
-        ExprNodeDesc pexpr = cValueInParentRS.get(i);
-        if (pexpr == null) {
-          // We cannot backtrack the expression, we bail out
-          return false;
-        }
-      }
-      cRS.getConf().setValueCols(ExprNodeDescUtils.backtrack(cValueInParentRS, cRS, pRS));
-
-      // Backtrack bucket columns of cRS to pRS (if any)
-      // If we cannot backtrack any of the columns, bail out
-      if (cConf.getBucketCols() != null) {
-        List<ExprNodeDesc> cBucketInParentRS = ExprNodeDescUtils.backtrack(
-                cConf.getBucketCols(), cRS, pRS);
-        for (int i = 0; i < cBucketInParentRS.size(); i++) {
-          ExprNodeDesc pexpr = cBucketInParentRS.get(i);
-          if (pexpr == null) {
-            // We cannot backtrack the expression, we bail out
-            return false;
-          }
-        }
-        cRS.getConf().setBucketCols(ExprNodeDescUtils.backtrack(cBucketInParentRS, cRS, pRS));
-      }
-
-      // Update column expression map
-      for (Entry<String, ExprNodeDesc> e : cRS.getColumnExprMap().entrySet()) {
-        e.setValue(ExprNodeDescUtils.backtrack(e.getValue(), cRS, pRS));
-      }
-
-      // Replace pRS with cRS and remove operator sequence from pRS to cRS
-      // Recall that the sequence must be pRS-SEL*-cRS
-      parent = cRS.getParentOperators().get(0);
-      while (parent != pRS) {
-        dedupCtx.addRemovedOperator(parent);
-        parent = parent.getParentOperators().get(0);
-      }
-      dedupCtx.addRemovedOperator(pRS);
-      cRS.getParentOperators().clear();
-      for (Operator<? extends OperatorDesc> op : pRS.getParentOperators()) {
-        op.replaceChild(pRS, cRS);
-        cRS.getParentOperators().add(op);
-      }
-      pRS.getParentOperators().clear();
-      pRS.getChildOperators().clear();
-
-      return true;
-    }
   }
 
   static class GroupbyReducerProc extends AbsctractReducerReducerProc {
@@ -652,7 +190,7 @@ public class ReduceSinkDeDuplication extends Transform {
       ReduceSinkOperator pRS =
           CorrelationUtilities.findPossibleParent(
               pGBY, ReduceSinkOperator.class, dedupCtx.trustScript());
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && ReduceSinkDeDuplicationUtils.merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
         pRS.getConf().setDeduplicated(true);
@@ -675,7 +213,7 @@ public class ReduceSinkDeDuplication extends Transform {
       }
       ReduceSinkOperator pRS =
           CorrelationUtilities.getSingleParent(pGBY, ReduceSinkOperator.class);
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && ReduceSinkDeDuplicationUtils.merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.removeReduceSinkForGroupBy(
             cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
         pRS.getConf().setDeduplicated(true);
@@ -693,7 +231,7 @@ public class ReduceSinkDeDuplication extends Transform {
         throws SemanticException {
       JoinOperator pJoin =
           CorrelationUtilities.findPossibleParent(cRS, JoinOperator.class, dedupCtx.trustScript());
-      if (pJoin != null && merge(cRS, pJoin, dedupCtx.minReducer())) {
+      if (pJoin != null && ReduceSinkDeDuplicationUtils.merge(cRS, pJoin, dedupCtx.minReducer())) {
         pJoin.getConf().setFixedAsSorted(true);
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
@@ -717,7 +255,7 @@ public class ReduceSinkDeDuplication extends Transform {
       JoinOperator pJoin =
           CorrelationUtilities.findPossibleParent(
               start, JoinOperator.class, dedupCtx.trustScript());
-      if (pJoin != null && merge(cRS, pJoin, dedupCtx.minReducer())) {
+      if (pJoin != null && ReduceSinkDeDuplicationUtils.merge(cRS, pJoin, dedupCtx.minReducer())) {
         pJoin.getConf().setFixedAsSorted(true);
         CorrelationUtilities.removeReduceSinkForGroupBy(
             cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
@@ -744,11 +282,11 @@ public class ReduceSinkDeDuplication extends Transform {
               cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
       if (pRS != null) {
         // Try extended deduplication
-        if (aggressiveDedup(cRS, pRS, dedupCtx)) {
+        if (ReduceSinkDeDuplicationUtils.aggressiveDedup(cRS, pRS, dedupCtx)) {
           return true;
         }
         // Normal deduplication
-        if (merge(cRS, pRS, dedupCtx.minReducer())) {
+        if (ReduceSinkDeDuplicationUtils.merge(cRS, pRS, dedupCtx.minReducer())) {
           CorrelationUtilities.replaceReduceSinkWithSelectOperator(
               cRS, dedupCtx.getPctx(), dedupCtx);
           pRS.getConf().setDeduplicated(true);
@@ -767,7 +305,7 @@ public class ReduceSinkDeDuplication extends Transform {
       ReduceSinkOperator pRS =
           CorrelationUtilities.findPossibleParent(
               start, ReduceSinkOperator.class, dedupCtx.trustScript());
-      if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
+      if (pRS != null && ReduceSinkDeDuplicationUtils.merge(cRS, pRS, dedupCtx.minReducer())) {
         if (dedupCtx.getPctx().getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
           return false;
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
new file mode 100644
index 0000000..8f55369
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication.ReduceSinkDeduplicateProcCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import com.google.common.collect.ImmutableList;
+
+public class ReduceSinkDeDuplicationUtils {
+
+  // for JOIN-RS case, it's not possible generally to merge if child has
+  // less key/partition columns than parents
+  public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+      throws SemanticException {
+    List<Operator<?>> parents = pJoin.getParentOperators();
+    ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
+    ReduceSinkDesc cRSc = cRS.getConf();
+    for (ReduceSinkOperator pRSNs : pRSs) {
+      ReduceSinkDesc pRSNc = pRSNs.getConf();
+      if (cRSc.getKeyCols().size() != pRSNc.getKeyCols().size()) {
+        return false;
+      }
+      if (cRSc.getPartitionCols().size() != pRSNc.getPartitionCols().size()) {
+        return false;
+      }
+      Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRSNc.getNumReducers());
+      if (moveReducerNumTo == null ||
+          moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
+        return false;
+      }
+
+      Integer moveRSOrderTo = checkOrder(true, cRSc.getOrder(), pRSNc.getOrder(),
+              cRSc.getNullOrder(), pRSNc.getNullOrder());
+      if (moveRSOrderTo == null) {
+        return false;
+      }
+    }
+
+    boolean[] sorted = CorrelationUtilities.getSortedTags(pJoin);
+
+    int cKeySize = cRSc.getKeyCols().size();
+    for (int i = 0; i < cKeySize; i++) {
+      ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
+      ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+      for (int tag = 0; tag < pRSs.length; tag++) {
+        pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
+      }
+      int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+      if (found != i) {
+        return false;
+      }
+    }
+    int cPartSize = cRSc.getPartitionCols().size();
+    for (int i = 0; i < cPartSize; i++) {
+      ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
+      ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+      for (int tag = 0; tag < pRSs.length; tag++) {
+        pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
+      }
+      int found = CorrelationUtilities.indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+      if (found != i) {
+        return false;
+      }
+    }
+
+    for (ReduceSinkOperator pRS : pRSs) {
+      pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+    }
+
+    return true;
+  }
+
+  /**
+   * Current RSDedup remove/replace child RS. For key columns,
+   * sorting order, and the number of reducers, copy
+   * more specific part of configurations of child RS to that of parent RS.
+   * For partitioning columns, if both child RS and parent RS have been assigned
+   * partitioning columns, we will choose the more general partitioning columns.
+   * If parent RS has not been assigned any partitioning column, we will use
+   * partitioning columns (if exist) of child RS.
+   */
+  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+      throws SemanticException {
+    int[] result = extractMergeDirections(cRS, pRS, minReducer);
+    if (result == null) {
+      return false;
+    }
+
+    if (result[0] > 0) {
+      // The sorting columns of the child RS are more specific than
+      // those of the parent RS. Assign sorting columns of the child RS
+      // to the parent RS.
+      List<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
+      pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
+    }
+
+    if (result[1] < 0) {
+      // The partitioning columns of the parent RS are more specific than
+      // those of the child RS.
+      List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+      if (childPCs != null && !childPCs.isEmpty()) {
+        // If partitioning columns of the child RS are assigned,
+        // assign these to the partitioning columns of the parent RS.
+        pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+      }
+    } else if (result[1] > 0) {
+      // The partitioning columns of the child RS are more specific than
+      // those of the parent RS.
+      List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+      if (parentPCs == null || parentPCs.isEmpty()) {
+        // If partitioning columns of the parent RS are not assigned,
+        // assign partitioning columns of the child RS to the parent RS.
+        ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+        pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+      }
+    }
+
+    if (result[2] > 0) {
+      // The sorting order of the child RS is more specific than
+      // that of the parent RS. Assign the sorting order of the child RS
+      // to the parent RS.
+      if (result[0] <= 0) {
+        // Sorting columns of the parent RS are more specific than those of the
+        // child RS but Sorting order of the child RS is more specific than
+        // that of the parent RS.
+        throw new SemanticException("Sorting columns and order don't match. " +
+            "Try set " + HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION + "=false;");
+      }
+      pRS.getConf().setOrder(cRS.getConf().getOrder());
+      pRS.getConf().setNullOrder(cRS.getConf().getNullOrder());
+    } else {
+      // The sorting order of the parent RS is more specific or they are equal.
+      // We will copy the order from the child RS, and then fill in the order
+      // of the rest of columns with the one taken from parent RS.
+      StringBuilder order = new StringBuilder(cRS.getConf().getOrder());
+      StringBuilder orderNull = new StringBuilder(cRS.getConf().getNullOrder());
+      order.append(pRS.getConf().getOrder().substring(order.length()));
+      orderNull.append(pRS.getConf().getNullOrder().substring(orderNull.length()));
+      pRS.getConf().setOrder(order.toString());
+      pRS.getConf().setNullOrder(orderNull.toString());
+    }
+
+    if (result[3] > 0) {
+      // The number of reducers of the child RS is more specific than
+      // that of the parent RS. Assign the number of reducers of the child RS
+      // to the parent RS.
+      pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+    }
+
+    if (result[4] > 0) {
+      // This case happens only when pRS key is empty in which case we can use
+      // number of distribution keys and key serialization info from cRS
+      if (pRS.getConf().getKeyCols() != null && pRS.getConf().getKeyCols().size() == 0
+          && cRS.getConf().getKeyCols() != null && cRS.getConf().getKeyCols().size() == 0) {
+        // As setNumDistributionKeys is a subset of keycols, the size should
+        // be 0 too. This condition maybe too strict. We may extend it in the
+        // future.
+        TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(new ArrayList<FieldSchema>(), pRS
+            .getConf().getOrder(), pRS.getConf().getNullOrder());
+        pRS.getConf().setKeySerializeInfo(keyTable);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Strict variant of the merge check for a single candidate parent RS.
+   * The parent is wrapped in a one-element list and the decision is delegated
+   * to {@link #strictMerge(ReduceSinkOperator, List)}. The strict check demands that:
+   * - cRS and pRS have exactly the same keys in the same positions, and
+   * - cRS and pRS have exactly the same partition columns in the same positions, and
+   * - cRS and pRS have exactly the same bucket columns in the same positions, and
+   * - cRS and pRS sort in the same direction
+   *
+   * @param cRS child ReduceSink operator
+   * @param pRS parent ReduceSink operator cRS is compared against
+   * @return true if the strict check passes, false otherwise
+   * @throws SemanticException propagated from the list-based overload
+   */
+  public static boolean strictMerge(ReduceSinkOperator cRS, ReduceSinkOperator pRS)
+          throws SemanticException {
+    final List<ReduceSinkOperator> onlyParent = ImmutableList.of(pRS);
+    return strictMerge(cRS, onlyParent);
+  }
+  
+  /**
+   * Strict merge check of a child RS against a list of candidate parent RSs
+   * (for instance, all the RS inputs of a join). The check succeeds only if
+   * every parent in the list, compared with cRS, has:
+   * - the same number of key columns and of partition columns,
+   * - the same sort order and null order on those keys,
+   * - position-wise identical key, partition and bucket column expressions once
+   *   the child expression has been backtracked to the parent. Constant
+   *   expressions on either side are never accepted.
+   *
+   * @param cRS child ReduceSink operator
+   * @param pRSs parent ReduceSink operators cRS is compared against
+   * @return true if pRSs is non-empty and all of its elements pass the check
+   * @throws SemanticException if an expression cannot be backtracked cleanly
+   */
+  public static boolean strictMerge(ReduceSinkOperator cRS, List<ReduceSinkOperator> pRSs)
+          throws SemanticException {
+    if (pRSs == null || pRSs.isEmpty()) {
+      // Nothing to compare against
+      return false;
+    }
+    ReduceSinkDesc cRSc = cRS.getConf();
+    for (ReduceSinkOperator pRS : pRSs) {
+      ReduceSinkDesc pRSc = pRS.getConf();
+      if (cRSc.getKeyCols().size() != pRSc.getKeyCols().size()) {
+        return false;
+      }
+      if (cRSc.getPartitionCols().size() != pRSc.getPartitionCols().size()) {
+        return false;
+      }
+
+      // Sort direction and null ordering must be exactly the same
+      Integer moveRSOrderTo = checkOrder(true, cRSc.getOrder(), pRSc.getOrder(),
+              cRSc.getNullOrder(), pRSc.getNullOrder());
+      if (moveRSOrderTo == null) {
+        return false;
+      }
+
+      if (!strictlySameExprs(cRSc.getKeyCols(), pRSc.getKeyCols(), cRS, pRS)) {
+        return false;
+      }
+      if (!strictlySameExprs(cRSc.getPartitionCols(), pRSc.getPartitionCols(), cRS, pRS)) {
+        return false;
+      }
+
+      // Bucket columns are optional, but they must be present on both sides or on none
+      if (cRSc.getBucketCols() != null || pRSc.getBucketCols() != null) {
+        if (cRSc.getBucketCols() == null || pRSc.getBucketCols() == null) {
+          return false;
+        }
+        if (!strictlySameExprs(cRSc.getBucketCols(), pRSc.getBucketCols(), cRS, pRS)) {
+          return false;
+        }
+      }
+      // This parent meets all requirements. Keep going: returning here would
+      // validate only the first element of the list and ignore the others.
+    }
+
+    // Every parent meets all requirements
+    return true;
+  }
+
+  /**
+   * Position-wise comparison used by the strict merge check: both lists must have
+   * the same size, contain no constant expression, and each child expression must
+   * backtrack (from cRS to pRS) to an expression that is the same as the parent's.
+   */
+  private static boolean strictlySameExprs(List<ExprNodeDesc> cExprs, List<ExprNodeDesc> pExprs,
+          ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+    if (cExprs.size() != pExprs.size()) {
+      return false;
+    }
+    for (int i = 0; i < cExprs.size(); i++) {
+      ExprNodeDesc cExpr = cExprs.get(i);
+      ExprNodeDesc pExpr = pExprs.get(i);
+      if (cExpr instanceof ExprNodeConstantDesc || pExpr instanceof ExprNodeConstantDesc) {
+        // If either side has constant node expressions avoid the merge.
+        return false;
+      }
+      ExprNodeDesc backtrackCExpr = ExprNodeDescUtils.backtrack(cExpr, cRS, pRS);
+      if (backtrackCExpr == null || !pExpr.isSame(backtrackCExpr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Computes, for every merge criterion, which of the two RSs is more specific.
+   * The returned array holds the directions in this order: key columns,
+   * partition columns, sort order, number of reducers, number of distribution
+   * keys. Returns null as soon as one criterion turns out not to be mergeable.
+   *
+   * Each direction is one of -1, 0, 1:
+   * 1. 0 means both configurations are the same for that criterion
+   * 2. -1 means the configuration of the parent RS is more specific than the child's
+   * 3. 1 means the configuration of the child RS is more specific than the parent's
+   */
+  private static int[] extractMergeDirections(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+      throws SemanticException {
+    final ReduceSinkDesc childDesc = cRS.getConf();
+    final ReduceSinkDesc parentDesc = pRS.getConf();
+
+    // With a PTF between cRS and pRS the order direction cannot be ignored
+    final boolean strictOrder = isStrictEqualityNeeded(cRS, pRS);
+    final Integer orderDirection = checkOrder(strictOrder, childDesc.getOrder(),
+        parentDesc.getOrder(), childDesc.getNullOrder(), parentDesc.getNullOrder());
+    if (orderDirection == null) {
+      return null;
+    }
+
+    // A cRS that is being used for distinct is incompatible with the parent
+    if (childDesc.getDistinctColumnIndices().size() >= 2) {
+      return null;
+    }
+
+    final Integer reducerDirection =
+        checkNumReducer(childDesc.getNumReducers(), parentDesc.getNumReducers());
+    if (reducerDirection == null) {
+      return null;
+    }
+    if (reducerDirection > 0 && childDesc.getNumReducers() < minReducer) {
+      // The reducer count of the child would win but it is below the minimum
+      return null;
+    }
+
+    final List<ExprNodeDesc> childKeys = childDesc.getKeyCols();
+    final List<ExprNodeDesc> parentKeys = parentDesc.getKeyCols();
+    final Integer keyDirection = checkExprs(childKeys, parentKeys, cRS, pRS);
+    if (keyDirection == null) {
+      return null;
+    }
+
+    final List<ExprNodeDesc> childPartCols = childDesc.getPartitionCols();
+    final List<ExprNodeDesc> parentPartCols = parentDesc.getPartitionCols();
+    final Integer partitionDirection = checkExprs(childPartCols, parentPartCols, cRS, pRS);
+    if (partitionDirection == null) {
+      return null;
+    }
+
+    final Integer distKeyDirection = checkNumDistributionKey(
+        childDesc.getNumDistributionKeys(), parentDesc.getNumDistributionKeys());
+
+    return new int[] {keyDirection, partitionDirection, orderDirection,
+        reducerDirection, distKeyDirection};
+  }
+
+  /**
+   * Walks up from cRS, always following the first parent, until pRS is reached.
+   *
+   * @return true if a PTF operator is found strictly between cRS and pRS
+   */
+  private static boolean isStrictEqualityNeeded(ReduceSinkOperator cRS, ReduceSinkOperator pRS) {
+    for (Operator<? extends OperatorDesc> op = cRS.getParentOperators().get(0);
+        op != pRS; op = op.getParentOperators().get(0)) {
+      assert op.getNumParent() == 1;
+      if (op instanceof PTFOperator) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Decides whether the number of distribution keys is taken from the child RS.
+   * That only happens when the parent RS has 0 (or fewer) distribution keys; in
+   * every other case the distribution stays as defined by the parent RS, which
+   * is more generic than the child RS.
+   *
+   * Examples:
+   * case 1: pRS sorts on (a, b) and cRS sorts on (a, b, c), with 2 and 3
+   * distribution keys respectively. After the merge the sort keys are (a, b, c)
+   * while the number of distribution keys remains 2.
+   * case 2: pRS has an empty sort key and 0 distribution keys, and cRS sorts on
+   * (a, b) with 2 distribution keys. After the merge the sort keys are (a, b)
+   * and the number of distribution keys is 2.
+   *
+   * @param cnd number of distribution keys of the child RS (not consulted)
+   * @param pnd number of distribution keys of the parent RS
+   * @return 1 if pnd is zero or negative, 0 otherwise
+   */
+  private static Integer checkNumDistributionKey(int cnd, int pnd) {
+    return pnd <= 0 ? 1 : 0;
+  }
+
+  /**
+   * Overlapping part of keys should be the same between parent and child.
+   * And if child has more keys than parent, non-overlapping part of keys
+   * should be backtrackable to parent.
+   */
+  private static Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+      ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+    // If ckeys or pkeys have constant node expressions avoid the merge.
+    for (ExprNodeDesc ck : ckeys) {
+      if (ck instanceof ExprNodeConstantDesc) {
+        return null;
+      }
+    }
+    for (ExprNodeDesc pk : pkeys) {
+      if (pk instanceof ExprNodeConstantDesc) {
+        return null;
+      }
+    }
+
+    Integer moveKeyColTo = 0;
+    if (ckeys == null || ckeys.isEmpty()) {
+      if (pkeys != null && !pkeys.isEmpty()) {
+        moveKeyColTo = -1;
+      }
+    } else {
+      if (pkeys == null || pkeys.isEmpty()) {
+        for (ExprNodeDesc ckey : ckeys) {
+          if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
+            // cKey is not present in parent
+            return null;
+          }
+        }
+        moveKeyColTo = 1;
+      } else {
+        moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
+      }
+    }
+    return moveKeyColTo;
+  }
+
+  // backtrack key exprs of child to parent and compare it with parent's
+  protected static Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+      Operator<?> child, Operator<?> parent) throws SemanticException {
+    int common = Math.min(cexprs.size(), pexprs.size());
+    int limit = Math.max(cexprs.size(), pexprs.size());
+    int i = 0;
+    for (; i < common; i++) {
+      ExprNodeDesc pexpr = pexprs.get(i);
+      ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+      if (cexpr == null || !pexpr.isSame(cexpr)) {
+        return null;
+      }
+    }
+    for (; i < limit; i++) {
+      if (cexprs.size() > pexprs.size()) {
+        if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+          // cKey is not present in parent
+          return null;
+        }
+      }
+    }
+    return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+  }
+
+  protected static Integer checkOrder(boolean checkStrictEquality, String corder, String porder,
+          String cNullOrder, String pNullOrder) {
+    assert corder.length() == cNullOrder.length();
+    assert porder.length() == pNullOrder.length();
+    if (corder == null || corder.trim().equals("")) {
+      if (porder == null || porder.trim().equals("")) {
+        return 0;
+      }
+      return -1;
+    }
+    if (porder == null || porder.trim().equals("")) {
+      return 1;
+    }
+    corder = corder.trim();
+    porder = porder.trim();
+    if (checkStrictEquality) {
+      // order of overlapping keys should be exactly the same
+      cNullOrder = cNullOrder.trim();
+      pNullOrder = pNullOrder.trim();
+      int target = Math.min(corder.length(), porder.length());
+      if (!corder.substring(0, target).equals(porder.substring(0, target)) ||
+              !cNullOrder.substring(0, target).equals(pNullOrder.substring(0, target))) {
+        return null;
+      }
+    }
+    return Integer.valueOf(corder.length()).compareTo(porder.length());
+  }
+
+  /**
+   * If number of reducers for RS is -1, the RS can have any number of reducers.
+   * It's generally true except for order-by or forced bucketing cases.
+   * if both of num-reducers are not -1, those number should be the same.
+   */
+  protected static Integer checkNumReducer(int creduce, int preduce) {
+    if (creduce < 0) {
+      if (preduce < 0) {
+        return 0;
+      }
+      return -1;
+    }
+    if (preduce < 0) {
+      return 1;
+    }
+    if (creduce != preduce) {
+      return null;
+    }
+    return 0;
+  }
+
+  /**
+   * Tries to replace pRS by cRS when the only operators between them are Select
+   * operators (pRS-SEL*-cRS). The key, partition, value and bucket columns of
+   * cRS, as well as its column expression map, are rewritten through
+   * backtracking; then pRS and the operators in between are registered as
+   * removed in dedupCtx and cRS is attached to the parents of pRS.
+   *
+   * NOTE(review): cRS is updated step by step, so a bail-out on a later step
+   * (e.g. partition columns) happens after earlier steps (e.g. key columns)
+   * were already applied to cRS -- confirm callers tolerate this.
+   *
+   * @return true if pRS was replaced by cRS, false if the pattern does not apply
+   *         or an expression could not be backtracked
+   */
+  protected static boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS,
+          ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException {
+    assert cRS.getNumParent() == 1;
+
+    ReduceSinkDesc cConf = cRS.getConf();
+    ReduceSinkDesc pConf = pRS.getConf();
+    List<ExprNodeDesc> cKeys = cConf.getKeyCols();
+    List<ExprNodeDesc> pKeys = pConf.getKeyCols();
+
+    // Only Select operators are allowed on the path from cRS up to pRS,
+    // i.e. the sequence must be pRS-SEL*-cRS
+    for (Operator<? extends OperatorDesc> op = cRS.getParentOperators().get(0);
+        op != pRS; op = op.getParentOperators().get(0)) {
+      assert op.getNumParent() == 1;
+      if (!(op instanceof SelectOperator)) {
+        return false;
+      }
+    }
+
+    // Bail out if the child has no keys
+    if (cKeys == null || cKeys.isEmpty()) {
+      return false;
+    }
+    // Bail out if the parent has no keys
+    if (pKeys == null || pKeys.isEmpty()) {
+      return false;
+    }
+
+    // Key columns of cRS: every one of them has to be backtrackable to pRS
+    List<ExprNodeDesc> cKeysInParentRS = ExprNodeDescUtils.backtrack(cKeys, cRS, pRS);
+    for (ExprNodeDesc keyInParent : cKeysInParentRS) {
+      if (keyInParent == null) {
+        // We cannot backtrack the expression, we bail out
+        return false;
+      }
+    }
+    cRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(cKeysInParentRS, cRS, pRS));
+
+    // Partition columns of cRS: every one of them has to be backtrackable to pRS
+    List<ExprNodeDesc> cPartitionInParentRS = ExprNodeDescUtils.backtrack(
+            cConf.getPartitionCols(), cRS, pRS);
+    for (ExprNodeDesc partitionInParent : cPartitionInParentRS) {
+      if (partitionInParent == null) {
+        // We cannot backtrack the expression, we bail out
+        return false;
+      }
+    }
+    cRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(cPartitionInParentRS, cRS, pRS));
+
+    // Value columns of cRS: every one of them has to be backtrackable to pRS
+    List<ExprNodeDesc> cValueInParentRS = ExprNodeDescUtils.backtrack(
+            cConf.getValueCols(), cRS, pRS);
+    for (ExprNodeDesc valueInParent : cValueInParentRS) {
+      if (valueInParent == null) {
+        // We cannot backtrack the expression, we bail out
+        return false;
+      }
+    }
+    cRS.getConf().setValueCols(ExprNodeDescUtils.backtrack(cValueInParentRS, cRS, pRS));
+
+    // Bucket columns of cRS (if any): same treatment
+    if (cConf.getBucketCols() != null) {
+      List<ExprNodeDesc> cBucketInParentRS = ExprNodeDescUtils.backtrack(
+              cConf.getBucketCols(), cRS, pRS);
+      for (ExprNodeDesc bucketInParent : cBucketInParentRS) {
+        if (bucketInParent == null) {
+          // We cannot backtrack the expression, we bail out
+          return false;
+        }
+      }
+      cRS.getConf().setBucketCols(ExprNodeDescUtils.backtrack(cBucketInParentRS, cRS, pRS));
+    }
+
+    // Rewrite every entry of the column expression map of cRS
+    for (Entry<String, ExprNodeDesc> mapping : cRS.getColumnExprMap().entrySet()) {
+      mapping.setValue(ExprNodeDescUtils.backtrack(mapping.getValue(), cRS, pRS));
+    }
+
+    // Register as removed the operators between pRS and cRS (all of them
+    // Select operators, as verified above) and pRS itself
+    for (Operator<? extends OperatorDesc> op = cRS.getParentOperators().get(0);
+        op != pRS; op = op.getParentOperators().get(0)) {
+      dedupCtx.addRemovedOperator(op);
+    }
+    dedupCtx.addRemovedOperator(pRS);
+
+    // Plug cRS where pRS used to be
+    cRS.getParentOperators().clear();
+    for (Operator<? extends OperatorDesc> grandParent : pRS.getParentOperators()) {
+      grandParent.replaceChild(pRS, cRS);
+      cRS.getParentOperators().add(grandParent);
+    }
+    pRS.getParentOperators().clear();
+    pRS.getChildOperators().clear();
+
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkJoinDeDuplication.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkJoinDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkJoinDeDuplication.java
new file mode 100644
index 0000000..d27320b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkJoinDeDuplication.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.correlation;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Optimization to check whether any ReduceSink operator in the plan can be
+ * simplified so data is not shuffled/sorted if it is already shuffled/sorted.
+ *
+ * This optimization is executed after join algorithm selection logic has run,
+ * and it is intended to optimize new cases that cannot be optimized when
+ * {@link ReduceSinkDeDuplication} runs because some physical algorithms have
+ * not been selected. Instead of removing ReduceSink operators from the plan,
+ * they will be tagged, and then the execution plan compiler might take action,
+ * e.g., on Tez, ReduceSink operators that just need to forward data will be
+ * translated into a ONE-TO-ONE edge. The parallelism degree of these ReduceSink
+ * operators might be adjusted, as a ReduceSink operator that just forwards data
+ * cannot alter the degree of parallelism of the previous task.
+ */
+public class ReduceSinkJoinDeDuplication extends Transform {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ReduceSinkJoinDeDuplication.class);
+
+  protected ParseContext pGraphContext;
+
+  /**
+   * Walks the operator tree starting at the top operators of the plan and lets
+   * {@link ReducerProc} process every ReduceSink operator that is reached.
+   *
+   * @param pctx parse context holding the plan
+   * @return the parse context that was passed in
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+
+    ReduceSinkJoinDeDuplicateProcCtx cppCtx = new ReduceSinkJoinDeDuplicateProcCtx(pGraphContext);
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+        ReduceSinkJoinDeDuplicateProcFactory.getReducerMapJoinProc());
+
+    Dispatcher disp = new DefaultRuleDispatcher(
+        ReduceSinkJoinDeDuplicateProcFactory.getDefaultProc(), opRules, cppCtx);
+    GraphWalker ogw = new ForwardWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pGraphContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pGraphContext;
+  }
+
+  protected class ReduceSinkJoinDeDuplicateProcCtx extends AbstractCorrelationProcCtx {
+
+    public ReduceSinkJoinDeDuplicateProcCtx(ParseContext pctx) {
+      super(pctx);
+    }
+  }
+
+  static class ReduceSinkJoinDeDuplicateProcFactory {
+
+    public static NodeProcessor getReducerMapJoinProc() {
+      return new ReducerProc();
+    }
+
+    public static NodeProcessor getDefaultProc() {
+      return new DefaultProc();
+    }
+  }
+
+  /*
+   * do nothing.
+   */
+  static class DefaultProc implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      return null;
+    }
+  }
+
+  /**
+   * Processor for ReduceSink operators. If the strict merge check between the
+   * ReduceSink and its closest ReduceSink ancestor(s) passes, the ReduceSink is
+   * tagged as forwarding and the parallelism of the ReduceSink operators
+   * involved is fixed to the maximum number of reducers among them.
+   */
+  static class ReducerProc implements NodeProcessor {
+
+    /**
+     * @return Boolean.TRUE if the ReduceSink was tagged as forwarding,
+     *         Boolean.FALSE otherwise
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ReduceSinkJoinDeDuplicateProcCtx dedupCtx = (ReduceSinkJoinDeDuplicateProcCtx) procCtx;
+      ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+      if (cRS.getConf().isForwarding()) {
+        // Already set
+        return false;
+      }
+      if (cRS.getConf().getKeyCols().isEmpty()) {
+        // Not supported
+        return false;
+      }
+      boolean onlyPartitioning = false;
+      Operator<?> cRSChild = cRS.getChildOperators().get(0);
+      if (cRSChild instanceof MapJoinOperator ||
+              cRSChild instanceof CommonMergeJoinOperator) {
+        // If it is a MapJoin or MergeJoin, we make sure that they are on
+        // the reduce side, otherwise we bail out
+        if (reduceSinkParentsOf(cRSChild) == null) {
+          // MapJoin and SMBJoin not supported
+          return false;
+        }
+        onlyPartitioning = cRSChild instanceof MapJoinOperator;
+      }
+
+      // A MapJoin consumer only needs partitioned input; any other consumer
+      // needs the sort order to be preserved too
+      ReduceSinkOperator pRS;
+      if (onlyPartitioning) {
+        pRS = CorrelationUtilities.findFirstPossibleParent(
+            cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
+      } else {
+        pRS = CorrelationUtilities.findFirstPossibleParentPreserveSortOrder(
+            cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
+      }
+      if (pRS == null) {
+        return false;
+      }
+
+      // Gather the ReduceSink operators cRS has to be compared against
+      ImmutableList<ReduceSinkOperator> candidates;
+      Operator<?> pRSChild = pRS.getChildOperators().get(0);
+      if (pRSChild instanceof MapJoinOperator) {
+        // Handle MapJoin specially and check for all its children
+        MapJoinOperator pRSChildMJ = (MapJoinOperator) pRSChild;
+        // In this case, both should be DHJ operators as pRSChildMJ can only guarantee
+        // partitioned input, not sorted.
+        if (!pRSChildMJ.getConf().isDynamicPartitionHashJoin() ||
+                !(cRSChild instanceof MapJoinOperator) ||
+                !((MapJoinOperator) cRSChild).getConf().isDynamicPartitionHashJoin()) {
+          return false;
+        }
+        candidates = reduceSinkParentsOf(pRSChild);
+      } else if (pRSChild instanceof CommonMergeJoinOperator) {
+        // Handle MergeJoin specially and check for all its children
+        candidates = reduceSinkParentsOf(pRSChild);
+      } else {
+        // Rest of cases
+        candidates = ImmutableList.of(pRS);
+      }
+      if (candidates == null) {
+        // Some input of the join is not a ReduceSink (e.g. SMBJoin): not supported
+        return false;
+      }
+
+      int maxNumReducers = cRS.getConf().getNumReducers();
+      for (ReduceSinkOperator rsOp : candidates) {
+        maxNumReducers = Math.max(maxNumReducers, rsOp.getConf().getNumReducers());
+      }
+
+      if (!ReduceSinkDeDuplicationUtils.strictMerge(cRS, candidates)) {
+        return false;
+      }
+      LOG.debug("Set {} to forward data", cRS);
+      cRS.getConf().setForwarding(true);
+      propagateMaxNumReducers(dedupCtx, cRS, maxNumReducers);
+      return true;
+    }
+
+    /**
+     * Returns the parents of the given operator as ReduceSink operators, or
+     * null if at least one parent is not a ReduceSink operator.
+     */
+    private static ImmutableList<ReduceSinkOperator> reduceSinkParentsOf(Operator<?> op) {
+      ImmutableList.Builder<ReduceSinkOperator> parents = ImmutableList.builder();
+      for (Operator<?> parent : op.getParentOperators()) {
+        if (!(parent instanceof ReduceSinkOperator)) {
+          return null;
+        }
+        parents.add((ReduceSinkOperator) parent);
+      }
+      return parents.build();
+    }
+
+    /**
+     * Fixes the parallelism of rsOp to maxNumReducers. If rsOp feeds a MapJoin
+     * or MergeJoin, all the inputs of that join are fixed as well. Every
+     * ReduceSink that is fixed and is itself forwarding data propagates the
+     * value further up to its first possible ReduceSink parent.
+     */
+    private static void propagateMaxNumReducers(ReduceSinkJoinDeDuplicateProcCtx dedupCtx,
+            ReduceSinkOperator rsOp, int maxNumReducers) throws SemanticException {
+      if (rsOp == null) {
+        // Bail out
+        return;
+      }
+      Operator<?> rsChild = rsOp.getChildOperators().get(0);
+      if (rsChild instanceof MapJoinOperator ||
+              rsChild instanceof CommonMergeJoinOperator) {
+        for (Operator<?> p : rsChild.getParentOperators()) {
+          fixParallelism(dedupCtx, (ReduceSinkOperator) p, maxNumReducers);
+        }
+      } else {
+        fixParallelism(dedupCtx, rsOp, maxNumReducers);
+      }
+    }
+
+    /**
+     * Sets FIXED parallelism with maxNumReducers on a single ReduceSink and, if
+     * that ReduceSink forwards data, continues the propagation from its first
+     * possible ReduceSink parent.
+     */
+    private static void fixParallelism(ReduceSinkJoinDeDuplicateProcCtx dedupCtx,
+            ReduceSinkOperator rsOp, int maxNumReducers) throws SemanticException {
+      rsOp.getConf().setReducerTraits(EnumSet.of(ReducerTraits.FIXED));
+      rsOp.getConf().setNumReducers(maxNumReducers);
+      LOG.debug("Set {} to FIXED parallelism: {}", rsOp, maxNumReducers);
+      if (rsOp.getConf().isForwarding()) {
+        ReduceSinkOperator newRSOp =
+            CorrelationUtilities.findFirstPossibleParent(
+                rsOp, ReduceSinkOperator.class, dedupCtx.trustScript());
+        propagateMaxNumReducers(dedupCtx, newRSOp, maxNumReducers);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 6423a6d..1b0a2f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -502,6 +502,9 @@ public class GenTezUtils {
   }
 
   public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork, ReduceSinkOperator reduceSinkOperator) {
+    if(reduceSinkOperator.getConf().isForwarding()) {
+      return EdgeType.ONE_TO_ONE_EDGE;
+    }
     if (followingWork instanceof ReduceWork) {
       // Ideally there should be a better way to determine that the followingWork contains
       // a dynamic partitioned hash join, but in some cases (createReduceWork()) it looks like

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 888627a..5614c26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
@@ -154,6 +155,16 @@ public class TezCompiler extends TaskCompiler {
     runStatsDependentOptimizations(procCtx, inputs, outputs);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run the optimizations that use stats for optimization");
 
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    if(procCtx.conf.getBoolVar(ConfVars.HIVEOPTJOINREDUCEDEDUPLICATION)) {
+      new ReduceSinkJoinDeDuplication().transform(procCtx.parseContext);
+    }
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run reduce sink after join algorithm selection");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    runRemoveDynamicPruningOptimization(procCtx, inputs, outputs);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run remove dynamic pruning by size");
+
     // Removing semijoin optimization when it may not be beneficial
     removeSemijoinOptimizationByBenefit(procCtx);
 
@@ -414,6 +425,25 @@ public class TezCompiler extends TaskCompiler {
     opRules.put(new RuleRegExp("Convert Join to Map-join",
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new ForwardWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  private void runRemoveDynamicPruningOptimization(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
+
+    // create a walker which walks the tree in a DFS manner while maintaining
+    // the operator stack.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(
         new RuleRegExp("Remove dynamic pruning by size",
         AppMasterEventOperator.getOperatorName() + "%"),

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 24636c5..d08e700 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -98,6 +98,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
   //flag used to control how TopN handled for PTF/Windowing partitions.
   private boolean isPTFReduceSink = false; 
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
+  private boolean forwarding; // Whether this RS can forward records directly instead of shuffling/sorting
 
   public static enum ReducerTraits {
     UNSET(0), // unset
@@ -432,6 +433,14 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return skipTag;
   }
 
+  public void setForwarding(boolean forwarding) {
+    this.forwarding = forwarding;
+  }
+
+  public boolean isForwarding() {
+    return forwarding;
+  }
+
   @Explain(displayName = "auto parallelism", explainLevels = { Level.EXTENDED })
   public final boolean isAutoParallel() {
     return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL));

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index a55c708..b695f0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -28,6 +28,7 @@ public class TezEdgeProperty {
     CONTAINS,
     CUSTOM_EDGE,
     CUSTOM_SIMPLE_EDGE,
+    ONE_TO_ONE_EDGE
   }
 
   private HiveConf hiveConf;
@@ -102,4 +103,8 @@ public class TezEdgeProperty {
   public void setSlowStart(boolean slowStart) {
     this.isSlowStart = slowStart;
   }
+
+  public void setEdgeType(EdgeType type) {
+    this.edgeType = type;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/queries/clientpositive/partialdhj.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/partialdhj.q b/ql/src/test/queries/clientpositive/partialdhj.q
new file mode 100644
index 0000000..c4fe1f7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/partialdhj.q
@@ -0,0 +1,54 @@
+set hive.auto.convert.join=true;
+set hive.strict.checks.cartesian.product=false;
+set hive.merge.nway.joins=false;
+set hive.optimize.shared.work=false;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.hashtable.max.entries=10;
+
+-- ONE_TO_ONE ON BIG TABLE SIDE
+EXPLAIN
+SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value);
+
+SELECT *
+FROM (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+JOIN src
+ON (a.value = src.value);
+
+-- ONE_TO_ONE ON SMALL TABLE SIDE
+EXPLAIN
+SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value);
+
+SELECT *
+FROM src
+JOIN (
+  SELECT a.value
+  FROM src1 a
+  JOIN src1 b
+  ON (a.value = b.value)
+  GROUP BY a.value
+) a
+ON (src.value = a.value);

http://git-wip-us.apache.org/repos/asf/hive/blob/a96d9f71/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out b/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
index c563976..8bc280c 100644
--- a/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
@@ -322,7 +322,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (ONE_TO_ONE_EDGE), Reducer 6 (ONE_TO_ONE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices: