Posted to commits@hive.apache.org by st...@apache.org on 2017/07/27 22:44:37 UTC

hive git commit: HIVE-17087: Remove unnecessary HoS DPP trees during map-join conversion (Sahil Takiar, reviewed by Liyun Zhang, Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master e15b2dec7 -> 61d8b7c41


HIVE-17087: Remove unnecessary HoS DPP trees during map-join conversion (Sahil Takiar, reviewed by Liyun Zhang, Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/61d8b7c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/61d8b7c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/61d8b7c4

Branch: refs/heads/master
Commit: 61d8b7c410bb77d03b59d622171dc8a297f8fc5d
Parents: e15b2de
Author: Sahil Takiar <ta...@gmail.com>
Authored: Thu Jul 27 15:44:16 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Thu Jul 27 15:44:16 2017 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../optimizer/spark/SparkMapJoinOptimizer.java  |  50 ++++
 .../hive/ql/parse/spark/SparkCompiler.java      |  26 +-
 .../spark_dynamic_partition_pruning_2.q         |  16 ++
 .../spark_dynamic_partition_pruning_3.q         |  16 ++
 .../spark/spark_dynamic_partition_pruning.q.out |  70 +----
 .../spark_dynamic_partition_pruning_2.q.out     | 119 ++++++++
 .../spark_dynamic_partition_pruning_3.q.out     | 278 +++++++++++++++++++
 8 files changed, 504 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f66e19b..6e88a4e 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1392,6 +1392,7 @@ spark.query.files=add_part_multiple.q, \
 spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
+  spark_dynamic_partition_pruning_3.q,\
   dynamic_rdd_cache.q, \
   spark_multi_insert_parallel_orderby.q,\
   spark_explainuser_1.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 81c2348..7a3fae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +29,8 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -456,6 +459,27 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
+
+      for (Operator<?> parentOp : parentBigTableOp.getParentOperators()) {
+        // We might have generated a dynamic partition pruning operator chain.
+        // Since we're removing the reduce sink, we need to remove that too.
+        Set<SparkPartitionPruningSinkOperator> partitionPruningSinkOps = new HashSet<>();
+        for (Operator<?> childOp : parentOp.getChildOperators()) {
+          SparkPartitionPruningSinkOperator partitionPruningSinkOp = findPartitionPruningSinkOperator(childOp);
+          if (partitionPruningSinkOp != null) {
+            partitionPruningSinkOps.add(partitionPruningSinkOp);
+          }
+        }
+
+        for (SparkPartitionPruningSinkOperator partitionPruningSinkOp : partitionPruningSinkOps) {
+          OperatorUtils.removeBranch(partitionPruningSinkOp);
+          // At this point we've found the fork in the op pipeline that has the pruning as a child plan.
+          LOG.info("Disabling dynamic pruning for: "
+              + partitionPruningSinkOp.getConf().getTableScan().getName()
+              + ". It needs to be removed together with the reduce sink");
+        }
+      }
+
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
@@ -476,6 +500,32 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     return mapJoinOp;
   }
 
+  private SparkPartitionPruningSinkOperator findPartitionPruningSinkOperator(Operator<?> parent) {
+    for (Operator<?> op : parent.getChildOperators()) {
+      while (op != null) {
+        if (op instanceof SparkPartitionPruningSinkOperator && op.getConf() instanceof SparkPartitionPruningSinkDesc) {
+          // found dynamic partition pruning operator
+          return (SparkPartitionPruningSinkOperator) op;
+        }
+        if (op instanceof TerminalOperator) {
+          // crossing reduce sink or file sink means the pruning isn't for this parent.
+          break;
+        }
+
+        if (op.getChildOperators().size() != 1) {
+          // dynamic partition pruning pipeline doesn't have multiple children
+          break;
+        }
+
+        op = op.getChildOperators().get(0);
+      }
+    }
+
+    return null;
+  }
+
   private boolean containUnionWithoutRS(Operator<? extends OperatorDesc> op) {
     boolean result = false;
     if (op instanceof UnionOperator) {
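
A note on the new helper above: findPartitionPruningSinkOperator() follows each
child chain downward and returns the first Spark partition pruning sink it
reaches, giving up when it crosses a TerminalOperator (a reduce sink or file
sink means the pruning belongs to another branch) or a fork (a DPP pipeline is
always a straight chain). A minimal standalone sketch of that walk, using a
hypothetical Op class rather than Hive's real operator hierarchy:

  import java.util.ArrayList;
  import java.util.List;

  // Hypothetical stand-in for Hive's operator tree; only the shape matters.
  class Op {
    final String name;
    final boolean pruningSink;  // stands in for SparkPartitionPruningSinkOperator
    final boolean terminal;     // stands in for TerminalOperator (RS / FS)
    final List<Op> children = new ArrayList<>();

    Op(String name, boolean pruningSink, boolean terminal) {
      this.name = name;
      this.pruningSink = pruningSink;
      this.terminal = terminal;
    }

    // Walk each child chain; return the first pruning sink reachable without
    // crossing a terminal operator or a branch point.
    static Op findPruningSink(Op parent) {
      for (Op start : parent.children) {
        Op op = start;
        while (op != null) {
          if (op.pruningSink) {
            return op;                            // found the DPP sink
          }
          if (op.terminal || op.children.size() != 1) {
            break;                                // RS/FS or a fork: no DPP here
          }
          op = op.children.get(0);
        }
      }
      return null;
    }

    public static void main(String[] args) {
      // TS -> SEL -> GBY -> pruning sink, the shape built by DPP generation.
      Op ts = new Op("TS", false, false);
      Op sel = new Op("SEL", false, false);
      Op gby = new Op("GBY", false, false);
      Op sink = new Op("SPARKPRUNINGSINK", true, false);
      ts.children.add(sel);
      sel.children.add(gby);
      gby.children.add(sink);
      System.out.println(findPruningSink(ts).name);  // SPARKPRUNINGSINK
    }
  }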

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 682b987..c195ee9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -124,12 +124,34 @@ public class SparkCompiler extends TaskCompiler {
     // Run Join related optimizations
     runJoinOptimizations(procCtx);
 
+    // Remove DPP based on expected size of the output data
+    runRemoveDynamicPruningBySize(procCtx);
+
     // Remove cyclic dependencies for DPP
     runCycleAnalysisForPartitionPruning(procCtx);
 
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
+  private void runRemoveDynamicPruningBySize(OptimizeSparkProcContext procCtx) throws SemanticException {
+    ParseContext pCtx = procCtx.getParseContext();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+        new SparkRemoveDynamicPruningBySize());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+  }
+
   private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
     if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
       return;
@@ -296,10 +318,6 @@ public class SparkCompiler extends TaskCompiler {
 
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));
 
-    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
-        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
-        new SparkRemoveDynamicPruningBySize());
-
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
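
The net effect of the SparkCompiler change above: the size-based DPP removal
rule now runs in its own dedicated walk, after the join optimizations but
before cycle analysis, instead of being registered alongside the join rules.
A hypothetical miniature of the rule/dispatcher pattern both walks rely on
(plain Java regexes stand in for Hive's Rule "%" wildcard; these are not
Hive's actual Dispatcher/GraphWalker classes):

  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Consumer;

  // Each optimizer pass registers name-pattern rules, then a walker visits
  // every operator and fires the first rule whose pattern matches.
  class MiniDispatcher {
    private final Map<String, Consumer<String>> rules = new LinkedHashMap<>();

    void addRule(String opNameRegex, Consumer<String> processor) {
      rules.put(opNameRegex, processor);
    }

    void dispatch(List<String> opNames) {
      for (String op : opNames) {
        for (Map.Entry<String, Consumer<String>> e : rules.entrySet()) {
          if (op.matches(e.getKey())) {
            e.getValue().accept(op);  // simplification: first match wins
            break;
          }
        }
      }
    }

    public static void main(String[] args) {
      MiniDispatcher disp = new MiniDispatcher();
      // Illustrative operator names, not Hive's exact getOperatorName() values.
      disp.addRule("SPARKPRUNINGSINK.*",
          op -> System.out.println("would apply SparkRemoveDynamicPruningBySize to " + op));
      disp.dispatch(List.of("TS_0", "GBY_2", "SPARKPRUNINGSINK_3"));
    }
  }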

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_2.q
index 734f187..2202709 100644
--- a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_2.q
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_2.q
@@ -116,3 +116,19 @@ SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar';
 SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'foo'
 UNION ALL
 SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar';
+
+set hive.spark.dynamic.partition.pruning.max.data.size=10000;
+-- Dynamic partition pruning will be removed because the data size exceeds the limit,
+-- and the self join on the partitioning column should not fail (HIVE-10559).
+explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+;
+
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+;
\ No newline at end of file
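
For context on the new test block: hive.spark.dynamic.partition.pruning.max.data.size
caps the estimated size of the data feeding a pruning sink, and, as its name
suggests, SparkRemoveDynamicPruningBySize drops the DPP branch when the
estimate exceeds that cap. The golden output added later in this commit shows
the resulting plan with no Spark Partition Pruning Sink Operator at all; the
self join on ds doubles as a regression check for HIVE-10559.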

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_3.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_3.q
new file mode 100644
index 0000000..ac4ff93
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_3.q
@@ -0,0 +1,16 @@
+set hive.spark.dynamic.partition.pruning=true;
+set hive.auto.convert.join=true;
+
+create table partitioned_table1 (col int) partitioned by (part_col int);
+create table partitioned_table2 (col int) partitioned by (part_col int);
+create table regular_table (col int);
+insert into table regular_table values (1);
+
+alter table partitioned_table1 add partition (part_col = 1);
+insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
+
+alter table partitioned_table2 add partition (part_col = 1);
+insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
+
+explain select * from partitioned_table1, partitioned_table2 where partitioned_table1.part_col = partitioned_table2.part_col;
+explain select * from partitioned_table1 where partitioned_table1.part_col in (select regular_table.col from regular_table join partitioned_table2 on regular_table.col = partitioned_table2.part_col);
\ No newline at end of file
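
The two EXPLAINs pin down which pruning branches survive map-join conversion:
per the golden output added below, the sinks that remain are those fed from the
small-table (hash) side of each join, e.g. the Spark Partition Pruning Sink
Operator in Map 2 targeting Map 1 in the first plan.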

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index 26680f8..e743af1 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -4978,8 +4978,7 @@ POSTHOOK: query: EXPLAIN select count(*) from srcpart_date left join srcpart on
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
-  Stage-1 depends on stages: Stage-3
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -4987,38 +4986,6 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: srcpart_date
-                  filterExpr: (date = '2008-04-08') (type: boolean)
-                  Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (date = '2008-04-08') (type: boolean)
-                    Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: ds (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: string)
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                        Group By Operator
-                          keys: _col0 (type: string)
-                          mode: hash
-                          outputColumnNames: _col0
-                          Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                          Spark Partition Pruning Sink Operator
-                            partition key expr: ds
-                            Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target column name: ds
-                            target work: Map 3
-
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
         Map 3 
             Map Operator Tree:
                 TableScan
@@ -5101,8 +5068,7 @@ POSTHOOK: query: EXPLAIN select count(*) from srcpart full outer join srcpart_da
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
-  Stage-1 depends on stages: Stage-3
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -5110,38 +5076,6 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: srcpart_date
-                  filterExpr: (date = '2008-04-08') (type: boolean)
-                  Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (date = '2008-04-08') (type: boolean)
-                    Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: ds (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: string)
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                        Group By Operator
-                          keys: _col0 (type: string)
-                          mode: hash
-                          outputColumnNames: _col0
-                          Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                          Spark Partition Pruning Sink Operator
-                            partition key expr: ds
-                            Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target column name: ds
-                            target work: Map 1
-
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
index 3e69f3f..d902694 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
@@ -1027,3 +1027,122 @@ POSTHOOK: Input: default@dim_shops
 4
 5
 6
+PREHOOK: query: explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: s2
+                  filterExpr: ds is not null (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: ds (type: string)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      keys:
+                        0 _col0 (type: string)
+                        1 _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  filterExpr: ds is not null (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: ds (type: string)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col0 (type: string)
+                        1 _col0 (type: string)
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Local Work:
+              Map Reduce Local Work
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2000000
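
For reference on that count: in the standard qtest data, srcpart holds 500 rows
in each of its four (ds, hr) partitions, so each of the two ds values joins
1000 rows against 1000 rows, and 2 x 1000 x 1000 = 2,000,000.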

http://git-wip-us.apache.org/repos/asf/hive/blob/61d8b7c4/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
new file mode 100644
index 0000000..9e583e9
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
@@ -0,0 +1,278 @@
+PREHOOK: query: create table partitioned_table1 (col int) partitioned by (part_col int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partitioned_table1
+POSTHOOK: query: create table partitioned_table1 (col int) partitioned by (part_col int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partitioned_table1
+PREHOOK: query: create table partitioned_table2 (col int) partitioned by (part_col int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partitioned_table2
+POSTHOOK: query: create table partitioned_table2 (col int) partitioned by (part_col int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partitioned_table2
+PREHOOK: query: create table regular_table (col int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@regular_table
+POSTHOOK: query: create table regular_table (col int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@regular_table
+PREHOOK: query: insert into table regular_table values (1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@regular_table
+POSTHOOK: query: insert into table regular_table values (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@regular_table
+POSTHOOK: Lineage: regular_table.col EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_table1 add partition (part_col = 1)
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_table1
+POSTHOOK: query: alter table partitioned_table1 add partition (part_col = 1)
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_table1
+POSTHOOK: Output: default@partitioned_table1@part_col=1
+PREHOOK: query: insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@partitioned_table1@part_col=1
+POSTHOOK: query: insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@partitioned_table1@part_col=1
+POSTHOOK: Lineage: partitioned_table1 PARTITION(part_col=1).col EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_table2 add partition (part_col = 1)
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_table2
+POSTHOOK: query: alter table partitioned_table2 add partition (part_col = 1)
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_table2
+POSTHOOK: Output: default@partitioned_table2@part_col=1
+PREHOOK: query: insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@partitioned_table2@part_col=1
+POSTHOOK: query: insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@partitioned_table2@part_col=1
+POSTHOOK: Lineage: partitioned_table2 PARTITION(part_col=1).col EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: explain select * from partitioned_table1, partitioned_table2 where partitioned_table1.part_col = partitioned_table2.part_col
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from partitioned_table1, partitioned_table2 where partitioned_table1.part_col = partitioned_table2.part_col
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: partitioned_table2
+                  Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: col (type: int), part_col (type: int)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      keys:
+                        0 _col1 (type: int)
+                        1 _col1 (type: int)
+                    Select Operator
+                      expressions: _col1 (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        keys: _col0 (type: int)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                        Spark Partition Pruning Sink Operator
+                          partition key expr: part_col
+                          Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                          target column name: part_col
+                          target work: Map 1
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: partitioned_table1
+                  Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: col (type: int), part_col (type: int)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col1 (type: int)
+                        1 _col1 (type: int)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      input vertices:
+                        1 Map 2
+                      Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select * from partitioned_table1 where partitioned_table1.part_col in (select regular_table.col from regular_table join partitioned_table2 on regular_table.col = partitioned_table2.part_col)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from partitioned_table1 where partitioned_table1.part_col in (select regular_table.col from regular_table join partitioned_table2 on regular_table.col = partitioned_table2.part_col)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: regular_table
+                  Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: col is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: col (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                      Select Operator
+                        expressions: _col0 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: int)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            partition key expr: part_col
+                            Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
+                            target column name: part_col
+                            target work: Map 3
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: partitioned_table2
+                  Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: part_col (type: int)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col0 (type: int)
+                        1 _col0 (type: int)
+                      outputColumnNames: _col0
+                      input vertices:
+                        0 Map 2
+                      Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        keys: _col0 (type: int)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                        Spark HashTable Sink Operator
+                          keys:
+                            0 _col1 (type: int)
+                            1 _col0 (type: int)
+                        Select Operator
+                          expressions: _col0 (type: int)
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Group By Operator
+                            keys: _col0 (type: int)
+                            mode: hash
+                            outputColumnNames: _col0
+                            Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            Spark Partition Pruning Sink Operator
+                              partition key expr: part_col
+                              Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                              target column name: part_col
+                              target work: Map 1
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: partitioned_table1
+                  Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: col (type: int), part_col (type: int)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Left Semi Join 0 to 1
+                      keys:
+                        0 _col1 (type: int)
+                        1 _col0 (type: int)
+                      outputColumnNames: _col0, _col1
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 12 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 12 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+