You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/11/17 17:05:48 UTC

[08/18] hive git commit: HIVE-12391: SkewJoinOptimizer might not kick in if columns are renamed after TableScanOperator (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-12391: SkewJoinOptimizer might not kick in if columns are renamed after TableScanOperator (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d2fd0060
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d2fd0060
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d2fd0060

Branch: refs/heads/spark
Commit: d2fd0060ecf343b7bbd0f2a1da92527242af9a8e
Parents: 55cb43d
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Nov 12 15:31:31 2015 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Fri Nov 13 09:47:37 2015 +0100

----------------------------------------------------------------------
 .../hive/ql/optimizer/SkewJoinOptimizer.java    |  27 ++-
 .../test/queries/clientpositive/skewjoinopt21.q |  30 +++
 .../results/clientpositive/skewjoinopt21.q.out  | 230 +++++++++++++++++++
 3 files changed, 277 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d2fd0060/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index e8c7486..64dc48c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -28,8 +28,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -49,12 +47,12 @@ import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -70,6 +68,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SkewJoinOptimizer.
@@ -283,10 +283,11 @@ public class SkewJoinOptimizer implements Transform {
      * @param op The join operator being optimized
      * @param tableScanOpsForJoin table scan operators which are parents of the join operator
      * @return map<join keys intersection skewedkeys, list of skewed values>.
+     * @throws SemanticException 
      */
     private Map<List<ExprNodeDesc>, List<List<String>>>
       getSkewedValues(
-        Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) {
+        Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) throws SemanticException {
 
       Map <List<ExprNodeDesc>, List<List<String>>> skewDataReturn =
         new HashMap<List<ExprNodeDesc>, List<List<String>>>();
@@ -299,6 +300,7 @@ public class SkewJoinOptimizer implements Transform {
         ReduceSinkDesc rsDesc = ((ReduceSinkOperator) reduceSinkOp).getConf();
 
         if (rsDesc.getKeyCols() != null) {
+          TableScanOperator tableScanOp = null;
           Table table = null;
           // Find the skew information corresponding to the table
           List<String> skewedColumns = null;
@@ -321,7 +323,9 @@ public class SkewJoinOptimizer implements Transform {
             if (keyColDesc instanceof ExprNodeColumnDesc) {
               keyCol = (ExprNodeColumnDesc) keyColDesc;
               if (table == null) {
-                table = getTable(parseContext, reduceSinkOp, tableScanOpsForJoin);
+                tableScanOp = getTableScanOperator(parseContext, reduceSinkOp, tableScanOpsForJoin);
+                table =
+                  tableScanOp == null ? null : tableScanOp.getConf().getTableMetadata();
                 skewedColumns =
                   table == null ? null : table.getSkewedColNames();
                 // No skew on the table to take care of
@@ -332,10 +336,13 @@ public class SkewJoinOptimizer implements Transform {
                 skewedValueList =
                   table == null ? null : table.getSkewedColValues();
               }
-              int pos = skewedColumns.indexOf(keyCol.getColumn());
+              ExprNodeDesc keyColOrigin = ExprNodeDescUtils.backtrack(keyCol,
+                      reduceSinkOp, tableScanOp);
+              int pos = keyColOrigin == null || !(keyColOrigin instanceof ExprNodeColumnDesc) ?
+                      -1 : skewedColumns.indexOf(((ExprNodeColumnDesc)keyColOrigin).getColumn());
               if ((pos >= 0) && (!positionSkewedKeys.contains(pos))) {
                 positionSkewedKeys.add(pos);
-                ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyCol.clone();
+                ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyColOrigin.clone();
                 keyColClone.setTabAlias(null);
                 joinKeysSkewedCols.add(new ExprNodeDescEqualityWrapper(keyColClone));
               }
@@ -386,9 +393,9 @@ public class SkewJoinOptimizer implements Transform {
     }
 
     /**
-     * Get the table alias from the candidate table scans.
+     * Get the table scan.
      */
-    private Table getTable(
+    private TableScanOperator getTableScanOperator(
       ParseContext parseContext,
       Operator<? extends OperatorDesc> op,
       List<TableScanOperator> tableScanOpsForJoin) {
@@ -396,7 +403,7 @@ public class SkewJoinOptimizer implements Transform {
         if (op instanceof TableScanOperator) {
           TableScanOperator tsOp = (TableScanOperator)op;
           if (tableScanOpsForJoin.contains(tsOp)) {
-            return tsOp.getConf().getTableMetadata();
+            return tsOp;
           }
         }
         if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) || 

http://git-wip-us.apache.org/repos/asf/hive/blob/d2fd0060/ql/src/test/queries/clientpositive/skewjoinopt21.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/skewjoinopt21.q b/ql/src/test/queries/clientpositive/skewjoinopt21.q
new file mode 100644
index 0000000..76dde57
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/skewjoinopt21.q
@@ -0,0 +1,30 @@
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple join query with skew on both the tables on the join key
+-- adding a order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k;
+
+SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v;

http://git-wip-us.apache.org/repos/asf/hive/blob/d2fd0060/ql/src/test/results/clientpositive/skewjoinopt21.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/skewjoinopt21.q.out b/ql/src/test/results/clientpositive/skewjoinopt21.q.out
new file mode 100644
index 0000000..d58d694
--- /dev/null
+++ b/ql/src/test/results/clientpositive/skewjoinopt21.q.out
@@ -0,0 +1,230 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T2
+POSTHOOK: query: CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T2
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t2
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t2
+PREHOOK: query: -- a simple join query with skew on both the tables on the join key
+-- adding a order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+PREHOOK: type: QUERY
+POSTHOOK: query: -- a simple join query with skew on both the tables on the join key
+-- adding a order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1, Stage-4
+  Stage-4 is a root stage
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and ((key = '2') or (key = '3'))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+          TableScan
+            alias: t2
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and ((key = '2') or (key = '3'))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          keys:
+            0 _col0 (type: string)
+            1 _col0 (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Union
+              Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          TableScan
+            Union
+              Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-4
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (not ((key = '2') or (key = '3')))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+          TableScan
+            alias: t2
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (not ((key = '2') or (key = '3')))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.*, b.*
+FROM 
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t2
+#### A masked pattern was here ####
+2	12	2	22
+3	13	3	13
+8	18	8	18
+8	18	8	18
+8	28	8	18
+8	28	8	18