Posted to commits@hive.apache.org by se...@apache.org on 2017/02/24 01:09:08 UTC

[36/50] [abbrv] hive git commit: HIVE-15796: HoS: poor reducer parallelism when operator stats are not accurate (Chao Sun, reviewed by Xuefu Zhang)

HIVE-15796: HoS: poor reducer parallelism when operator stats are not accurate (Chao Sun, reviewed by Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/806d6e1b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/806d6e1b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/806d6e1b

Branch: refs/heads/hive-14535
Commit: 806d6e1b01640e890fa751017d21fc4b107e4f0a
Parents: 8ab1889
Author: Chao Sun <su...@apache.org>
Authored: Fri Feb 17 12:22:45 2017 -0800
Committer: Chao Sun <su...@apache.org>
Committed: Wed Feb 22 09:28:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../test/resources/testconfiguration.properties |   3 +-
 .../spark/SetSparkReducerParallelism.java       |  79 ++++-
 .../hive/ql/parse/spark/GenSparkUtils.java      |  24 +-
 .../hive/ql/parse/spark/SparkCompiler.java      |  23 +-
 .../queries/clientpositive/spark_use_op_stats.q |  41 +++
 .../spark/spark_use_op_stats.q.out              | 331 +++++++++++++++++++
 7 files changed, 481 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3777fa9..0b315e1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3202,8 +3202,13 @@ public class HiveConf extends Configuration {
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
 
+    SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
+        "Whether to use operator stats to determine reducer parallelism for Hive on Spark. "
+            + "If this is false, Hive will use source table stats to determine reducer "
+            + "parallelism for all first level reduce tasks, and the maximum reducer parallelism "
+            + "from all parents for all the rest (second level and onward) reducer tasks."),
     SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
-        "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated"
+        "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated "
             + "with TableScan operator on the root of operator tree, instead of using operator statistics."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),

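For reference, a minimal Java sketch of toggling the new flag programmatically
(the class and main method are illustrative; only HiveConf and the
SPARK_USE_OP_STATS constant added above come from Hive itself):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class UseOpStatsExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Disable operator-stats-based estimation, i.e. switch to the new fallback:
        // table-scan stats for first-level reduce sinks, max parent parallelism afterwards.
        conf.setBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS, false);
        boolean useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
        System.out.println("hive.spark.use.op.stats = " + useOpStats);  // false
      }
    }

In a HiveQL session the equivalent is simply "set hive.spark.use.op.stats=false;",
as the new spark_use_op_stats.q test below does.
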
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4a69bcc..d344464 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1483,7 +1483,8 @@ spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_vectorized_dynamic_partition_pruning.q,\
-  spark_use_file_size_for_mapjoin.q
+  spark_use_file_size_for_mapjoin.q,\
+  spark_use_op_stats.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 7a5b71f..337f418 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
+import java.util.Set;
 import java.util.Stack;
 
 import org.slf4j.Logger;
@@ -29,7 +30,9 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -57,6 +60,12 @@ public class SetSparkReducerParallelism implements NodeProcessor {
 
   // Spark memory per task, and total number of cores
   private ObjectPair<Long, Integer> sparkMemoryAndCores;
+  private final boolean useOpStats;
+
+  public SetSparkReducerParallelism(HiveConf conf) {
+    sparkMemoryAndCores = null;
+    useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
+  }
 
   @Override
   public Object process(Node nd, Stack<Node> stack,
@@ -67,16 +76,28 @@ public class SetSparkReducerParallelism implements NodeProcessor {
 
     ReduceSinkOperator sink = (ReduceSinkOperator) nd;
     ReduceSinkDesc desc = sink.getConf();
+    Set<ReduceSinkOperator> parentSinks = null;
 
     int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
 
+    if (!useOpStats) {
+      parentSinks = OperatorUtils.findOperatorsUpstream(sink, ReduceSinkOperator.class);
+      parentSinks.remove(sink);
+      if (!context.getVisitedReduceSinks().containsAll(parentSinks)) {
+        // We haven't processed all the parent sinks, and we need
+        // them to be done in order to compute the parallelism for this sink.
+        // In this case, skip. We should visit this again from another path.
+        LOG.debug("Skipping sink " + sink + " for now as we haven't seen all its parents.");
+        return false;
+      }
+    }
+
     if (context.getVisitedReduceSinks().contains(sink)) {
       // skip walking the children
       LOG.debug("Already processed reduce sink: " + sink.getName());
       return true;
     }
-
     context.getVisitedReduceSinks().add(sink);
 
     if (needSetParallelism(sink, context.getConf())) {
@@ -96,19 +117,52 @@ public class SetSparkReducerParallelism implements NodeProcessor {
             return false;
           }
         }
+
         long numberOfBytes = 0;
 
-        // we need to add up all the estimates from the siblings of this reduce sink
-        for (Operator<? extends OperatorDesc> sibling
-          : sink.getChildOperators().get(0).getParentOperators()) {
-          if (sibling.getStatistics() != null) {
-            numberOfBytes += sibling.getStatistics().getDataSize();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+        if (useOpStats) {
+          // we need to add up all the estimates from the siblings of this reduce sink
+          for (Operator<? extends OperatorDesc> sibling
+              : sink.getChildOperators().get(0).getParentOperators()) {
+            if (sibling.getStatistics() != null) {
+              numberOfBytes += sibling.getStatistics().getDataSize();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+              }
+            } else {
+              LOG.warn("No stats available from: " + sibling);
             }
-          } else {
-            LOG.warn("No stats available from: " + sibling);
           }
+        } else if (parentSinks.isEmpty()) {
+          // Not using OP stats and this is the first sink in the path, meaning that
+          // we should use TS stats to infer parallelism
+          for (Operator<? extends OperatorDesc> sibling
+              : sink.getChildOperators().get(0).getParentOperators()) {
+            Set<TableScanOperator> sources =
+                OperatorUtils.findOperatorsUpstream(sibling, TableScanOperator.class);
+            for (TableScanOperator source : sources) {
+              if (source.getStatistics() != null) {
+                numberOfBytes += source.getStatistics().getDataSize();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Table source " + source + " has stats: " + source.getStatistics());
+                }
+              } else {
+                LOG.warn("No stats available from table source: " + source);
+              }
+            }
+          }
+          LOG.debug("Gathered stats for sink " + sink + ". Total size is "
+              + numberOfBytes + " bytes.");
+        } else {
+          // Use the maximum parallelism from all parent reduce sinks
+          int numberOfReducers = 0;
+          for (ReduceSinkOperator parent : parentSinks) {
+            numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers());
+          }
+          desc.setNumReducers(numberOfReducers);
+          LOG.debug("Set parallelism for sink " + sink + " to " + numberOfReducers
+              + " based on its parents");
+          return false;
         }
 
         // Divide it by 2 so that we can have more reducers
@@ -134,7 +188,7 @@ public class SetSparkReducerParallelism implements NodeProcessor {
         desc.setNumReducers(numReducers);
       }
     } else {
-      LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+      LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers());
     }
 
     return false;
@@ -165,6 +219,9 @@ public class SetSparkReducerParallelism implements NodeProcessor {
   }
 
   private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
+    if (sparkMemoryAndCores != null) {
+      return;
+    }
     if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
       // If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't
       // try to get it.
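
Pulled out of the diff context, the fallback added in this file boils down to the
condensed sketch below. The class, method name, and signature are simplified for
illustration, and a plain ceiling division over a halved bytes-per-reducer stands
in for Hive's reducer estimator (the "Divide it by 2" comment above suggests the
halving, and it is consistent with the 43/13 reducer counts in the new q.out):

    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    final class FallbackParallelismSketch {
      // Illustrative condensation of the useOpStats == false branches above.
      static int fallbackParallelism(ReduceSinkOperator sink,
                                     Set<ReduceSinkOperator> parentSinks,
                                     long bytesPerReducer, int maxReducers) {
        if (parentSinks.isEmpty()) {
          // First-level reduce sink: estimate input size from the upstream TableScans.
          long numberOfBytes = 0;
          for (Operator<? extends OperatorDesc> sibling
              : sink.getChildOperators().get(0).getParentOperators()) {
            for (TableScanOperator source
                : OperatorUtils.findOperatorsUpstream(sibling, TableScanOperator.class)) {
              if (source.getStatistics() != null) {
                numberOfBytes += source.getStatistics().getDataSize();
              }
            }
          }
          // Halve bytes-per-reducer (more reducers), then take a ceiling division.
          long perReducer = Math.max(1, bytesPerReducer / 2);
          return (int) Math.min(maxReducers,
              Math.max(1, (numberOfBytes + perReducer - 1) / perReducer));
        }
        // Second level and onward: inherit the maximum parallelism of the parent sinks.
        int numberOfReducers = 0;
        for (ReduceSinkOperator parent : parentSinks) {
          numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers());
        }
        return numberOfReducers;
      }
    }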

http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 36bde30..d0a82af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -102,21 +102,21 @@ public class GenSparkUtils {
     reduceWork.setReducer(root);
     reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
 
-    // All parents should be reduce sinks. We pick the one we just walked
-    // to choose the number of reducers. In the join/union case they will
-    // all be -1. In sort/order case where it matters there will be only
-    // one parent.
-    Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator,
-      "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was "
-      + context.parentOfRoot.getClass().getName());
-    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-
-    reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+    // Pick the maximum # reducers across all parents as the # of reduce tasks.
+    int maxExecutors = -1;
+    for (Operator<? extends OperatorDesc> parentOfRoot : root.getParentOperators()) {
+      Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator,
+          "AssertionError: expected parentOfRoot to be an "
+              + "instance of ReduceSinkOperator, but was "
+              + parentOfRoot.getClass().getName());
+      ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
+      maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
+    }
+    reduceWork.setNumReduceTasks(maxExecutors);
 
+    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
     setupReduceSink(context, reduceWork, reduceSink);
-
     sparkWork.add(reduceWork);
-
     SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork);
 
     sparkWork.connect(context.preceedingWork, reduceWork, edgeProp);
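
Why the maximum rather than "the sink we just walked": with the new fallback,
sibling parent sinks of the same reduce work can carry different concrete
reducer counts instead of all being -1 as in the old join/union case. A toy
illustration (variable names are made up; the 13 and 43 correspond to the
tmp-side and src-side branches in the expected spark_use_op_stats.q.out plan
further below):

    int fromTmpSide = 13;   // parallelism inherited along the tmp-scan branch
    int fromSrcSide = 43;   // parallelism inherited along the src-scan branch
    int numReduceTasks = Math.max(fromTmpSide, fromSrcSide);  // 43, as Reducer 3 shows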

http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index c4b1640..682b987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
@@ -117,6 +118,9 @@ public class SparkCompiler extends TaskCompiler {
     // Annotation OP tree with statistics
     runStatsAnnotation(procCtx);
 
+    // Set reducer parallelism
+    runSetReducerParallelism(procCtx);
+
     // Run Join related optimizations
     runJoinOptimizations(procCtx);
 
@@ -266,12 +270,27 @@ public class SparkCompiler extends TaskCompiler {
     }
   }
 
-  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+  private void runSetReducerParallelism(OptimizeSparkProcContext procCtx) throws SemanticException {
     ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
             ReduceSinkOperator.getOperatorName() + "%"),
-        new SetSparkReducerParallelism());
+        new SetSparkReducerParallelism(pCtx.getConf()));
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+  }
+
+  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+    ParseContext pCtx = procCtx.getParseContext();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
 
     opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
 

http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_use_op_stats.q b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
new file mode 100644
index 0000000..b559bc0
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
@@ -0,0 +1,41 @@
+set hive.mapred.mode=nonstrict;
+set hive.spark.use.op.stats=false;
+set hive.auto.convert.join=false;
+set hive.exec.reducers.bytes.per.reducer=500;
+
+EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200;
+
+EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;
+
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;

http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
new file mode 100644
index 0000000..76f9936
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
@@ -0,0 +1,331 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 43), Map 3 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97	val_97
+97	val_97
+97	val_97
+97	val_97
+PREHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tmp
+POSTHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tmp
+PREHOOK: query: EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 13), Map 5 (PARTITION-LEVEL SORT, 13)
+        Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 43), Reducer 7 (PARTITION-LEVEL SORT, 43)
+        Reducer 4 <- Reducer 3 (GROUP, 1)
+        Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 43), Map 8 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+                    Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+                    Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+                    Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+                    Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+        Reducer 3 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: hash(_col0,_col2) (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 4 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 7 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col1 (type: string)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@tmp
+#### A masked pattern was here ####
+POSTHOOK: query: WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@tmp
+#### A masked pattern was here ####
+180817551380
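
A quick arithmetic check on the reducer counts above, assuming the fallback is
roughly ceil(total TableScan bytes / (hive.exec.reducers.bytes.per.reducer / 2))
as sketched earlier (class name and variables are illustrative):

    public class OpStatsFallbackArithmetic {
      public static void main(String[] args) {
        long perReducer = 500 / 2;      // bytes.per.reducer is set to 500 in the q file, then halved
        long srcBytes = 5312 + 5312;    // two scans of src, 5312 bytes each per the plan
        long tmpBytes = 1542 + 1542;    // two scans of tmp, 1542 bytes each per the plan
        long srcSide = (srcBytes + perReducer - 1) / perReducer;   // 43
        long tmpSide = (tmpBytes + perReducer - 1) / perReducer;   // 13
        System.out.println(srcSide);                    // 43: Reducer 7 (and Reducer 2 of the first query)
        System.out.println(tmpSide);                    // 13: Reducer 2 of the second query
        System.out.println(Math.max(srcSide, tmpSide)); // 43: Reducer 3, the max over its parents
      }
    }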