You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2018/08/27 09:54:04 UTC

[4/4] hive git commit: HIVE-20439: Use the inflated memory limit during join selection for llap (Zoltan Haindrich reviewed by Ashutosh Chauhan)

HIVE-20439: Use the inflated memory limit during join selection for llap (Zoltan Haindrich reviewed by Ashutosh Chauhan)

Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb7a676b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb7a676b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb7a676b

Branch: refs/heads/master
Commit: fb7a676b3f6baa3d156b3e7b3d1961a83bb7d698
Parents: f5e62eb
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Mon Aug 27 11:04:07 2018 +0200
Committer: Zoltan Haindrich <ki...@rxd.hu>
Committed: Mon Aug 27 11:53:49 2018 +0200

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |   45 +-
 .../hadoop/hive/ql/exec/TestOperators.java      |   17 +-
 .../clientpositive/bucket_map_join_tez2.q       |    4 +-
 .../bucketsortoptimize_insert_6.q               |    2 +-
 .../queries/clientpositive/join32_lessSize.q    |    2 +-
 .../test/queries/clientpositive/tez_smb_main.q  |    4 +-
 .../queries/clientpositive/unionDistinct_1.q    |    2 +-
 .../results/clientpositive/llap/orc_llap.q.out  |   59 +-
 .../clientpositive/spark/join32_lessSize.q.out  | 1159 +++++++++---------
 9 files changed, 678 insertions(+), 616 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 4145baf..52855e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
   private float hashTableLoadFactor;
