Posted to commits@hive.apache.org by su...@apache.org on 2017/02/17 20:07:47 UTC
hive git commit: HIVE-15489: Alternatively use table scan stats for HoS (Chao Sun, reviewed by Xuefu Zhang)
Repository: hive
Updated Branches:
refs/heads/master bba18181a -> 368d916b3
HIVE-15489: Alternatively use table scan stats for HoS (Chao Sun, reviewed by Xuefu Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/368d916b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/368d916b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/368d916b
Branch: refs/heads/master
Commit: 368d916b369f1adc58da884463b1dedb8c010616
Parents: bba1818
Author: Chao Sun <su...@apache.org>
Authored: Thu Jan 19 16:42:49 2017 -0800
Committer: Chao Sun <su...@apache.org>
Committed: Fri Feb 17 12:06:48 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../test/resources/testconfiguration.properties | 3 +-
.../hadoop/hive/ql/exec/OperatorUtils.java | 34 +++
.../SparkRemoveDynamicPruningBySize.java | 4 +-
.../optimizer/spark/SparkMapJoinOptimizer.java | 34 ++-
.../hive/ql/parse/spark/GenSparkUtils.java | 36 +--
.../hive/ql/parse/spark/SparkCompiler.java | 3 +-
.../spark_use_file_size_for_mapjoin.q | 30 +++
.../spark/spark_use_file_size_for_mapjoin.q.out | 257 +++++++++++++++++++
9 files changed, 364 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0bff243..1af59ba 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3193,6 +3193,9 @@ public class HiveConf extends Configuration {
Constants.LLAP_LOGGER_NAME_CONSOLE),
"logger used for llap-daemons."),
+ SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
+ "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated"
+ + "with TableScan operator on the root of operator tree, instead of using operator statistics."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
"Timeout for requests from Hive client to remote Spark driver."),
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e8db920..7c54275 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1481,7 +1481,8 @@ spark.query.files=add_part_multiple.q, \
spark.only.query.files=spark_combine_equivalent_work.q,\
spark_dynamic_partition_pruning.q,\
spark_dynamic_partition_pruning_2.q,\
- spark_vectorized_dynamic_partition_pruning.q
+ spark_vectorized_dynamic_partition_pruning.q,\
+ spark_use_file_size_for_mapjoin.q
miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
bucket4.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index d294e25..5bbfe12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -346,4 +346,38 @@ public class OperatorUtils {
}
}
}
+
+ /**
+ * Given the input operator 'op', walk up the operator tree from 'op', and collect all the
+ * roots that can be reached from it. The results are stored in 'roots'.
+ */
+ public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
+ List<Operator<?>> parents = op.getParentOperators();
+ if (parents == null || parents.isEmpty()) {
+ roots.add(op);
+ return;
+ }
+ for (Operator<?> p : parents) {
+ findRoots(p, roots);
+ }
+ }
+
+ /**
+ * Remove the branch that contains the specified operator. Do nothing if there's no branching,
+ * i.e. all the upstream operators have only one child.
+ */
+ public static void removeBranch(Operator<?> op) {
+ Operator<?> child = op;
+ Operator<?> curr = op;
+
+ while (curr.getChildOperators().size() <= 1) {
+ child = curr;
+ if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+ return;
+ }
+ curr = curr.getParentOperators().get(0);
+ }
+
+ curr.removeChild(child);
+ }
}
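[Editor's note: a short usage sketch for the relocated helpers. The countRootScans wrapper is hypothetical; findRoots and its Collection-based signature come from the hunk above.]

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;

    public class FindRootsExample {
      // Hypothetical helper: report how many TableScans feed 'op'.
      static int countRootScans(Operator<?> op) {
        Set<Operator<?>> roots = new HashSet<>();
        OperatorUtils.findRoots(op, roots);  // walks parent links up to the roots
        int scans = 0;
        for (Operator<?> root : roots) {
          if (root instanceof TableScanOperator) {
            scans++;
          }
        }
        return scans;
      }
    }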
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index c41a0c8..26a1088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimizer;
import java.util.Stack;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -54,7 +54,7 @@ public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
if (desc.getStatistics().getDataSize() > context.getConf()
.getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
- GenSparkUtils.removeBranch(op);
+ OperatorUtils.removeBranch(op);
// at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: "
+ desc.getTableScan().getName()
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 7faff88..d8f37ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -191,12 +193,40 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
int pos = 0;
// bigTableFound means we've encountered a table that's bigger than the
- // max. This table is either the the big table or we cannot convert.
+ // max. This table is either the big table or we cannot convert.
boolean bigTableFound = false;
+ boolean useTsStats = context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_FILE_SIZE_FOR_MAPJOIN.varname, false);
+ boolean hasUpstreamSinks = false;
+ // Check whether there's any upstream RS.
+ // If so, don't use TS stats because they could be inaccurate.
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ Set<ReduceSinkOperator> parentSinks =
+ OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class);
+ parentSinks.remove(parentOp);
+ if (!parentSinks.isEmpty()) {
+ hasUpstreamSinks = true;
+ }
+ }
+
+ // If we are using TS stats and this JOIN has at least one upstream RS, disable MapJoin conversion.
+ if (useTsStats && hasUpstreamSinks) {
+ return new long[]{-1, 0, 0};
+ }
+
+ for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ Statistics currInputStat;
+ if (useTsStats) {
+ currInputStat = new Statistics();
+ // Find all root TSs and add up all data sizes
+ // Not adding other stats (e.g., # of rows, col stats) since only data size is used here
+ for (TableScanOperator root : OperatorUtils.findOperatorsUpstream(parentOp, TableScanOperator.class)) {
+ currInputStat.addToDataSize(root.getStatistics().getDataSize());
+ }
+ } else {
+ currInputStat = parentOp.getStatistics();
+ }
- Statistics currInputStat = parentOp.getStatistics();
if (currInputStat == null) {
LOG.warn("Couldn't get statistics from: " + parentOp);
return new long[]{-1, 0, 0};
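[Editor's note: the two checks added above condensed into one sketch. The tsBasedInputSize wrapper is hypothetical; findOperatorsUpstream, Statistics.addToDataSize, and getDataSize are the calls the hunk itself uses.]

    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.plan.Statistics;

    public class TsStatsSketch {
      // Hypothetical wrapper: return the TS-based input size for one join
      // branch, or -1 when an upstream RS makes TS stats unreliable.
      static long tsBasedInputSize(Operator<?> parentOp) {
        Set<ReduceSinkOperator> sinks =
            OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class);
        sinks.remove(parentOp);
        if (!sinks.isEmpty()) {
          return -1; // mirror the patch: an upstream RS disables the conversion
        }
        // Approximate the branch's input size by summing the data sizes of
        // all TableScans that feed it.
        Statistics tsStat = new Statistics();
        for (TableScanOperator ts :
            OperatorUtils.findOperatorsUpstream(parentOp, TableScanOperator.class)) {
          tsStat.addToDataSize(ts.getStatistics().getDataSize());
        }
        return tsStat.getDataSize();
      }
    }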
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 7b2b3c0..36bde30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -573,7 +574,7 @@ public class GenSparkUtils {
*/
public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
List<Operator<?>> ops = new ArrayList<Operator<?>>();
- findRoots(op, ops);
+ OperatorUtils.findRoots(op, ops);
for (Operator<?> r : ops) {
BaseWork work = procCtx.rootToWorkMap.get(r);
if (work != null) {
@@ -582,37 +583,4 @@ public class GenSparkUtils {
}
return null;
}
-
- /*
- * findRoots returns all root operators (in ops) that result in operator op
- */
- private void findRoots(Operator<?> op, List<Operator<?>> ops) {
- List<Operator<?>> parents = op.getParentOperators();
- if (parents == null || parents.isEmpty()) {
- ops.add(op);
- return;
- }
- for (Operator<?> p : parents) {
- findRoots(p, ops);
- }
- }
-
- /**
- * Remove the branch that contains the specified operator. Do nothing if there's no branching,
- * i.e. all the upstream operators have only one child.
- */
- public static void removeBranch(Operator<?> op) {
- Operator<?> child = op;
- Operator<?> curr = op;
-
- while (curr.getChildOperators().size() <= 1) {
- child = curr;
- if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
- return;
- }
- curr = curr.getParentOperators().get(0);
- }
-
- curr.removeChild(child);
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 71528e8..c4b1640 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -169,7 +170,7 @@ public class SparkCompiler extends TaskCompiler {
return;
}
- GenSparkUtils.removeBranch(toRemove);
+ OperatorUtils.removeBranch(toRemove);
// at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: "
+ toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency");
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
new file mode 100644
index 0000000..b623b83
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_use_file_size_for_mapjoin.q
@@ -0,0 +1,30 @@
+set hive.mapred.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.spark.use.file.size.for.mapjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=4000;
+
+EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+set hive.auto.convert.join.noconditionaltask.size=8000;
+
+-- This is copied from auto_join2. Without the configuration both joins are mapjoins,
+-- but with the configuration on, Hive should not turn the second join into a mapjoin since it
+-- has an upstream reduce sink.
+
+CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value;
+
+SELECT sum(hash(dest.key,dest.value)) FROM dest;
http://git-wip-us.apache.org/repos/asf/hive/blob/368d916b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
new file mode 100644
index 0000000..9044140
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_use_file_size_for_mapjoin.q.out
@@ -0,0 +1,257 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col2
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col2 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97 val_97
+97 val_97
+97 val_97
+97 val_97
+PREHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dest
+POSTHOOK: query: CREATE TABLE dest(key INT, value STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dest
+PREHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-3 is a root stage
+ Stage-1 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-1
+ Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ input vertices:
+ 1 Map 3
+ Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+ sort order: +
+ Map-reduce partition columns: (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+ Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: src3
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: UDFToDouble(_col0) (type: double)
+ sort order: +
+ Map-reduce partition columns: UDFToDouble(_col0) (type: double)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 (UDFToDouble(_col0) + UDFToDouble(_col1)) (type: double)
+ 1 UDFToDouble(_col0) (type: double)
+ outputColumnNames: _col0, _col3
+ Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col3 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
+PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest
+POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest SELECT src1.key, src3.value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest
+POSTHOOK: Lineage: dest.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest.value SIMPLE [(src)src3.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT sum(hash(dest.key,dest.value)) FROM dest
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest
+#### A masked pattern was here ####
+33815990627