Posted to commits@hive.apache.org by jc...@apache.org on 2017/08/30 15:54:10 UTC
hive git commit: HIVE-17276: Check max shuffle size when converting to dynamically partitioned hash join (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master dd04a92f7 -> b67d52c29
HIVE-17276: Check max shuffle size when converting to dynamically partitioned hash join (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b67d52c2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b67d52c2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b67d52c2
Branch: refs/heads/master
Commit: b67d52c294c1c2db21ed89c86486c946aa5d3ca4
Parents: dd04a92
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Aug 29 10:36:29 2017 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Aug 30 08:29:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 55 +++++--
.../queries/clientpositive/join_max_hashtable.q | 13 ++
.../llap/join_max_hashtable.q.out | 154 +++++++++++++++++++
4 files changed, 217 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0d8d7ae..e4b09a2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1435,9 +1435,14 @@ public class HiveConf extends Configuration {
HIVECONVERTJOINMAXENTRIESHASHTABLE("hive.auto.convert.join.hashtable.max.entries", 40000000L,
"If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
- "However, if it is on, and the predicated number of entries in hashtable for a given join \n" +
+ "However, if it is on, and the predicted number of entries in hashtable for a given join \n" +
"input is larger than this number, the join will not be converted to a mapjoin. \n" +
"The value \"-1\" means no limit."),
+ HIVECONVERTJOINMAXSHUFFLESIZE("hive.auto.convert.join.shuffle.max.size", 10000000L,
+ "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
+ "However, if it is on, and the predicted size of the larger input for a given join is greater \n" +
+ "than this number, the join will not be converted to a dynamically partitioned hash join. \n" +
+ "The value \"-1\" means no limit."),
HIVEHASHTABLEKEYCOUNTADJUSTMENT("hive.hashtable.key.count.adjustment", 1.0f,
"Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate" +
" of the number of keys is divided by this value. If the value is 0, statistics are not used" +
http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 21d0053..a2414f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -579,12 +579,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
* for Dynamic Hash Join conversion consideration
* @param skipJoinTypeChecks whether to skip join type checking
* @param maxSize size threshold for Map Join conversion
- * @param checkHashTableEntries whether to check threshold for distinct keys in hash table for Map Join
+ * @param checkMapJoinThresholds whether to check thresholds to convert to Map Join
* @return returns big table position or -1 if it cannot be determined
* @throws SemanticException
*/
public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
- int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkHashTableEntries)
+ int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkMapJoinThresholds)
throws SemanticException {
if (!skipJoinTypeChecks) {
/*
@@ -634,6 +634,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// total size of the inputs
long totalSize = 0;
+ // convert to DPHJ
+ boolean convertDPHJ = false;
+
for (int pos = 0; pos < joinOp.getParentOperators().size(); pos++) {
Operator<? extends OperatorDesc> parentOp = joinOp.getParentOperators().get(pos);
@@ -693,19 +696,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// We are replacing the current big table with a new one, thus
// we need to count the current one as a map table then.
totalSize += bigInputStat.getDataSize();
- // Check if number of distinct keys is larger than given max
- // number of entries for HashMap. If it is, we do not convert.
- if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
- return -1;
+ // Check if number of distinct keys is greater than given max number of entries
+ // for HashMap
+ if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
+ convertDPHJ = true;
}
} else if (!selectedBigTable) {
// This is not the first table and we are not using it as big table,
// in fact, we're adding this table as a map table
totalSize += inputSize;
- // Check if number of distinct keys is larger than given max
- // number of entries for HashMap. If it is, we do not convert.
- if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
- return -1;
+ // Check if number of distinct keys is greater than given max number of entries
+ // for HashMap
+ if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
+ convertDPHJ = true;
}
}
@@ -723,6 +726,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
+ // Check if size of data to shuffle (larger table) is less than given max size
+ if (checkMapJoinThresholds && convertDPHJ
+ && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
+ LOG.debug("Conditions to convert to MapJoin are not met");
+ return -1;
+ }
+
// We store the total memory that this MapJoin is going to use,
// which is calculated as totalSize/buckets, with totalSize
// equal to sum of small tables size.
@@ -1087,13 +1097,36 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (estimation > max) {
// Estimation larger than max
LOG.debug("Number of different entries for HashTable is greater than the max; "
- + "we do not converting to MapJoin");
+ + "we do not convert to MapJoin");
return false;
}
// We can proceed with the conversion
return true;
}
+ /* Returns true if it passes the test, false otherwise. */
+ private boolean checkShuffleSizeForLargeTable(JoinOperator joinOp, int position,
+ OptimizeTezProcContext context) {
+ long max = HiveConf.getLongVar(context.parseContext.getConf(),
+ HiveConf.ConfVars.HIVECONVERTJOINMAXSHUFFLESIZE);
+ if (max < 1) {
+ // Max is disabled, we can safely return true
+ return true;
+ }
+ // Evaluate
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) joinOp.getParentOperators().get(position);
+ Statistics inputStats = rsOp.getStatistics();
+ long inputSize = inputStats.getDataSize();
+ LOG.debug("Estimated size for input {}: {}; Max size for DPHJ conversion: {}",
+ position, inputSize, max);
+ if (inputSize > max) {
+ LOG.debug("Size of input is greater than the max; "
+ + "we do not convert to DPHJ");
+ return false;
+ }
+ return true;
+ }
+
private static long estimateNDV(long numRows, List<ColStatistics> columnStats) {
// If there is a single column, return the number of distinct values
if (columnStats.size() == 1) {
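
To summarize how the two thresholds interact after this patch, here is a
condensed sketch of the control flow of getMapJoinConversionPos above; the
helper method is hypothetical and only restates the hunks, it does not exist
in the patch:

    // entriesWithinLimit: result of checkNumberOfEntriesForHashTable
    // shuffleSize: estimated data size of the larger join input
    static int decideConversion(boolean entriesWithinLimit, long shuffleSize,
        long maxShuffleSize, int bigTablePosition) {
      if (entriesWithinLimit) {
        // Hash table fits: convert to MapJoin, as before the patch.
        return bigTablePosition;
      }
      if (maxShuffleSize < 1 || shuffleSize <= maxShuffleSize) {
        // Hash table too large, but the shuffle is affordable: reject the
        // MapJoin (-1) so the join can become a dynamically partitioned
        // hash join instead.
        return -1;
      }
      // Hash table too large AND the shuffle would exceed the limit:
      // keep the MapJoin conversion rather than pay for the shuffle.
      return bigTablePosition;
    }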
http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/queries/clientpositive/join_max_hashtable.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join_max_hashtable.q b/ql/src/test/queries/clientpositive/join_max_hashtable.q
index 9c30a0d..8d0ccb7 100644
--- a/ql/src/test/queries/clientpositive/join_max_hashtable.q
+++ b/ql/src/test/queries/clientpositive/join_max_hashtable.q
@@ -1,6 +1,7 @@
set hive.auto.convert.join=true;
set hive.optimize.dynamic.partition.hashjoin=true;
set hive.auto.convert.join.hashtable.max.entries=500;
+set hive.auto.convert.join.shuffle.max.size=100000;
-- CONVERT
EXPLAIN
@@ -35,3 +36,15 @@ FROM src x JOIN src y ON (x.key = y.key);
EXPLAIN
SELECT x.key, x.value
FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
+
+set hive.auto.convert.join.shuffle.max.size=80000;
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
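
A worked reading of the two added cases, using the statistics visible in the
plan output below: the larger join input (alias x) is estimated at 89000
bytes. With hive.auto.convert.join.shuffle.max.size=80000 we have
89000 > 80000, so checkShuffleSizeForLargeTable fails and the dynamically
partitioned hash join fallback is unavailable; both queries therefore keep
the plain MapJoin plan (a Map Join Operator fed by a BROADCAST_EDGE),
matching their "-- CONVERT" comments. Under the 100000 setting added at the
top of the file, 89000 <= 100000, so a join that also misses the 500-entry
hashtable limit would instead be shuffled into a dynamically partitioned
hash join.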
http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
index 63f5d28..ef1a6f3 100644
--- a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
+++ b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
@@ -498,3 +498,157 @@ STAGE PLANS:
Processor Tree:
ListSink
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: x
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ input vertices:
+ 1 Map 2
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: llap
+ LLAP IO: no inputs
+ Map 2
+ Map Operator Tree:
+ TableScan
+ alias: y
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Execution mode: llap
+ LLAP IO: no inputs
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: x
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: (key is not null and value is not null) (type: boolean)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string), _col1 (type: string)
+ 1 _col0 (type: string), _col1 (type: string)
+ outputColumnNames: _col0, _col1
+ input vertices:
+ 1 Map 2
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: llap
+ LLAP IO: no inputs
+ Map 2
+ Map Operator Tree:
+ TableScan
+ alias: y
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: (key is not null and value is not null) (type: boolean)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Execution mode: llap
+ LLAP IO: no inputs
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+