You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/24 01:09:14 UTC

[42/50] [abbrv] hive git commit: HIVE-12492: MapJoin: 4 million unique integers seems to be a probe plateau (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-12492: MapJoin: 4 million unique integers seems to be a probe plateau (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e941e63c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e941e63c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e941e63c

Branch: refs/heads/hive-14535
Commit: e941e63c7d2830395e0b535e9b1a3c33d6e5b652
Parents: 759766e
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Feb 23 08:50:20 2017 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Feb 23 18:50:14 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../test/resources/testconfiguration.properties |   1 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   | 106 +++-
 .../stats/annotation/StatsRulesProcFactory.java |  44 +-
 .../apache/hadoop/hive/ql/stats/StatsUtils.java |   6 +-
 .../queries/clientpositive/join_max_hashtable.q |  37 ++
 .../llap/join_max_hashtable.q.out               | 490 +++++++++++++++++++
 7 files changed, 678 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0b315e1..46be3fb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1422,6 +1422,11 @@ public class HiveConf extends Configuration {
         "This controls how many partitions can be scanned for each partitioned table.\n" +
         "The default value \"-1\" means no limit. (DEPRECATED: Please use " + ConfVars.METASTORE_LIMIT_PARTITION_REQUEST + " in the metastore instead.)"),
 
+    HIVECONVERTJOINMAXENTRIESHASHTABLE("hive.auto.convert.join.hashtable.max.entries", 4194304L,
+        "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
+        "However, if it is on, and the predicated number of entries in hashtable for a given join \n" +
+        "input is larger than this number, the join will not be converted to a mapjoin. \n" +
+        "The value \"-1\" means no limit."),
     HIVEHASHTABLEKEYCOUNTADJUSTMENT("hive.hashtable.key.count.adjustment", 1.0f,
         "Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate" +
         " of the number of keys is divided by this value. If the value is 0, statistics are not used" +

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d344464..5b30157 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -501,6 +501,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   join1.q,\
   join_acid_non_acid.q,\
   join_filters.q,\
+  join_max_hashtable.q,\
   join_nulls.q,\
   join_nullsafe.q,\
   leftsemijoin_mr.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 93e3631..e68618a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +118,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
     if (mapJoinConversionPos < 0) {
       Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
@@ -141,7 +142,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // check if we can convert to map join no bucket scaling.
     LOG.info("Convert to non-bucketed map join");
     if (numBuckets != 1) {
-      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize);
+      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize, true);
     }
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
@@ -519,8 +520,22 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
+  /**
+   * Obtain big table position for join.
+   *
+   * @param joinOp join operator
+   * @param context optimization context
+   * @param buckets bucket count for Bucket Map Join conversion consideration or reduce count
+   * for Dynamic Hash Join conversion consideration
+   * @param skipJoinTypeChecks whether to skip join type checking
+   * @param maxSize size threshold for Map Join conversion
+   * @param checkHashTableEntries whether to check threshold for distinct keys in hash table for Map Join
+   * @return returns big table position or -1 if it cannot be determined
+   * @throws SemanticException
+   */
   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException {
+      int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkHashTableEntries)
+              throws SemanticException {
     if (!skipJoinTypeChecks) {
       /*
        * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
@@ -628,10 +643,20 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         // We are replacing the current big table with a new one, thus
         // we need to count the current one as a map table then.
         totalSize += bigInputStat.getDataSize();
+        // Check if number of distinct keys is larger than given max
+        // number of entries for HashMap. If it is, we do not convert.
+        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
+          return -1;
+        }
       } else if (!selectedBigTable) {
         // This is not the first table and we are not using it as big table,
         // in fact, we're adding this table as a map table
         totalSize += inputSize;
+        // Check if number of distinct keys is larger than given max
+        // number of entries for HashMap. If it is, we do not convert.
+        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
+          return -1;
+        }
       }
 
       if (totalSize/buckets > maxSize) {
@@ -905,8 +930,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
     int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
-                          context.conf.getLongVar(
-                              HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
+            context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
+            false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -951,7 +976,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     }
 
     int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
-                  true, Long.MAX_VALUE);
+                  true, Long.MAX_VALUE, false);
     if (pos < 0) {
       LOG.info("Could not get a valid join position. Defaulting to position 0");
       pos = 0;
@@ -961,4 +986,71 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }
+
+  /* Returns true if it passes the test, false otherwise. */
+  private boolean checkNumberOfEntriesForHashTable(JoinOperator joinOp, int position,
+          OptimizeTezProcContext context) {
+    long max = HiveConf.getLongVar(context.parseContext.getConf(),
+            HiveConf.ConfVars.HIVECONVERTJOINMAXENTRIESHASHTABLE);
+    if (max < 1) {
+      // Max is disabled, we can safely return true
+      return true;
+    }
+    // Calculate number of different entries and evaluate
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) joinOp.getParentOperators().get(position);
+    List<String> keys = StatsUtils.getQualifedReducerKeyNames(rsOp.getConf().getOutputKeyColumnNames());
+    Statistics inputStats = rsOp.getStatistics();
+    List<ColStatistics> columnStats = new ArrayList<>();
+    for (String key : keys) {
+      ColStatistics cs = inputStats.getColumnStatisticsFromColName(key);
+      if (cs == null) {
+        LOG.debug("Couldn't get statistics for: {}", key);
+        return true;
+      }
+      columnStats.add(cs);
+    }
+    long numRows = inputStats.getNumRows();
+    long estimation = estimateNDV(numRows, columnStats);
+    LOG.debug("Estimated NDV for input {}: {}; Max NDV for MapJoin conversion: {}",
+            position, estimation, max);
+    if (estimation > max) {
+      // Estimation larger than max
+      LOG.debug("Number of different entries for HashTable is greater than the max; "
+          + "we do not converting to MapJoin");
+      return false;
+    }
+    // We can proceed with the conversion
+    return true;
+  }
+
+  private static long estimateNDV(long numRows, List<ColStatistics> columnStats) {
+    // If there is a single column, return the number of distinct values
+    if (columnStats.size() == 1) {
+      return columnStats.get(0).getCountDistint();
+    }
+
+    // The expected number of distinct values when choosing p values
+    // with replacement from n integers is n . (1 - ((n - 1) / n) ^ p).
+    //
+    // If we have several uniformly distributed attributes A1 ... Am
+    // with N1 ... Nm distinct values, they behave as one uniformly
+    // distributed attribute with N1 * ... * Nm distinct values.
+    long n = 1L;
+    for (ColStatistics cs : columnStats) {
+      final long ndv = cs.getCountDistint();
+      if (ndv > 1) {
+        n = StatsUtils.safeMult(n, ndv);
+      }
+    }
+    final double nn = (double) n;
+    final double a = (nn - 1d) / nn;
+    if (a == 1d) {
+      // A under-flows if nn is large.
+      return numRows;
+    }
+    final double v = nn * (1d - Math.pow(a, numRows));
+    // Cap at fact-row-count, because numerical artifacts can cause it
+    // to go a few % over.
+    return Math.min(Math.round(v), numRows);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 61f1374..bdb09a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -51,10 +51,43 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualNS;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -1409,7 +1442,12 @@ public class StatsRulesProcFactory {
         // get the join keys from parent ReduceSink operators
         for (int pos = 0; pos < parents.size(); pos++) {
           ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
-          Statistics parentStats = parent.getStatistics();
+          Statistics parentStats;
+          try {
+            parentStats = parent.getStatistics().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
+          }
           keyExprs = StatsUtils.getQualifedReducerKeyNames(parent.getConf()
               .getOutputKeyColumnNames());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 0da7ea4..e48b609 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1278,7 +1278,11 @@ public class StatsUtils {
         ColStatistics colStats = parentStats.getColumnStatisticsFromColName(colName);
         if (colStats != null) {
           /* If statistics for the column already exist use it. */
-          return colStats;
+          try {
+            return colStats.clone();
+          } catch (CloneNotSupportedException e) {
+            return null;
+          }
         }
 
         // virtual columns

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/ql/src/test/queries/clientpositive/join_max_hashtable.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join_max_hashtable.q b/ql/src/test/queries/clientpositive/join_max_hashtable.q
new file mode 100644
index 0000000..9c30a0d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/join_max_hashtable.q
@@ -0,0 +1,37 @@
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.hashtable.max.entries=500;
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
+
+set hive.auto.convert.join.hashtable.max.entries=300;
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- DO NOT CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
+
+set hive.auto.convert.join.hashtable.max.entries=10;
+
+-- DO NOT CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- DO NOT CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);

http://git-wip-us.apache.org/repos/asf/hive/blob/e941e63c/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
new file mode 100644
index 0000000..85d45fe
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
@@ -0,0 +1,490 @@
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string), _col1 (type: string)
+                          1 _col0 (type: string), _col1 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
+                  1 KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0, _col1
+                input vertices:
+                  1 Map 3
+                Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string)
+                  1 KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0, _col1
+                input vertices:
+                  1 Map 3
+                Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
+                  1 KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0, _col1
+                input vertices:
+                  1 Map 3
+                Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                HybridGraceHashJoin: true
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 5 Data size: 890 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+