+  private long maxJoinMemory;
 
   @Override
   /*
@@ -103,15 +104,17 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR);
 
     JoinOperator joinOp = (JoinOperator) nd;
-    long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     // adjust noconditional task size threshold for LLAP
     LlapClusterStateForCompile llapInfo = null;
     if ("llap".equalsIgnoreCase(context.conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
       llapInfo = LlapClusterStateForCompile.getClusterInfo(context.conf);
       llapInfo.initClusterInfo();
     }
-    MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf, llapInfo);
+    MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(context.conf, llapInfo);
     joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
+    maxJoinMemory = memoryMonitorInfo.getAdjustedNoConditionalTaskSize();
+
+    LOG.info("maxJoinMemory: {}", maxJoinMemory);
 
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &
@@ -119,11 +122,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (!hiveConvertJoin) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
         return retval;
       } else {
-        fallbackToReduceSideJoin(joinOp, context, maxSize);
+        fallbackToReduceSideJoin(joinOp, context);
         return null;
       }
     }
@@ -138,15 +141,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxJoinMemory, true);
     if (mapJoinConversionPos < 0) {
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
         return retval;
       } else {
         // only case is full outer join with SMB enabled which is not possible. Convert to regular
         // join.
-        fallbackToReduceSideJoin(joinOp, context, maxSize);
+        fallbackToReduceSideJoin(joinOp, context);
         return null;
       }
     }
@@ -167,12 +170,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // check if we can convert to map join no bucket scaling.
     LOG.info("Convert to non-bucketed map join");
     if (numBuckets != 1) {
-      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize, true);
+      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxJoinMemory, true);
     }
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context, maxSize);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
 
@@ -238,8 +241,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     if (networkCostDPHJ < networkCostMJ) {
       LOG.info("Dynamically partitioned Hash Join chosen");
-      long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-      return convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize);
+      return convertJoinDynamicPartitionedHashJoin(joinOp, context);
     } else if (numBuckets > 1) {
       LOG.info("Bucket Map Join chosen");
       return convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx);
@@ -271,9 +273,10 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   }
 
   @VisibleForTesting
-  public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize,
+  public MemoryMonitorInfo getMemoryMonitorInfo(
                                                 final HiveConf conf,
                                                 LlapClusterStateForCompile llapInfo) {
+    long maxSize = conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
     final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
     final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
@@ -315,13 +318,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
   @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
-    TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
+    TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if (!(HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN))
       || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
           && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      fallbackToReduceSideJoin(joinOp, context, maxSize);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -350,7 +353,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context, maxSize);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
 
@@ -360,7 +363,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context, maxSize);
+      fallbackToReduceSideJoin(joinOp, context);
     }
     return null;
   }
@@ -893,6 +896,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
 
       long inputSize = computeOnlineDataSize(currInputStat);
+      LOG.info("Join input#{}; onlineDataSize: {}; Statistics: {}", pos, inputSize, currInputStat);
 
       boolean currentInputNotFittingInMemory = false;
       if ((bigInputStat == null)
@@ -1271,14 +1275,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return numBuckets;
   }
 
-  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
-    final long maxSize)
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
     throws SemanticException {
     // Attempt dynamic partitioned hash join
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize,false);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxJoinMemory, false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -1314,11 +1317,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
-  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
       throws SemanticException {
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
         context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
-      if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index bbc2453..fe64bf5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -26,8 +26,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -66,6 +64,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
+import junit.framework.TestCase;
+
 /**
  * TestOperators.
  *
@@ -442,6 +442,7 @@ public class TestOperators extends TestCase {
     ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin();
     long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
     HiveConf hiveConf = new HiveConf();
+    hiveConf.setLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD, defaultNoConditionalTaskSize);
 
     LlapClusterStateForCompile llapInfo = null;
     if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
@@ -449,8 +450,8 @@ public class TestOperators extends TestCase {
       llapInfo.initClusterInfo();
     }
     // execution mode not set, null is returned
-    assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize,
-      hiveConf, llapInfo).getAdjustedNoConditionalTaskSize());
+    assertEquals(defaultNoConditionalTaskSize,
+        convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getAdjustedNoConditionalTaskSize());
     hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
 
     if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
@@ -464,7 +465,7 @@ public class TestOperators extends TestCase {
     int maxSlots = 3;
     long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
     assertEquals(expectedSize,
-      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
+        convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo)
         .getAdjustedNoConditionalTaskSize());
 
     // num executors is less than max executors per query (which is not expected case), default executors will be
@@ -473,18 +474,18 @@ public class TestOperators extends TestCase {
     hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
     expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
     assertEquals(expectedSize,
-      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
+        convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo)
         .getAdjustedNoConditionalTaskSize());
 
     // disable memory checking
     hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0");
     assertFalse(
-      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
+        convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring());
 
     // invalid inflation factor
     hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000");
     hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f");
     assertFalse(
-      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
+        convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring());
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index ae1ec44..85f2f2b 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -47,7 +47,7 @@ select key,value from srcbucket_mapjoin_n18;
 analyze table tab1_n5 compute statistics for columns;
 
 -- A negative test as src is not bucketed.
-set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.auto.convert.join.noconditionaltask.size=12000;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select a.key, a.value, b.value
@@ -98,7 +98,7 @@ insert overwrite table tab_part1 partition (ds='2008-04-08')
 select key,value from srcbucket_mapjoin_part_n20;
 analyze table tab_part1 compute statistics for columns;
 
-set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.auto.convert.join.noconditionaltask.size=12000;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select count(*)

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
index cd0a234..2d4907c 100644
--- a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
+++ b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
@@ -30,7 +30,7 @@ INSERT OVERWRITE TABLE test_table2_n3 PARTITION (ds = '1') SELECT key, key+1, va
 
 -- Insert data into the bucketed table by selecting from another bucketed table
 -- This should be a map-only operation, since the sort-order matches
-set hive.auto.convert.join.noconditionaltask.size=800;
+set hive.auto.convert.join.noconditionaltask.size=400;
 EXPLAIN
 INSERT OVERWRITE TABLE test_table3_n3 PARTITION (ds = '1')
 SELECT a.key, a.key2, concat(a.value, b.value) 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/join32_lessSize.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join32_lessSize.q b/ql/src/test/queries/clientpositive/join32_lessSize.q
index 229ba56..fcadbe3 100644
--- a/ql/src/test/queries/clientpositive/join32_lessSize.q
+++ b/ql/src/test/queries/clientpositive/join32_lessSize.q
@@ -9,7 +9,7 @@ CREATE TABLE dest_j2_n1(key STRING, value STRING, val2 STRING) STORED AS TEXTFIL
 
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=6000;
+set hive.auto.convert.join.noconditionaltask.size=4000;
 
 -- Since the inputs are small, it should be automatically converted to mapjoin
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/tez_smb_main.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q
index db8daa3..c7516b8 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_main.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -70,7 +70,7 @@ select count(*)
 from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
 
-set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.auto.convert.join.noconditionaltask.size=1000;
 set hive.mapjoin.hybridgrace.minwbsize=125;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 set hive.llap.memory.oversubscription.max.executors.per.query=0;
@@ -111,7 +111,7 @@ UNION  ALL
 select s2.key as key, s2.value as value from tab_n11 s2
 ) a join tab_part_n12 b on (a.key = b.key);
 
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=5000;
 
 explain
 select count(*) from

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/unionDistinct_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/unionDistinct_1.q b/ql/src/test/queries/clientpositive/unionDistinct_1.q
index f966f42..75c66b0 100644
--- a/ql/src/test/queries/clientpositive/unionDistinct_1.q
+++ b/ql/src/test/queries/clientpositive/unionDistinct_1.q
@@ -158,7 +158,7 @@ set hive.merge.mapfiles=false;
 
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=15000;
+set hive.auto.convert.join.noconditionaltask.size=8000;
 
 -- Since the inputs are small, it should be automatically converted to mapjoin
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index a639b68..f4f8278 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -1021,8 +1021,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Map 2 <- Map 1 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1046,7 +1046,7 @@ STAGE PLANS:
                         value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 4 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: o2
@@ -1059,38 +1059,31 @@ STAGE PLANS:
                       expressions: csmallint (type: smallint), cstring2 (type: string)
                       outputColumnNames: _col0, _col2
                       Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: smallint)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: smallint)
-                        Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col2 (type: string)
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: smallint)
+                          1 _col0 (type: smallint)
+                        outputColumnNames: _col2, _col5
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+                        Select Operator
+                          expressions: hash(_col2,_col5) (type: int)
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            aggregations: sum(_col0)
+                            mode: hash
+                            outputColumnNames: _col0
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            Reduce Output Operator
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                              value expressions: _col0 (type: bigint)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Reducer 2 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: smallint)
-                  1 _col0 (type: smallint)
-                outputColumnNames: _col2, _col5
-                Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
-                Select Operator
-                  expressions: hash(_col2,_col5) (type: int)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: sum(_col0)
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col0 (type: bigint)
         Reducer 3 
             Execution mode: vectorized, llap
             Reduce Operator Tree: