You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2017/07/31 01:33:24 UTC

hive git commit: HIVE-16998: Add config to enable HoS DPP only for map-joins (Janaki Lahorani, reviewed by Sahil Takiar)

Repository: hive
Updated Branches:
  refs/heads/master 483ac74d8 -> 723b4ef7f


HIVE-16998: Add config to enable HoS DPP only for map-joins (Janaki Lahorani, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/723b4ef7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/723b4ef7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/723b4ef7

Branch: refs/heads/master
Commit: 723b4ef7f923780275e08bbbd1201efd0248a675
Parents: 483ac74
Author: Janaki Lahorani <ja...@cloudera.com>
Authored: Sun Jul 30 18:33:25 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sun Jul 30 18:33:25 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  29 ++
 .../test/resources/testconfiguration.properties |   1 +
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   2 +-
 .../DynamicPartitionPruningOptimization.java    |   2 +-
 .../ql/optimizer/SparkRemoveDynamicPruning.java |  80 +++
 .../SparkRemoveDynamicPruningBySize.java        |  65 ---
 .../hive/ql/parse/spark/SparkCompiler.java      |  14 +-
 .../SparkPartitionPruningSinkOperator.java      |  49 ++
 .../hive/ql/parse/spark/SplitOpTreeForDPP.java  |  25 +-
 .../hive/ql/ppd/SyntheticJoinPredicate.java     |   2 +-
 ...ark_dynamic_partition_pruning_mapjoin_only.q |  50 ++
 ...dynamic_partition_pruning_mapjoin_only.q.out | 492 +++++++++++++++++++
 12 files changed, 717 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 20b193c..7244a66 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3426,6 +3426,10 @@ public class HiveConf extends Configuration {
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
+    SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY(
+        "hive.spark.dynamic.partition.pruning.map.join.only", false,
+        "Turn on dynamic partition pruning only for map joins.\n" +
+        "If hive.spark.dynamic.partition.pruning is set to true, this parameter value is ignored."),
     SPARK_USE_GROUPBY_SHUFFLE(
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
@@ -4045,6 +4049,17 @@ public class HiveConf extends Configuration {
     conf.setBoolean(var.varname, val);
   }
 
+  /* Dynamic partition pruning is enabled in some or all cases if either
+   * hive.spark.dynamic.partition.pruning is true or
+   * hive.spark.dynamic.partition.pruning.map.join.only is true
+   */
+  public static boolean isSparkDPPAny(Configuration conf) {
+    return (conf.getBoolean(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING.varname,
+            ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING.defaultBoolVal) ||
+            conf.getBoolean(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY.varname,
+            ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY.defaultBoolVal));
+  }
+
   public boolean getBoolVar(ConfVars var) {
     return getBoolVar(this, var);
   }
@@ -4658,6 +4673,20 @@ public class HiveConf extends Configuration {
     return isWebUiEnabled() && this.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_HISTORIC_QUERIES) > 0;
   }
 
+  /* Dynamic partition pruning is enabled in some or all cases
+   */
+  public boolean isSparkDPPAny() {
+    return isSparkDPPAny(this);
+  }
+
+  /* Dynamic partition pruning is enabled only for map joins, i.e.
+   * hive.spark.dynamic.partition.pruning is false and
+   * hive.spark.dynamic.partition.pruning.map.join.only is true
+   */
+  public boolean isSparkDPPOnlyMapjoin() {
+    return (!this.getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING) &&
+            this.getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY));
+  }
 
   public static boolean isLoadMetastoreConfig() {
     return loadMetastoreConfig;

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ba6c3e4..685b388 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1395,6 +1395,7 @@ spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_dynamic_partition_pruning_3.q,\
+  spark_dynamic_partition_pruning_mapjoin_only.q,\
   dynamic_rdd_cache.q, \
   spark_multi_insert_parallel_orderby.q,\
   spark_explainuser_1.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 21394c6..442c921 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -400,7 +400,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
       // Prune partitions
       if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
-          && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+          && HiveConf.isSparkDPPAny(job)) {
         SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
         try {
           pruner.prune(mrwork, job);

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index bd2d73e..b5a9796 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -111,7 +111,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
     FilterDesc desc = filter.getConf();
 
     if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) &&
-        !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+        !parseContext.getConf().isSparkDPPAny()) {
       // nothing to do when the optimization is off
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
new file mode 100644
index 0000000..15f7d9f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+
+/**
+ * Check if dynamic partition pruning should be disabled.  Currently the following 2 cases
+ * are checked.
+ * 1.  The expected data size for dynamic pruning is too large.
+ * 2.  DPP is enabled only for map joins and the join is not a map join.
+ *
+ * Cloned from RemoveDynamicPruningBySize
+ */
+public class SparkRemoveDynamicPruning implements NodeProcessor {
+
+  static final private Logger LOG =
+      LoggerFactory.getLogger(SparkRemoveDynamicPruning.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
+      Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+    boolean remove = false;
+
+    SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
+    SparkPartitionPruningSinkDesc desc = op.getConf();
+
+    if (context.getConf().isSparkDPPOnlyMapjoin() &&
+        !op.isWithMapjoin()) {
+      LOG.info("Disabling dynamic partition pruning based on: " + desc.getTableScan().getName()
+          + ". This is not part of a map join.");
+      remove = true;
+    }
+    else if (desc.getStatistics().getDataSize() > context.getConf()
+        .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
+      LOG.info("Disabling dynamic partition pruning based on: "
+          + desc.getTableScan().getName()
+          + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
+      remove = true;
+    }
+
+    if (remove) {
+      // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+      OperatorUtils.removeBranch(op);
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
deleted file mode 100644
index 26a1088..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer;
-
-import java.util.Stack;
-
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
-import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
-
-/**
- * If we expect the number of keys for dynamic pruning to be too large we
- * disable it.
- *
- * Cloned from RemoveDynamicPruningBySize
- */
-public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
-
-  static final private Logger LOG = LoggerFactory.getLogger(SparkRemoveDynamicPruningBySize.class.getName());
-
-  @Override
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
-      Object... nodeOutputs)
-      throws SemanticException {
-
-    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
-
-    SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
-    SparkPartitionPruningSinkDesc desc = op.getConf();
-
-    if (desc.getStatistics().getDataSize() > context.getConf()
-        .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      OperatorUtils.removeBranch(op);
-      // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
-      LOG.info("Disabling dynamic pruning for: "
-          + desc.getTableScan().getName()
-          + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index c195ee9..73e596e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
 import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
-import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruning;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
@@ -125,7 +125,7 @@ public class SparkCompiler extends TaskCompiler {
     runJoinOptimizations(procCtx);
 
     // Remove DPP based on expected size of the output data
-    runRemoveDynamicPruningBySize(procCtx);
+    runRemoveDynamicPruning(procCtx);
 
     // Remove cyclic dependencies for DPP
     runCycleAnalysisForPartitionPruning(procCtx);
@@ -133,13 +133,13 @@ public class SparkCompiler extends TaskCompiler {
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
-  private void runRemoveDynamicPruningBySize(OptimizeSparkProcContext procCtx) throws SemanticException {
+  private void runRemoveDynamicPruning(OptimizeSparkProcContext procCtx) throws SemanticException {
     ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
 
-    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning",
         SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
-        new SparkRemoveDynamicPruningBySize());
+        new SparkRemoveDynamicPruning());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -153,7 +153,7 @@ public class SparkCompiler extends TaskCompiler {
   }
 
   private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
-    if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+    if (!conf.isSparkDPPAny()) {
       return;
     }
 
@@ -265,7 +265,7 @@ public class SparkCompiler extends TaskCompiler {
 
   private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
       throws SemanticException {
-    if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+    if (!conf.isSparkDPPAny()) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 94230fd..e3146cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -27,7 +27,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
@@ -95,6 +97,53 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
     }
   }
 
+  /* Determines whether this pruning sink is associated with a map join.  Called to decide
+     whether the tree should be split for dpp (for a map join it is not split), and also to
+     decide whether dpp should be kept when it is enabled only for map joins.
+   */
+  public boolean isWithMapjoin() {
+    Operator<?> branchingOp = this.getBranchingOp();
+
+    // Check if this is a MapJoin. If so, do not split.
+    for (Operator<?> childOp : branchingOp.getChildOperators()) {
+      if (childOp instanceof ReduceSinkOperator &&
+          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /* Locate the op where the branch starts.  This function works only for the following pattern.
+   *     TS1       TS2
+   *      |         |
+   *     FIL       FIL
+   *      |         |
+   *      |     ---------
+   *      RS    |   |   |
+   *      |    RS  SEL SEL
+   *      |    /    |   |
+   *      |   /    GBY GBY
+   *      JOIN       |  |
+   *                 |  SPARKPRUNINGSINK
+   *                 |
+   *              SPARKPRUNINGSINK
+   */
+  public Operator<?> getBranchingOp() {
+    Operator<?> branchingOp = this;
+
+    while (branchingOp != null) {
+      if (branchingOp.getNumChild() > 1) {
+        break;
+      } else {
+        branchingOp = branchingOp.getParentOperators().get(0);
+      }
+    }
+
+    return branchingOp;
+  }
+
   private void flushToFile() throws IOException {
     // write an intermediate file to the specified path
     // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
index 933d6af..db0f54e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -89,30 +89,17 @@ public class SplitOpTreeForDPP implements NodeProcessor {
       }
     }
 
-    // Locate the op where the branch starts
-    // This is guaranteed to succeed since the branch always follow the pattern
-    // as shown in the first picture above.
-    Operator<?> branchingOp = pruningSinkOp;
-    while (branchingOp != null) {
-      if (branchingOp.getNumChild() > 1) {
-        break;
-      } else {
-        branchingOp = branchingOp.getParentOperators().get(0);
-      }
-    }
-
-    // Check if this is a MapJoin. If so, do not split.
-    for (Operator<?> childOp : branchingOp.getChildOperators()) {
-      if (childOp instanceof ReduceSinkOperator &&
-          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
-        context.pruningSinkSet.add(pruningSinkOp);
-        return null;
-      }
+    // If pruning sink operator is with map join, then pruning sink need not be split to a
+    // separate tree.  Add the pruning sink operator to context and return
+    if (pruningSinkOp.isWithMapjoin()) {
+      context.pruningSinkSet.add(pruningSinkOp);
+      return null;
     }
 
     List<Operator<?>> roots = new LinkedList<Operator<?>>();
     collectRoots(roots, pruningSinkOp);
 
+    Operator<?> branchingOp = pruningSinkOp.getBranchingOp();
     List<Operator<?>> savedChildOps = branchingOp.getChildOperators();
     List<Operator<?>> firstNodesOfPruningBranch = findFirstNodesOfPruningBranch(branchingOp);
     branchingOp.setChildOperators(Utilities.makeList(firstNodesOfPruningBranch.toArray(new

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 64baa6a..4cafa5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -74,7 +74,7 @@ public class SyntheticJoinPredicate extends Transform {
         && pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
       enabled = true;
     } else if ((queryEngine.equals("spark")
-        && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING))) {
+        && pctx.getConf().isSparkDPPAny())) {
       enabled = true;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
new file mode 100644
index 0000000..7c2164d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
@@ -0,0 +1,50 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.strict.checks.cartesian.product=false;
+
+-- srcpart_date is the small table that will use map join.  srcpart2 is the big table.
+-- both srcpart_date and srcpart2 will be joined with srcpart
+create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds;
+create table srcpart2 as select * from srcpart;
+
+-- enable map join and set the size to be small so that only join with srcpart_date gets to be a
+-- map join
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=100;
+
+-- checking with dpp disabled
+-- expectation: 2 spark jobs
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+-- checking with dpp enabled for all joins
+-- the scans of both srcpart_date and srcpart2 will result in partition pruning sinks
+-- the scan of srcpart2 will get split, resulting in additional spark jobs
+-- expectation: 3 spark jobs
+set hive.spark.dynamic.partition.pruning=true;
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+-- Restrict dpp to be enabled only for map joins
+-- expectation: 2 spark jobs
+set hive.spark.dynamic.partition.pruning.map.join.only=true;
+set hive.spark.dynamic.partition.pruning=false;
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+drop table srcpart_date;
+drop table srcpart2;

http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
new file mode 100644
index 0000000..564a6a2
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
@@ -0,0 +1,492 @@
+PREHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_date
+POSTHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_date
+POSTHOOK: Lineage: srcpart_date.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_date.ds2 SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+PREHOOK: query: create table srcpart2 as select * from srcpart
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart2
+POSTHOOK: query: create table srcpart2 as select * from srcpart
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart2
+POSTHOOK: Lineage: srcpart2.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart2.hr SIMPLE [(srcpart)srcpart.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart2.key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart2.value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date
+                  filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: ds (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col2 (type: string)
+                          1 _col0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  filterExpr: ((11.0 = 11.0) and ds is not null) (type: boolean)
+                  Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col2 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart2
+                  filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col3 (type: string)
+                  1 _col3 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+                Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-3 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart2
+                  filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col3 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            partition key expr: hr
+                            Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                            target column name: hr
+                            target work: Map 1
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date
+                  filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: ds (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col2 (type: string)
+                          1 _col0 (type: string)
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            partition key expr: ds
+                            Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                            target column name: ds
+                            target work: Map 1
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col2 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart2
+                  filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col3 (type: string)
+                  1 _col3 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+                Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date
+                  filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: ds (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col2 (type: string)
+                          1 _col0 (type: string)
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            partition key expr: ds
+                            Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+                            target column name: ds
+                            target work: Map 1
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col2 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart2
+                  filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col3 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col3 (type: string)
+                        Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col3 (type: string)
+                  1 _col3 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+                Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: drop table srcpart_date
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_date
+PREHOOK: Output: default@srcpart_date
+POSTHOOK: query: drop table srcpart_date
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_date
+POSTHOOK: Output: default@srcpart_date
+PREHOOK: query: drop table srcpart2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart2
+PREHOOK: Output: default@srcpart2
+POSTHOOK: query: drop table srcpart2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart2
+POSTHOOK: Output: default@srcpart2