You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/08/30 15:54:10 UTC

hive git commit: HIVE-17276: Check max shuffle size when converting to dynamically partitioned hash join (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master dd04a92f7 -> b67d52c29


HIVE-17276: Check max shuffle size when converting to dynamically partitioned hash join (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b67d52c2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b67d52c2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b67d52c2

Branch: refs/heads/master
Commit: b67d52c294c1c2db21ed89c86486c946aa5d3ca4
Parents: dd04a92
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Aug 29 10:36:29 2017 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Aug 30 08:29:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  55 +++++--
 .../queries/clientpositive/join_max_hashtable.q |  13 ++
 .../llap/join_max_hashtable.q.out               | 154 +++++++++++++++++++
 4 files changed, 217 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0d8d7ae..e4b09a2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1435,9 +1435,14 @@ public class HiveConf extends Configuration {
 
     HIVECONVERTJOINMAXENTRIESHASHTABLE("hive.auto.convert.join.hashtable.max.entries", 40000000L,
         "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
-        "However, if it is on, and the predicated number of entries in hashtable for a given join \n" +
+        "However, if it is on, and the predicted number of entries in hashtable for a given join \n" +
         "input is larger than this number, the join will not be converted to a mapjoin. \n" +
         "The value \"-1\" means no limit."),
+    HIVECONVERTJOINMAXSHUFFLESIZE("hive.auto.convert.join.shuffle.max.size", 10000000L,
+       "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
+       "However, if it is on, and the predicted size of the larger input for a given join is greater \n" +
+       "than this number, the join will not be converted to a dynamically partitioned hash join. \n" +
+       "The value \"-1\" means no limit."),
     HIVEHASHTABLEKEYCOUNTADJUSTMENT("hive.hashtable.key.count.adjustment", 1.0f,
         "Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate" +
         " of the number of keys is divided by this value. If the value is 0, statistics are not used" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 21d0053..a2414f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -579,12 +579,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
    * for Dynamic Hash Join conversion consideration
    * @param skipJoinTypeChecks whether to skip join type checking
    * @param maxSize size threshold for Map Join conversion
-   * @param checkHashTableEntries whether to check threshold for distinct keys in hash table for Map Join
+   * @param checkMapJoinThresholds whether to check thresholds to convert to Map Join
    * @return returns big table position or -1 if it cannot be determined
    * @throws SemanticException
    */
   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkHashTableEntries)
+      int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkMapJoinThresholds)
               throws SemanticException {
     if (!skipJoinTypeChecks) {
       /*
@@ -634,6 +634,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // total size of the inputs
     long totalSize = 0;
 
+    // convert to DPHJ
+    boolean convertDPHJ = false;
+
     for (int pos = 0; pos < joinOp.getParentOperators().size(); pos++) {
       Operator<? extends OperatorDesc> parentOp = joinOp.getParentOperators().get(pos);
 
@@ -693,19 +696,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         // We are replacing the current big table with a new one, thus
         // we need to count the current one as a map table then.
         totalSize += bigInputStat.getDataSize();
-        // Check if number of distinct keys is larger than given max
-        // number of entries for HashMap. If it is, we do not convert.
-        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
-          return -1;
+        // Check if number of distinct keys is greater than given max number of entries
+        // for HashMap
+        if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
+          convertDPHJ = true;
         }
       } else if (!selectedBigTable) {
         // This is not the first table and we are not using it as big table,
         // in fact, we're adding this table as a map table
         totalSize += inputSize;
-        // Check if number of distinct keys is larger than given max
-        // number of entries for HashMap. If it is, we do not convert.
-        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
-          return -1;
+        // Check if number of distinct keys is greater than given max number of entries
+        // for HashMap
+        if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
+          convertDPHJ = true;
         }
       }
 
@@ -723,6 +726,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     }
 
+    // Check if size of data to shuffle (larger table) is less than given max size
+    if (checkMapJoinThresholds && convertDPHJ
+            && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
+      LOG.debug("Conditions to convert to MapJoin are not met");
+      return -1;
+    }
+
     // We store the total memory that this MapJoin is going to use,
     // which is calculated as totalSize/buckets, with totalSize
     // equal to sum of small tables size.
@@ -1087,13 +1097,36 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (estimation > max) {
       // Estimation larger than max
       LOG.debug("Number of different entries for HashTable is greater than the max; "
-          + "we do not converting to MapJoin");
+          + "we do not convert to MapJoin");
       return false;
     }
     // We can proceed with the conversion
     return true;
   }
 
+  /* Returns false only if the estimated data size of the join parent at 'position' exceeds HIVECONVERTJOINMAXSHUFFLESIZE; true otherwise (including when the limit is disabled). */
+  private boolean checkShuffleSizeForLargeTable(JoinOperator joinOp, int position,
+          OptimizeTezProcContext context) {
+    long max = HiveConf.getLongVar(context.parseContext.getConf(),
+            HiveConf.ConfVars.HIVECONVERTJOINMAXSHUFFLESIZE);
+    if (max < 1) {
+      // Any configured value below 1 turns the limit off, so the check trivially passes
+      return true;
+    }
+    // Compare the parent's estimated data size to the limit (the parent is cast to ReduceSinkOperator without a type check)
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) joinOp.getParentOperators().get(position);
+    Statistics inputStats = rsOp.getStatistics();
+    long inputSize = inputStats.getDataSize();
+    LOG.debug("Estimated size for input {}: {}; Max size for DPHJ conversion: {}",
+        position, inputSize, max);
+    if (inputSize > max) {
+      LOG.debug("Size of input is greater than the max; "
+          + "we do not convert to DPHJ");
+      return false;
+    }
+    return true;
+  }
+
   private static long estimateNDV(long numRows, List<ColStatistics> columnStats) {
     // If there is a single column, return the number of distinct values
     if (columnStats.size() == 1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/queries/clientpositive/join_max_hashtable.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join_max_hashtable.q b/ql/src/test/queries/clientpositive/join_max_hashtable.q
index 9c30a0d..8d0ccb7 100644
--- a/ql/src/test/queries/clientpositive/join_max_hashtable.q
+++ b/ql/src/test/queries/clientpositive/join_max_hashtable.q
@@ -1,6 +1,7 @@
 set hive.auto.convert.join=true;
 set hive.optimize.dynamic.partition.hashjoin=true;
 set hive.auto.convert.join.hashtable.max.entries=500;
+set hive.auto.convert.join.shuffle.max.size=100000;
 
 -- CONVERT
 EXPLAIN
@@ -35,3 +36,15 @@ FROM src x JOIN src y ON (x.key = y.key);
 EXPLAIN
 SELECT x.key, x.value
 FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
+
+set hive.auto.convert.join.shuffle.max.size=80000;
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);

http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
index 63f5d28..ef1a6f3 100644
--- a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
+++ b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
@@ -498,3 +498,157 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string), _col1 (type: string)
+                          1 _col0 (type: string), _col1 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+