You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/08/01 18:52:26 UTC

svn commit: r1368119 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/test/queries/clientnegative/ ql/src/test/results/clientnegative/

Author: kevinwilfong
Date: Wed Aug  1 16:52:25 2012
New Revision: 1368119

URL: http://svn.apache.org/viewvc?rev=1368119&view=rev
Log:
HIVE-3289. sort merge join may not work silently. (njain via kevinwilfong)

Added:
    hive/trunk/ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q
    hive/trunk/ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1368119&r1=1368118&r2=1368119&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug  1 16:52:25 2012
@@ -465,6 +465,7 @@ public class HiveConf extends Configurat
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),
     HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+    HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false),
 
     HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
     HIVEROWOFFSET("hive.exec.rowoffset", false),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1368119&r1=1368118&r2=1368119&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Aug  1 16:52:25 2012
@@ -758,6 +758,14 @@
 </property>
 
 <property>
+  <name>hive.enforce.sortmergebucketmapjoin</name>
+  <value>false</value>
+  <description>If true, a query fails when the user asked for a sort-merge bucketed map-side join and it
+    cannot be performed. If false, the query runs with a regular map-join instead.
+  </description>
+</property>
+
+<property>
   <name>hive.metastore.ds.connection.url.hook</name>
   <value></value>
   <description>Name of the hook to use for retriving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used </description>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1368119&r1=1368118&r2=1368119&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Wed Aug  1 16:52:25 2012
@@ -223,6 +223,11 @@ public enum ErrorMsg {
   ALTER_COMMAND_FOR_TABLES(10132, "To alter a base table you need to use the ALTER TABLE command."),
   ALTER_VIEW_DISALLOWED_OP(10133, "Cannot use this form of ALTER on a view"),
   ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE cannot be used for a non-native table"),
+  SORTMERGE_MAPJOIN_FAILED(10135,
+      "Sort merge bucketed join could not be performed. " +
+      "If you really want to perform the operation, either set " +
+      "hive.optimize.bucketmapjoin.sortedmerge=false, or set " +
+      "hive.enforce.sortmergebucketmapjoin=false."),
 
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1368119&r1=1368118&r2=1368119&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Wed Aug  1 16:52:25 2012
@@ -28,7 +28,9 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +106,7 @@ public class SortedMergeBucketMapJoinOpt
   }
 
   class SortedMergeBucketMapjoinProc implements NodeProcessor {
-    ParseContext pGraphContext;
+    private ParseContext pGraphContext;
 
     public SortedMergeBucketMapjoinProc(ParseContext pctx) {
       this.pGraphContext = pctx;
@@ -113,23 +115,24 @@ public class SortedMergeBucketMapJoinOpt
     public SortedMergeBucketMapjoinProc() {
     }
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+    // Return true or false based on whether the mapjoin was converted successfully to
+    // a sort-merge map join operator.
+    private boolean convertSMBJoin(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       if (nd instanceof SMBMapJoinOperator) {
-        return null;
+        return false;
       }
       MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
       if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
           || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
-        return null;
+        return false;
       }
 
       boolean tableSorted = true;
       QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
           .get(mapJoinOp);
       if (joinCxt == null) {
-        return null;
+        return false;
       }
       String[] srcs = joinCxt.getBaseSrc();
       int pos = 0;
@@ -142,11 +145,26 @@ public class SortedMergeBucketMapJoinOpt
         //this is a mapjoin but not suit for a sort merge bucket map join. check outer joins
         MapJoinProcessor.checkMapJoin(((MapJoinOperator) nd).getConf().getPosBigTable(),
             ((MapJoinOperator) nd).getConf().getConds());
-        return null;
+        return false;
       }
       // convert a bucket map join operator to a sorted merge bucket map join
       // operator
       convertToSMBJoin(mapJoinOp, srcs);
+      return true;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      boolean convert = convertSMBJoin(nd, stack, procCtx, nodeOutputs);
+      // Throw an error if the user asked for sort merge bucketed mapjoin to be enforced
+      // and sort merge bucketed mapjoin cannot be performed
+      if (!convert &&
+        pGraphContext.getConf().getBoolVar(
+          HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
+        throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
+      }
+
       return null;
     }
 

Added: hive/trunk/ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q?rev=1368119&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q Wed Aug  1 16:52:25 2012
@@ -0,0 +1,28 @@
+create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) 
+INTO 1 BUCKETS STORED AS RCFILE; 
+create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) 
+INTO 1 BUCKETS STORED AS RCFILE;
+
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+
+insert overwrite table table_asc select key, value from src; 
+insert overwrite table table_desc select key, value from src;
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+-- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain 
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key;
+
+set hive.enforce.sortmergebucketmapjoin=true;
+
+explain 
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key;

Added: hive/trunk/ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out?rev=1368119&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out (added)
+++ hive/trunk/ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out Wed Aug  1 16:52:25 2012
@@ -0,0 +1,144 @@
+PREHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) 
+INTO 1 BUCKETS STORED AS RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) 
+INTO 1 BUCKETS STORED AS RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@table_asc
+PREHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) 
+INTO 1 BUCKETS STORED AS RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) 
+INTO 1 BUCKETS STORED AS RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@table_desc
+PREHOOK: query: insert overwrite table table_asc select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@table_asc
+POSTHOOK: query: insert overwrite table table_asc select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@table_asc
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table table_desc select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@table_desc
+POSTHOOK: query: insert overwrite table table_desc select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@table_desc
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain 
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain 
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_asc) a) (TOK_TABREF (TOK_TABNAME table_desc) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-1 depends on stages: Stage-3
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-3
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        a 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        a 
+          TableScan
+            alias: a
+            HashTable Sink Operator
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              Position of Big Table: 1
+
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        b 
+          TableScan
+            alias: b
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              outputColumnNames: _col0, _col1, _col4, _col5
+              Position of Big Table: 1
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: int
+                      expr: _col1
+                      type: string
+                      expr: _col4
+                      type: int
+                      expr: _col5
+                      type: string
+                outputColumnNames: _col0, _col1, _col4, _col5
+                Select Operator
+                  expressions:
+                        expr: _col0
+                        type: int
+                        expr: _col1
+                        type: string
+                        expr: _col4
+                        type: int
+                        expr: _col5
+                        type: string
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+FAILED: SemanticException [Error 10135]: Sort merge bucketed join could not be performed. If you really want to perform the operation, either set hive.optimize.bucketmapjoin.sortedmerge=false, or set hive.enforce.sortmergebucketmapjoin=false.