Posted to commits@hive.apache.org by st...@apache.org on 2017/07/31 01:33:24 UTC
hive git commit: HIVE-16998: Add config to enable HoS DPP only for map-joins (Janaki Lahorani, reviewed by Sahil Takiar)
Repository: hive
Updated Branches:
refs/heads/master 483ac74d8 -> 723b4ef7f
HIVE-16998: Add config to enable HoS DPP only for map-joins (Janaki Lahorani, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/723b4ef7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/723b4ef7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/723b4ef7
Branch: refs/heads/master
Commit: 723b4ef7f923780275e08bbbd1201efd0248a675
Parents: 483ac74
Author: Janaki Lahorani <ja...@cloudera.com>
Authored: Sun Jul 30 18:33:25 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sun Jul 30 18:33:25 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 29 ++
.../test/resources/testconfiguration.properties | 1 +
.../hadoop/hive/ql/io/HiveInputFormat.java | 2 +-
.../DynamicPartitionPruningOptimization.java | 2 +-
.../ql/optimizer/SparkRemoveDynamicPruning.java | 80 +++
.../SparkRemoveDynamicPruningBySize.java | 65 ---
.../hive/ql/parse/spark/SparkCompiler.java | 14 +-
.../SparkPartitionPruningSinkOperator.java | 49 ++
.../hive/ql/parse/spark/SplitOpTreeForDPP.java | 25 +-
.../hive/ql/ppd/SyntheticJoinPredicate.java | 2 +-
...ark_dynamic_partition_pruning_mapjoin_only.q | 50 ++
...dynamic_partition_pruning_mapjoin_only.q.out | 492 +++++++++++++++++++
12 files changed, 717 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 20b193c..7244a66 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3426,6 +3426,10 @@ public class HiveConf extends Configuration {
SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
"hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
"Maximum total data size in dynamic pruning."),
+ SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY(
+ "hive.spark.dynamic.partition.pruning.map.join.only", false,
+ "Turn on dynamic partition pruning only for map joins.\n" +
+ "If hive.spark.dynamic.partition.pruning is set to true, this parameter value is ignored."),
SPARK_USE_GROUPBY_SHUFFLE(
"hive.spark.use.groupby.shuffle", true,
"Spark groupByKey transformation has better performance but uses unbounded memory." +
@@ -4045,6 +4049,17 @@ public class HiveConf extends Configuration {
conf.setBoolean(var.varname, val);
}
+ /* Dynamic partition pruning is enabled in some or all cases if either
+ * hive.spark.dynamic.partition.pruning is true or
+ * hive.spark.dynamic.partition.pruning.map.join.only is true
+ */
+ public static boolean isSparkDPPAny(Configuration conf) {
+ return (conf.getBoolean(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING.varname,
+ ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING.defaultBoolVal) ||
+ conf.getBoolean(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY.varname,
+ ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY.defaultBoolVal));
+ }
+
public boolean getBoolVar(ConfVars var) {
return getBoolVar(this, var);
}
@@ -4658,6 +4673,20 @@ public class HiveConf extends Configuration {
return isWebUiEnabled() && this.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_HISTORIC_QUERIES) > 0;
}
+ /* Dynamic partition pruning is enabled in some or all cases
+ */
+ public boolean isSparkDPPAny() {
+ return isSparkDPPAny(this);
+ }
+
+ /* Dynamic partition pruning is enabled only for map joins when
+ * hive.spark.dynamic.partition.pruning is false and
+ * hive.spark.dynamic.partition.pruning.map.join.only is true.
+ */
+ public boolean isSparkDPPOnlyMapjoin() {
+ return (!this.getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING) &&
+ this.getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY));
+ }
public static boolean isLoadMetastoreConfig() {
return loadMetastoreConfig;
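To make the interplay of the two flags concrete, here is a minimal sketch (not part of the patch; the class name DppFlagDemo is hypothetical) of how the new helpers behave, assuming an otherwise default HiveConf:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class DppFlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Restrict DPP to map joins: general flag off, new flag on.
        conf.setBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING, false);
        conf.setBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY, true);

        System.out.println(conf.isSparkDPPAny());         // true: DPP machinery stays active
        System.out.println(conf.isSparkDPPOnlyMapjoin()); // true: non-map-join pruning sinks get removed

        // With the general flag on, the new flag is ignored:
        conf.setBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING, true);
        System.out.println(conf.isSparkDPPOnlyMapjoin()); // false: all pruning sinks are kept
      }
    }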
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ba6c3e4..685b388 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1395,6 +1395,7 @@ spark.only.query.files=spark_combine_equivalent_work.q,\
spark_dynamic_partition_pruning.q,\
spark_dynamic_partition_pruning_2.q,\
spark_dynamic_partition_pruning_3.q,\
+ spark_dynamic_partition_pruning_mapjoin_only.q,\
dynamic_rdd_cache.q, \
spark_multi_insert_parallel_orderby.q,\
spark_explainuser_1.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 21394c6..442c921 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -400,7 +400,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
// Prune partitions
if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
- && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ && HiveConf.isSparkDPPAny(job)) {
SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
try {
pruner.prune(mrwork, job);
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index bd2d73e..b5a9796 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -111,7 +111,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
FilterDesc desc = filter.getConf();
if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) &&
- !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ !parseContext.getConf().isSparkDPPAny()) {
// nothing to do when the optimization is off
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
new file mode 100644
index 0000000..15f7d9f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+
+/**
+ * Check if dynamic partition pruning should be disabled. Currently the following two cases
+ * are checked:
+ * 1. The expected data size of the pruning output is too large.
+ * 2. DPP is enabled only for map joins, but this pruning sink is not part of a map join.
+ *
+ * Cloned from RemoveDynamicPruningBySize.
+ */
+public class SparkRemoveDynamicPruning implements NodeProcessor {
+
+ static final private Logger LOG =
+ LoggerFactory.getLogger(SparkRemoveDynamicPruning.class.getName());
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
+ Object... nodeOutputs)
+ throws SemanticException {
+
+ OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+ boolean remove = false;
+
+ SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
+ SparkPartitionPruningSinkDesc desc = op.getConf();
+
+ if (context.getConf().isSparkDPPOnlyMapjoin() &&
+ !op.isWithMapjoin()) {
+ LOG.info("Disabling dynamic partition pruning based on: " + desc.getTableScan().getName()
+ + ". This is not part of a map join.");
+ remove = true;
+ }
+ else if (desc.getStatistics().getDataSize() > context.getConf()
+ .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
+ LOG.info("Disabling dynamic partition pruning based on: "
+ + desc.getTableScan().getName()
+ + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
+ remove = true;
+ }
+
+ if (remove) {
+ // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+ OperatorUtils.removeBranch(op);
+ }
+
+ return false;
+ }
+}
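As the SparkCompiler hunk further down shows, this processor is wired in as a graph-walker rule keyed on the pruning sink's operator name; schematically, the wiring in runRemoveDynamicPruning() looks like:

    // Fire SparkRemoveDynamicPruning on every SparkPartitionPruningSinkOperator
    // encountered while walking the operator tree.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning",
        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
        new SparkRemoveDynamicPruning());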
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
deleted file mode 100644
index 26a1088..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer;
-
-import java.util.Stack;
-
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
-import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
-
-/**
- * If we expect the number of keys for dynamic pruning to be too large we
- * disable it.
- *
- * Cloned from RemoveDynamicPruningBySize
- */
-public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
-
- static final private Logger LOG = LoggerFactory.getLogger(SparkRemoveDynamicPruningBySize.class.getName());
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
- Object... nodeOutputs)
- throws SemanticException {
-
- OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
-
- SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
- SparkPartitionPruningSinkDesc desc = op.getConf();
-
- if (desc.getStatistics().getDataSize() > context.getConf()
- .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
- OperatorUtils.removeBranch(op);
- // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
- LOG.info("Disabling dynamic pruning for: "
- + desc.getTableScan().getName()
- + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index c195ee9..73e596e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hive.ql.lib.TypeRule;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
-import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruning;
import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
@@ -125,7 +125,7 @@ public class SparkCompiler extends TaskCompiler {
runJoinOptimizations(procCtx);
// Remove DPP based on expected size of the output data
- runRemoveDynamicPruningBySize(procCtx);
+ runRemoveDynamicPruning(procCtx);
// Remove cyclic dependencies for DPP
runCycleAnalysisForPartitionPruning(procCtx);
@@ -133,13 +133,13 @@ public class SparkCompiler extends TaskCompiler {
PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
}
- private void runRemoveDynamicPruningBySize(OptimizeSparkProcContext procCtx) throws SemanticException {
+ private void runRemoveDynamicPruning(OptimizeSparkProcContext procCtx) throws SemanticException {
ParseContext pCtx = procCtx.getParseContext();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+ opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning",
SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
- new SparkRemoveDynamicPruningBySize());
+ new SparkRemoveDynamicPruning());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -153,7 +153,7 @@ public class SparkCompiler extends TaskCompiler {
}
private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
- if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ if (!conf.isSparkDPPAny()) {
return;
}
@@ -265,7 +265,7 @@ public class SparkCompiler extends TaskCompiler {
private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
throws SemanticException {
- if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ if (!conf.isSparkDPPAny()) {
return;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 94230fd..e3146cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -27,7 +27,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
@@ -95,6 +97,53 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
}
}
+ /* Determines whether this SparkPartitionPruningSink feeds a map join. Callers use this
+ to decide whether the operator tree should be split for DPP (for a map join it is not),
+ and whether DPP should be disabled when it is restricted to map joins.
+ */
+ public boolean isWithMapjoin() {
+ Operator<?> branchingOp = this.getBranchingOp();
+
+ // Check if this is a MapJoin. If so, do not split.
+ for (Operator<?> childOp : branchingOp.getChildOperators()) {
+ if (childOp instanceof ReduceSinkOperator &&
+ childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /* Locate the operator where the branch starts. This function works only for the following pattern:
+ * TS1 TS2
+ * | |
+ * FIL FIL
+ * | |
+ * | ---------
+ * RS | | |
+ * | RS SEL SEL
+ * | / | |
+ * | / GBY GBY
+ * JOIN | |
+ * | SPARKPRUNINGSINK
+ * |
+ * SPARKPRUNINGSINK
+ */
+ public Operator<?> getBranchingOp() {
+ Operator<?> branchingOp = this;
+
+ while (branchingOp != null) {
+ if (branchingOp.getNumChild() > 1) {
+ break;
+ } else {
+ branchingOp = branchingOp.getParentOperators().get(0);
+ }
+ }
+
+ return branchingOp;
+ }
+
private void flushToFile() throws IOException {
// write an intermediate file to the specified path
// the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
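Together these two helpers replace logic that previously lived inline in SplitOpTreeForDPP; as the hunk below shows, the split decision there now reduces to roughly:

    // Schematic excerpt of SplitOpTreeForDPP.process() after this commit:
    if (pruningSinkOp.isWithMapjoin()) {
      // Map-join DPP runs in the same tree; no split is needed.
      context.pruningSinkSet.add(pruningSinkOp);
      return null;
    }
    // Otherwise, split the pruning branch off at the fork.
    Operator<?> branchingOp = pruningSinkOp.getBranchingOp();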
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
index 933d6af..db0f54e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -89,30 +89,17 @@ public class SplitOpTreeForDPP implements NodeProcessor {
}
}
- // Locate the op where the branch starts
- // This is guaranteed to succeed since the branch always follow the pattern
- // as shown in the first picture above.
- Operator<?> branchingOp = pruningSinkOp;
- while (branchingOp != null) {
- if (branchingOp.getNumChild() > 1) {
- break;
- } else {
- branchingOp = branchingOp.getParentOperators().get(0);
- }
- }
-
- // Check if this is a MapJoin. If so, do not split.
- for (Operator<?> childOp : branchingOp.getChildOperators()) {
- if (childOp instanceof ReduceSinkOperator &&
- childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
- context.pruningSinkSet.add(pruningSinkOp);
- return null;
- }
+ // If the pruning sink operator feeds a map join, it need not be split into a
+ // separate tree. Add the pruning sink operator to the context and return.
+ if (pruningSinkOp.isWithMapjoin()) {
+ context.pruningSinkSet.add(pruningSinkOp);
+ return null;
}
List<Operator<?>> roots = new LinkedList<Operator<?>>();
collectRoots(roots, pruningSinkOp);
+ Operator<?> branchingOp = pruningSinkOp.getBranchingOp();
List<Operator<?>> savedChildOps = branchingOp.getChildOperators();
List<Operator<?>> firstNodesOfPruningBranch = findFirstNodesOfPruningBranch(branchingOp);
branchingOp.setChildOperators(Utilities.makeList(firstNodesOfPruningBranch.toArray(new
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 64baa6a..4cafa5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -74,7 +74,7 @@ public class SyntheticJoinPredicate extends Transform {
&& pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
enabled = true;
} else if ((queryEngine.equals("spark")
- && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING))) {
+ && pctx.getConf().isSparkDPPAny())) {
enabled = true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
new file mode 100644
index 0000000..7c2164d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
@@ -0,0 +1,50 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.strict.checks.cartesian.product=false;
+
+-- srcpart_date is the small table that will use a map join. srcpart2 is the big table.
+-- Both srcpart_date and srcpart2 will be joined with srcpart.
+create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds;
+create table srcpart2 as select * from srcpart;
+
+-- Enable map join and set the size threshold low enough that only the join with
+-- srcpart_date is converted to a map join.
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=100;
+
+-- Checking with DPP disabled.
+-- expectation: 2 spark jobs
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+-- Checking with DPP enabled for all joins.
+-- The scans of both srcpart_date and srcpart2 will produce a partition pruning sink.
+-- The srcpart2 branch will be split into a separate tree, resulting in an additional Spark job.
+-- expectation: 3 spark jobs
+set hive.spark.dynamic.partition.pruning=true;
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+-- Restrict DPP to be enabled only for map joins.
+-- expectation: 2 spark jobs
+set hive.spark.dynamic.partition.pruning.map.join.only=true;
+set hive.spark.dynamic.partition.pruning=false;
+EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11;
+
+drop table srcpart_date;
+drop table srcpart2;
http://git-wip-us.apache.org/repos/asf/hive/blob/723b4ef7/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
new file mode 100644
index 0000000..564a6a2
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
@@ -0,0 +1,492 @@
+PREHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_date
+POSTHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_date
+POSTHOOK: Lineage: srcpart_date.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart_date.ds2 SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+PREHOOK: query: create table srcpart2 as select * from srcpart
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart2
+POSTHOOK: query: create table srcpart2 as select * from srcpart
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart2
+POSTHOOK: Lineage: srcpart2.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart2.hr SIMPLE [(srcpart)srcpart.FieldSchema(name:hr, type:string, comment:null), ]
+POSTHOOK: Lineage: srcpart2.key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart2.value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-1 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_date
+ filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: ds (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ filterExpr: ((11.0 = 11.0) and ds is not null) (type: boolean)
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ input vertices:
+ 1 Map 3
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart2
+ filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col3 (type: string)
+ 1 _col3 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-3 depends on stages: Stage-2
+ Stage-1 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 5
+ Map Operator Tree:
+ TableScan
+ alias: srcpart2
+ filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col3 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ partition key expr: hr
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ target column name: hr
+ target work: Map 1
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_date
+ filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: ds (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ partition key expr: ds
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ target column name: ds
+ target work: Map 1
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ input vertices:
+ 1 Map 3
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart2
+ filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col3 (type: string)
+ 1 _col3 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select *
+ from srcpart
+ join srcpart_date on (srcpart.ds = srcpart_date.ds)
+ join srcpart2 on (srcpart.hr = srcpart2.hr)
+ where srcpart_date.ds2 = '2008-04-08'
+ and srcpart2.hr = 11
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-1 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_date
+ filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean)
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: ds (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ partition key expr: ds
+ Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
+ target column name: ds
+ target work: Map 1
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col2 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ input vertices:
+ 1 Map 3
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart2
+ filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(hr) = 11.0) (type: boolean)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col3 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col3 (type: string)
+ Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col3 (type: string)
+ 1 _col3 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: drop table srcpart_date
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_date
+PREHOOK: Output: default@srcpart_date
+POSTHOOK: query: drop table srcpart_date
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_date
+POSTHOOK: Output: default@srcpart_date
+PREHOOK: query: drop table srcpart2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart2
+PREHOOK: Output: default@srcpart2
+POSTHOOK: query: drop table srcpart2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart2
+POSTHOOK: Output: default@srcpart2