Posted to commits@hive.apache.org by su...@apache.org on 2017/02/22 17:30:32 UTC
hive git commit: HIVE-15796: HoS: poor reducer parallelism when operator stats are not accurate (Chao Sun, reviewed by Xuefu Zhang)
Repository: hive
Updated Branches:
refs/heads/master 8ab1889dd -> 806d6e1b0
HIVE-15796: HoS: poor reducer parallelism when operator stats are not accurate (Chao Sun, reviewed by Xuefu Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/806d6e1b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/806d6e1b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/806d6e1b
Branch: refs/heads/master
Commit: 806d6e1b01640e890fa751017d21fc4b107e4f0a
Parents: 8ab1889
Author: Chao Sun <su...@apache.org>
Authored: Fri Feb 17 12:22:45 2017 -0800
Committer: Chao Sun <su...@apache.org>
Committed: Wed Feb 22 09:28:56 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../test/resources/testconfiguration.properties | 3 +-
.../spark/SetSparkReducerParallelism.java | 79 ++++-
.../hive/ql/parse/spark/GenSparkUtils.java | 24 +-
.../hive/ql/parse/spark/SparkCompiler.java | 23 +-
.../queries/clientpositive/spark_use_op_stats.q | 41 +++
.../spark/spark_use_op_stats.q.out | 331 +++++++++++++++++++
7 files changed, 481 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3777fa9..0b315e1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3202,8 +3202,13 @@ public class HiveConf extends Configuration {
Constants.LLAP_LOGGER_NAME_CONSOLE),
"logger used for llap-daemons."),
+ SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
+ "Whether to use operator stats to determine reducer parallelism for Hive on Spark. "
+ + "If this is false, Hive will use source table stats to determine reducer "
+ + "parallelism for all first level reduce tasks, and the maximum reducer parallelism "
+ + "from all parents for all the rest (second level and onward) reducer tasks."),
SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
- "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated"
+ "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated "
+ "with TableScan operator on the root of operator tree, instead of using operator statistics."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4a69bcc..d344464 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1483,7 +1483,8 @@ spark.only.query.files=spark_combine_equivalent_work.q,\
spark_dynamic_partition_pruning.q,\
spark_dynamic_partition_pruning_2.q,\
spark_vectorized_dynamic_partition_pruning.q,\
- spark_use_file_size_for_mapjoin.q
+ spark_use_file_size_for_mapjoin.q,\
+ spark_use_op_stats.q
miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
bucket4.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 7a5b71f..337f418 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
import java.util.List;
+import java.util.Set;
import java.util.Stack;
import org.slf4j.Logger;
@@ -29,7 +30,9 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -57,6 +60,12 @@ public class SetSparkReducerParallelism implements NodeProcessor {
// Spark memory per task, and total number of cores
private ObjectPair<Long, Integer> sparkMemoryAndCores;
+ private final boolean useOpStats;
+
+ public SetSparkReducerParallelism(HiveConf conf) {
+ sparkMemoryAndCores = null;
+ useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
+ }
@Override
public Object process(Node nd, Stack<Node> stack,
@@ -67,16 +76,28 @@ public class SetSparkReducerParallelism implements NodeProcessor {
ReduceSinkOperator sink = (ReduceSinkOperator) nd;
ReduceSinkDesc desc = sink.getConf();
+ Set<ReduceSinkOperator> parentSinks = null;
int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+ if (!useOpStats) {
+ parentSinks = OperatorUtils.findOperatorsUpstream(sink, ReduceSinkOperator.class);
+ parentSinks.remove(sink);
+ if (!context.getVisitedReduceSinks().containsAll(parentSinks)) {
+ // We haven't processed all the parent sinks, and we need
+ // them to be done in order to compute the parallelism for this sink.
+ // In this case, skip. We should visit this again from another path.
+ LOG.debug("Skipping sink " + sink + " for now as we haven't seen all its parents.");
+ return false;
+ }
+ }
+
if (context.getVisitedReduceSinks().contains(sink)) {
// skip walking the children
LOG.debug("Already processed reduce sink: " + sink.getName());
return true;
}
-
context.getVisitedReduceSinks().add(sink);
if (needSetParallelism(sink, context.getConf())) {
@@ -96,19 +117,52 @@ public class SetSparkReducerParallelism implements NodeProcessor {
return false;
}
}
+
long numberOfBytes = 0;
- // we need to add up all the estimates from the siblings of this reduce sink
- for (Operator<? extends OperatorDesc> sibling
- : sink.getChildOperators().get(0).getParentOperators()) {
- if (sibling.getStatistics() != null) {
- numberOfBytes += sibling.getStatistics().getDataSize();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+ if (useOpStats) {
+ // we need to add up all the estimates from the siblings of this reduce sink
+ for (Operator<? extends OperatorDesc> sibling
+ : sink.getChildOperators().get(0).getParentOperators()) {
+ if (sibling.getStatistics() != null) {
+ numberOfBytes += sibling.getStatistics().getDataSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+ }
+ } else {
+ LOG.warn("No stats available from: " + sibling);
}
- } else {
- LOG.warn("No stats available from: " + sibling);
}
+ } else if (parentSinks.isEmpty()) {
+ // Not using OP stats and this is the first sink in the path, meaning that
+ // we should use TS stats to infer parallelism
+ for (Operator<? extends OperatorDesc> sibling
+ : sink.getChildOperators().get(0).getParentOperators()) {
+ Set<TableScanOperator> sources =
+ OperatorUtils.findOperatorsUpstream(sibling, TableScanOperator.class);
+ for (TableScanOperator source : sources) {
+ if (source.getStatistics() != null) {
+ numberOfBytes += source.getStatistics().getDataSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table source " + source + " has stats: " + source.getStatistics());
+ }
+ } else {
+ LOG.warn("No stats available from table source: " + source);
+ }
+ }
+ }
+ LOG.debug("Gathered stats for sink " + sink + ". Total size is "
+ + numberOfBytes + " bytes.");
+ } else {
+ // Use the maximum parallelism from all parent reduce sinks
+ int numberOfReducers = 0;
+ for (ReduceSinkOperator parent : parentSinks) {
+ numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers());
+ }
+ desc.setNumReducers(numberOfReducers);
+ LOG.debug("Set parallelism for sink " + sink + " to " + numberOfReducers
+ + " based on its parents");
+ return false;
}
// Divide it by 2 so that we can have more reducers
@@ -134,7 +188,7 @@ public class SetSparkReducerParallelism implements NodeProcessor {
desc.setNumReducers(numReducers);
}
} else {
- LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+ LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers());
}
return false;
@@ -165,6 +219,9 @@ public class SetSparkReducerParallelism implements NodeProcessor {
}
private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
+ if (sparkMemoryAndCores != null) {
+ return;
+ }
if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
// If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't
// try to get it.
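
To summarize the branching above outside of Hive's classes: when hive.spark.use.op.stats is false, a first-level ReduceSink sums the data sizes of the TableScans feeding it and converts bytes to reducers, while any later sink simply inherits the maximum parallelism of its parent sinks. A minimal, self-contained sketch of that arithmetic follows; the exact formula (ceiling division against a halved bytes-per-reducer target, per the "Divide it by 2" comment) is an approximation of the estimate Hive computes from hive.exec.reducers.bytes.per.reducer, not a copy of it:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative model only: plain Java, not Hive's operator classes.
    public class ParallelismSketch {

      // First-level sink: bytes come from TableScan stats; the per-reducer target is
      // halved, as in the "Divide it by 2 so that we can have more reducers" comment.
      static int fromBytes(long totalBytes, long bytesPerReducer, int maxReducers) {
        long target = Math.max(1, bytesPerReducer / 2);
        int reducers = (int) ((totalBytes + target - 1) / target);  // ceiling division
        return Math.min(maxReducers, Math.max(1, reducers));
      }

      // Later sinks: inherit the maximum parallelism of all parent reduce sinks.
      static int fromParents(List<Integer> parentReducerCounts) {
        int max = 0;
        for (int n : parentReducerCounts) {
          max = Math.max(max, n);
        }
        return max;
      }

      public static void main(String[] args) {
        // Two 5312-byte scans of src with bytes.per.reducer=500 (cap assumed to be 1009,
        // the usual hive.exec.reducers.max default):
        System.out.println(fromBytes(2 * 5312, 500, 1009));          // 43, as in the q.out below
        // A second-level sink whose parents were sized to 13 and 43 reducers:
        System.out.println(fromParents(Arrays.asList(13, 43)));      // 43
      }
    }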
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 36bde30..d0a82af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -102,21 +102,21 @@ public class GenSparkUtils {
reduceWork.setReducer(root);
reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
- // All parents should be reduce sinks. We pick the one we just walked
- // to choose the number of reducers. In the join/union case they will
- // all be -1. In sort/order case where it matters there will be only
- // one parent.
- Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator,
- "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was "
- + context.parentOfRoot.getClass().getName());
- ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-
- reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+ // Pick the maximum # reducers across all parents as the # of reduce tasks.
+ int maxExecutors = -1;
+ for (Operator<? extends OperatorDesc> parentOfRoot : root.getParentOperators()) {
+ Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator,
+ "AssertionError: expected parentOfRoot to be an "
+ + "instance of ReduceSinkOperator, but was "
+ + parentOfRoot.getClass().getName());
+ ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
+ maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
+ }
+ reduceWork.setNumReduceTasks(maxExecutors);
+ ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
setupReduceSink(context, reduceWork, reduceSink);
-
sparkWork.add(reduceWork);
-
SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork);
sparkWork.connect(context.preceedingWork, reduceWork, edgeProp);
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index c4b1640..682b987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.TypeRule;
@@ -117,6 +118,9 @@ public class SparkCompiler extends TaskCompiler {
// Annotation OP tree with statistics
runStatsAnnotation(procCtx);
+ // Set reducer parallelism
+ runSetReducerParallelism(procCtx);
+
// Run Join releated optimizations
runJoinOptimizations(procCtx);
@@ -266,12 +270,27 @@ public class SparkCompiler extends TaskCompiler {
}
}
- private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+ private void runSetReducerParallelism(OptimizeSparkProcContext procCtx) throws SemanticException {
ParseContext pCtx = procCtx.getParseContext();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
- new SetSparkReducerParallelism());
+ new SetSparkReducerParallelism(pCtx.getConf()));
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ GraphWalker ogw = new PreOrderWalker(disp);
+
+ // Create a list of topop nodes
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ }
+
+ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+ ParseContext pCtx = procCtx.getParseContext();
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_use_op_stats.q b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
new file mode 100644
index 0000000..b559bc0
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_use_op_stats.q
@@ -0,0 +1,41 @@
+set hive.mapred.mode=nonstrict;
+set hive.spark.use.op.stats=false;
+set hive.auto.convert.join=false;
+set hive.exec.reducers.bytes.per.reducer=500;
+
+EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200;
+
+EXPLAIN
+WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;
+
+WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;
http://git-wip-us.apache.org/repos/asf/hive/blob/806d6e1b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
new file mode 100644
index 0000000..76f9936
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
@@ -0,0 +1,331 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 43), Map 3 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col2
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col2 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97 val_97
+97 val_97
+97 val_97
+97 val_97
+PREHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tmp
+POSTHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tmp
+PREHOOK: query: EXPLAIN
+WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 13), Map 5 (PARTITION-LEVEL SORT, 13)
+ Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 43), Reducer 7 (PARTITION-LEVEL SORT, 43)
+ Reducer 4 <- Reducer 3 (GROUP, 1)
+ Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 43), Map 8 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Map 5
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+ Map 6
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ Map 8
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+ Reducer 3
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col2
+ Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(_col0,_col2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 4
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 7
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col2
+ Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col2 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@tmp
+#### A masked pattern was here ####
+POSTHOOK: query: WITH a AS (
+ SELECT src1.key, src2.value
+ FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+ WHERE src1.key > 100
+),
+b AS (
+ SELECT src1.key, src2.value
+ FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+ WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@tmp
+#### A masked pattern was here ####
+180817551380
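
As a rough sanity check of the new behavior against the plan above (assuming the optimizer halves hive.exec.reducers.bytes.per.reducer=500 to an effective 250-byte target, per the "Divide it by 2" comment in SetSparkReducerParallelism): the two 1542-byte tmp scans give ceil(3084 / 250) = 13 for Reducer 2, the two 5312-byte src scans give ceil(10624 / 250) = 43 for Reducer 7, and Reducer 3, as a second-level sink, takes max(13, 43) = 43 from its parents; these figures match the PARTITION-LEVEL SORT edge widths printed in the q.out.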