Posted to commits@hive.apache.org by su...@apache.org on 2017/02/17 20:07:47 UTC

hive git commit: HIVE-15489: Alternatively use table scan stats for HoS (Chao Sun, reviewed by Xuefu Zhang)

Repository: hive
Updated Branches:
  refs/heads/master bba18181a -> 368d916b3


HIVE-15489: Alternatively use table scan stats for HoS (Chao Sun, reviewed by Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/368d916b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/368d916b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/368d916b

Branch: refs/heads/master
Commit: 368d916b369f1adc58da884463b1dedb8c010616
Parents: bba1818
Author: Chao Sun <su...@apache.org>
Authored: Thu Jan 19 16:42:49 2017 -0800
Committer: Chao Sun <su...@apache.org>
Committed: Fri Feb 17 12:06:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../test/resources/testconfiguration.properties |   3 +-
 .../hadoop/hive/ql/exec/OperatorUtils.java      |  34 +++
 .../SparkRemoveDynamicPruningBySize.java        |   4 +-
 .../optimizer/spark/SparkMapJoinOptimizer.java  |  34 ++-
 .../hive/ql/parse/spark/GenSparkUtils.java      |  36 +--
 .../hive/ql/parse/spark/SparkCompiler.java      |   3 +-
 .../spark_use_file_size_for_mapjoin.q           |  30 +++
 .../spark/spark_use_file_size_for_mapjoin.q.out | 257 +++++++++++++++++++
 9 files changed, 364 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0bff243..1af59ba 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3193,6 +3193,9 @@ public class HiveConf extends Configuration {
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
 
+    SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
+        "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated "
+            + "with the TableScan operator at the root of the operator tree, instead of using operator statistics."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e8db920..7c54275 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1481,7 +1481,8 @@ spark.query.files=add_part_multiple.q, \
 spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
-  spark_vectorized_dynamic_partition_pruning.q
+  spark_vectorized_dynamic_partition_pruning.q,\
+  spark_use_file_size_for_mapjoin.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index d294e25..5bbfe12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -346,4 +346,38 @@ public class OperatorUtils {
       }
     }
   }
+
+  /**
+   * Given the input operator 'op', walk up the operator tree from 'op', and collect all the
+   * roots that can be reached from it. The results are stored in 'roots'.
+   */
+  public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      roots.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, roots);
+    }
+  }
+
+  /**
+   * Remove the branch that contains the specified operator. Do nothing if there's no branching,
+   * i.e. all the upstream operators have only one child.
+   */
+  public static void removeBranch(Operator<?> op) {
+    Operator<?> child = op;
+    Operator<?> curr = op;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+
+    curr.removeChild(child);
+  }
 }
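
Since findRoots now accepts any Collection, callers outside GenSparkUtils can reuse it. A
hedged sketch of one such use, summing source-level data sizes over the root TableScans above
a join input (RootScanSizes, sourceDataSize, and joinInput are illustrative names, not from
this commit):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;

    public class RootScanSizes {
      // Total data size of all root TableScans reachable above 'joinInput'.
      static long sourceDataSize(Operator<?> joinInput) {
        Set<Operator<?>> roots = new HashSet<>();
        OperatorUtils.findRoots(joinInput, roots);
        long total = 0;
        for (Operator<?> r : roots) {
          if (r instanceof TableScanOperator && r.getStatistics() != null) {
            total += r.getStatistics().getDataSize();
          }
        }
        return total;
      }
    }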

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index c41a0c8..26a1088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 
@@ -54,7 +54,7 @@ public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
 
     if (desc.getStatistics().getDataSize() > context.getConf()
         .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      GenSparkUtils.removeBranch(op);
+      OperatorUtils.removeBranch(op);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + desc.getTableScan().getName()

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 7faff88..d8f37ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -191,12 +193,40 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     int pos = 0;
 
     // bigTableFound means we've encountered a table that's bigger than the
-    // max. This table is either the the big table or we cannot convert.
+    // max. This table is either the big table or we cannot convert.
     boolean bigTableFound = false;
+    boolean useTsStats = context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_FILE_SIZE_FOR_MAPJOIN.varname, false);
+    boolean hasUpstreamSinks = false;
 
+    // Check whether there's any upstream RS. If so, the root TableScan stats
+    // no longer describe this join's inputs, so bail out of the conversion below.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      Set<ReduceSinkOperator> parentSinks =
+          OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class);
+      parentSinks.remove(parentOp);
+      if (!parentSinks.isEmpty()) {
+        hasUpstreamSinks = true;
+      }
+    }
+
+    // If we are using TS stats and this JOIN has at least one upstream RS, disable MapJoin conversion.
+    if (useTsStats && hasUpstreamSinks) {
+      return new long[]{-1, 0, 0};
+    }
+
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      Statistics currInputStat;
+      if (useTsStats) {
+        currInputStat = new Statistics();
+        // Find all root TSs and add up all data sizes
+        // Not adding other stats (e.g., # of rows, col stats) since only data size is used here
+        for (TableScanOperator root : OperatorUtils.findOperatorsUpstream(parentOp, TableScanOperator.class)) {
+          currInputStat.addToDataSize(root.getStatistics().getDataSize());
+        }
+      } else {
+        currInputStat = parentOp.getStatistics();
+      }
 
-      Statistics currInputStat = parentOp.getStatistics();
       if (currInputStat == null) {
         LOG.warn("Couldn't get statistics from: " + parentOp);
         return new long[]{-1, 0, 0};
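
To make the sizing concrete with the q-file below: each scan of src reports Data size: 5312,
so with hive.auto.convert.join.noconditionaltask.size=4000 no side fits and the first query
stays a common join, while at 8000 the first join converts and only the second join (which has
an upstream reduce sink) is rejected by the early return above. A simplified sketch of that
sizing decision, assuming the big-table side has already been excluded (TsSizingSketch,
fitsInMemory, and nonBigTableParents are illustrative names; the committed method also tracks
the big-table position):

    import java.util.Set;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class TsSizingSketch {
      // Would the small-table sides fit under the mapjoin threshold,
      // judging by root TableScan file sizes rather than operator stats?
      static boolean fitsInMemory(HiveConf conf,
          Iterable<Operator<? extends OperatorDesc>> nonBigTableParents) {
        long threshold = conf.getLongVar(
            HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
        long smallSidesSize = 0;
        for (Operator<? extends OperatorDesc> parent : nonBigTableParents) {
          // Sum the file sizes of all root TableScans feeding this input.
          Set<TableScanOperator> scans =
              OperatorUtils.findOperatorsUpstream(parent, TableScanOperator.class);
          for (TableScanOperator ts : scans) {
            smallSidesSize += ts.getStatistics().getDataSize();
          }
        }
        return smallSidesSize <= threshold;
      }
    }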

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 7b2b3c0..36bde30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -573,7 +574,7 @@ public class GenSparkUtils {
    */
   public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
     List<Operator<?>> ops = new ArrayList<Operator<?>>();
-    findRoots(op, ops);
+    OperatorUtils.findRoots(op, ops);
     for (Operator<?> r : ops) {
       BaseWork work = procCtx.rootToWorkMap.get(r);
       if (work != null) {
@@ -582,37 +583,4 @@ public class GenSparkUtils {
     }
     return null;
   }
-
-  /*
-   * findRoots returns all root operators (in ops) that result in operator op
-   */
-  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
-    List<Operator<?>> parents = op.getParentOperators();
-    if (parents == null || parents.isEmpty()) {
-      ops.add(op);
-      return;
-    }
-    for (Operator<?> p : parents) {
-      findRoots(p, ops);
-    }
-  }
-
-  /**
-   * Remove the branch that contains the specified operator. Do nothing if there's no branching,
-   * i.e. all the upstream operators have only one child.
-   */
-  public static void removeBranch(Operator<?> op) {
-    Operator<?> child = op;
-    Operator<?> curr = op;
-
-    while (curr.getChildOperators().size() <= 1) {
-      child = curr;
-      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
-        return;
-      }
-      curr = curr.getParentOperators().get(0);
-    }
-
-    curr.removeChild(child);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 71528e8..c4b1640 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -169,7 +170,7 @@ public class SparkCompiler extends TaskCompiler {
       return;
     }
 
-    GenSparkUtils.removeBranch(toRemove);
+    OperatorUtils.removeBranch(toRemove);
     // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
     LOG.info("Disabling dynamic pruning for: "
         + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency");

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
new file mode 100644
index 0000000..b623b83
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
@@ -0,0 +1,30 @@
+set hive.mapred.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.spark.use.file.size.for.mapjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=4000;
+
+EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+set hive.auto.convert.join.noconditionaltask.size=8000;
+
+-- This is copied from auto_join2. Without the configuration both joins are mapjoins,
+-- but with the configuration on, Hive should not turn the second join into a mapjoin
+-- since it has an upstream reduce sink.
+
+CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+SELECT sum(hash(dest.key,dest.value)) FROM dest;

http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
new file mode 100644
index 0000000..9044140
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
@@ -0,0 +1,257 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97	val_97
+97	val_97
+97	val_97
+97	val_97
+PREHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dest
+POSTHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dest
+PREHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-1 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          key expressions: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                          sort order: +
+                          Map-reduce partition columns: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                          Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: src3
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: UDFToDouble(_col0) (type: double)
+                        sort order: +
+                        Map-reduce partition columns: UDFToDouble(_col0) (type: double)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+                  1 UDFToDouble(_col0) (type: double)
+                outputColumnNames: _col0, _col3
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: UDFToInteger(_col0) (type: int), _col3 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        name: default.dest
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest
+POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest
+POSTHOOK: Lineage: dest.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest.value SIMPLE [(src)src3.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest
+#### A masked pattern was here ####
+33815990627