You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2018/01/12 03:14:37 UTC

hive git commit: HIVE-18148: NPE in SparkDynamicPartitionPruningResolver (Rui reviewed by Liyun Zhang and Sahil Takiar)

Repository: hive
Updated Branches:
  refs/heads/master a7c41ba2a -> 35ea45a1e


HIVE-18148: NPE in SparkDynamicPartitionPruningResolver (Rui reviewed by Liyun Zhang and Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35ea45a1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35ea45a1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35ea45a1

Branch: refs/heads/master
Commit: 35ea45a1e1a7b700df2984a234f444c4a65ccb0d
Parents: a7c41ba
Author: Rui Li <li...@apache.org>
Authored: Fri Jan 12 11:14:18 2018 +0800
Committer: Rui Li <li...@apache.org>
Committed: Fri Jan 12 11:14:18 2018 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../hive/ql/exec/spark/SparkUtilities.java      | 115 ++++++-
 .../hive/ql/parse/spark/SparkCompiler.java      |  16 +-
 .../spark_dynamic_partition_pruning_5.q         |  24 ++
 .../spark_dynamic_partition_pruning_5.q.out     | 335 +++++++++++++++++++
 5 files changed, 482 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/35ea45a1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ac81995..d51e5cd 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1506,6 +1506,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_dynamic_partition_pruning_3.q,\
   spark_dynamic_partition_pruning_4.q,\
+  spark_dynamic_partition_pruning_5.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/35ea45a1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca19fd0..1303386 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -30,13 +32,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -211,6 +219,33 @@ public class SparkUtilities {
   }
 
   /**
+   * Collects operators of class cls reachable from root (root included) into result.
+   * Operators already in seen are skipped; every operator visited is added to seen.
+   */
+  public static <T extends Operator<?>> void collectOp(Operator<?> root, Class<T> cls,
+      Collection<T> result, Set<Operator<?>> seen) {
+    if (!seen.add(root)) {
+      return;
+    }
+    Deque<Operator<?>> deque = new ArrayDeque<>();
+    deque.add(root);
+    while (!deque.isEmpty()) {
+      Operator<?> op = deque.remove();
+      if (cls.isInstance(op)) {
+        result.add(cls.cast(op));
+      }
+      if (op.getChildOperators() != null) {
+        for (Operator<?> child : op.getChildOperators()) {
+          // mark at enqueue time so an operator shared by several parents is visited only once
+          if (seen.add(child)) {
+            deque.add(child);
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * remove currTask from the children of its parentTask
    * remove currTask from the parent of its childrenTask
    * @param currTask
@@ -227,4 +262,80 @@ public class SparkUtilities {
     //remove currTask from childTasks
     currTask.removeFromChildrenTasks();
   }
+
+  /**
+   * For DPP sinks w/ common join, we'll split the tree and what's above the branching
+   * operator is computed multiple times. Therefore it may not be good for performance to support
+   * nested DPP sinks, i.e. one DPP sink depends on other DPP sinks.
+   * The following is an example:
+   *
+   *             TS          TS
+   *             |           |
+   *            ...         FIL
+   *            |           |  \
+   *            RS         RS  SEL
+   *              \        /    |
+   *     TS          JOIN      GBY
+   *     |         /     \      |
+   *    RS        RS    SEL   DPP2
+   *     \       /       |
+   *       JOIN         GBY
+   *                     |
+   *                    DPP1
+   *
+   * where DPP1 depends on DPP2.
+   *
+   * To avoid such case, we'll visit all the branching operators. If a branching operator has any
+   * further away DPP branches w/ common join in its sub-tree, such branches will be removed.
+   * In the above example, the branch of DPP1 will be removed.
+   */
+  public static void removeNestedDPP(OptimizeSparkProcContext procContext) {
+    // gather every DPP sink reachable from the top operators of the parse context
+    Set<Operator<?>> walked = new HashSet<>();
+    Set<SparkPartitionPruningSinkOperator> dppSinks = new HashSet<>();
+    for (TableScanOperator topOp : procContext.getParseContext().getTopOps().values()) {
+      SparkUtilities.collectOp(topOp, SparkPartitionPruningSinkOperator.class, dppSinks, walked);
+    }
+    // gather the branching operator of each DPP sink
+    Set<Operator<?>> candidates = new HashSet<>();
+    for (SparkPartitionPruningSinkOperator sink : dppSinks) {
+      candidates.add(sink.getBranchingOp());
+    }
+    // branching ops already handled, either directly or as the owner of a nested DPP
+    Set<Operator<?>> handled = new HashSet<>();
+    for (Operator<?> candidate : candidates) {
+      if (!handled.add(candidate)) {
+        continue;
+      }
+      walked.clear();
+      Set<SparkPartitionPruningSinkOperator> nested = new HashSet<>();
+      for (Operator<?> child : candidate.getChildOperators()) {
+        if (isDirectDPPBranch(child)) {
+          continue;
+        }
+        SparkUtilities.collectOp(child, SparkPartitionPruningSinkOperator.class, nested, walked);
+      }
+      for (SparkPartitionPruningSinkOperator sink : nested) {
+        handled.add(sink.getBranchingOp());
+        // a DPP w/ map join doesn't split the tree, so only the other nested DPPs are removed
+        if (!sink.isWithMapjoin()) {
+          OperatorUtils.removeBranch(sink);
+        }
+      }
+    }
+  }
+
+  // true iff op heads a single-child chain of exactly the shape "SEL - GBY - DPP"
+  private static boolean isDirectDPPBranch(Operator<?> op) {
+    if (!(op instanceof SelectOperator) || op.getChildOperators() == null
+        || op.getChildOperators().size() != 1) {
+      return false;
+    }
+    Operator<?> groupBy = op.getChildOperators().get(0);
+    if (!(groupBy instanceof GroupByOperator) || groupBy.getChildOperators() == null
+        || groupBy.getChildOperators().size() != 1) {
+      return false;
+    }
+    return groupBy.getChildOperators().get(0) instanceof SparkPartitionPruningSinkOperator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/35ea45a1/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 965044d..aba1518 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -126,11 +126,16 @@ public class SparkCompiler extends TaskCompiler {
     // Run Join releated optimizations
     runJoinOptimizations(procCtx);
 
-    // Remove DPP based on expected size of the output data
-    runRemoveDynamicPruning(procCtx);
+    if(conf.isSparkDPPAny()){
+      // Remove DPP based on expected size of the output data
+      runRemoveDynamicPruning(procCtx);
 
-    // Remove cyclic dependencies for DPP
-    runCycleAnalysisForPartitionPruning(procCtx);
+      // Remove cyclic dependencies for DPP
+      runCycleAnalysisForPartitionPruning(procCtx);
+
+      // Remove nested DPPs
+      SparkUtilities.removeNestedDPP(procCtx);
+    }
 
     // Re-run constant propagation so we fold any new constants introduced by the operator optimizers
     // Specifically necessary for DPP because we might have created lots of "and true and true" conditions
@@ -161,9 +166,6 @@ public class SparkCompiler extends TaskCompiler {
   }
 
   private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
-    if (!conf.isSparkDPPAny()) {
-      return;
-    }
 
     boolean cycleFree = false;
     while (!cycleFree) {

http://git-wip-us.apache.org/repos/asf/hive/blob/35ea45a1/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
new file mode 100644
index 0000000..4883787
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
@@ -0,0 +1,24 @@
+set hive.spark.dynamic.partition.pruning=true;
+
+-- This qfile tests whether we can handle nested DPP sinks
+
+create table part1(key string, value string) partitioned by (p string);
+insert into table part1 partition (p='1') select * from src;
+
+create table part2(key string, value string) partitioned by (p string);
+insert into table part2 partition (p='1') select * from src;
+
+create table regular1 as select * from src limit 2;
+
+-- nested DPP is removed, upper most DPP is w/ common join
+explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p;
+
+-- nested DPP is removed, upper most DPP is w/ map join
+set hive.auto.convert.join=true;
+-- ensure regular1 is treated as small table, and partitioned tables are not
+set hive.auto.convert.join.noconditionaltask.size=20;
+explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p;
+
+drop table part1;
+drop table part2;
+drop table regular1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/35ea45a1/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
new file mode 100644
index 0000000..189a43b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
@@ -0,0 +1,335 @@
+PREHOOK: query: create table part1(key string, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part1
+POSTHOOK: query: create table part1(key string, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part1
+PREHOOK: query: insert into table part1 partition (p='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@part1@p=1
+POSTHOOK: query: insert into table part1 partition (p='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@part1@p=1
+POSTHOOK: Lineage: part1 PARTITION(p=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: part1 PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table part2(key string, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part2
+POSTHOOK: query: create table part2(key string, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part2
+PREHOOK: query: insert into table part2 partition (p='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@part2@p=1
+POSTHOOK: query: insert into table part2 partition (p='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@part2@p=1
+POSTHOOK: Lineage: part2 PARTITION(p=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: part2 PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table regular1 as select * from src limit 2
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@regular1
+POSTHOOK: query: create table regular1 as select * from src limit 2
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@regular1
+POSTHOOK: Lineage: regular1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: regular1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target column: [1:p (string)]
+                            partition key expr: [p]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                            target works: [Map 1]
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+        Reducer 3 <- Map 5 (PARTITION-LEVEL SORT, 4), Reducer 2 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col2 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col4 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col4 (type: string)
+                  Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+        Reducer 3 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col4 (type: string)
+                  1 _col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col3 (type: string), _col4 (type: string), _col0 (type: string), _col1 (type: string), _col2 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: regular1
+                  Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col2 (type: string)
+                          1 _col0 (type: string)
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target column: [1:p (string)]
+                            partition key expr: [p]
+                            Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                            target works: [Map 1]
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col2 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col4 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col4 (type: string)
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col4 (type: string)
+                  1 _col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col3 (type: string), _col4 (type: string), _col0 (type: string), _col1 (type: string), _col2 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: drop table part1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part1
+PREHOOK: Output: default@part1
+POSTHOOK: query: drop table part1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part1
+POSTHOOK: Output: default@part1
+PREHOOK: query: drop table part2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part2
+PREHOOK: Output: default@part2
+POSTHOOK: query: drop table part2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part2
+POSTHOOK: Output: default@part2
+PREHOOK: query: drop table regular1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@regular1
+PREHOOK: Output: default@regular1
+POSTHOOK: query: drop table regular1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@regular1
+POSTHOOK: Output: default@regular1