You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2013/04/04 05:49:52 UTC

svn commit: r1464277 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/test/queries/clientpos...

Author: namit
Date: Thu Apr  4 03:49:52 2013
New Revision: 1464277

URL: http://svn.apache.org/r1464277
Log:
HIVE-4281 add hive.map.groupby.sorted.testmode
(Namit via Gang Tim Liu)


Added:
    hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_test_1.q
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_test_1.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr  4 03:49:52 2013
@@ -423,6 +423,7 @@ public class HiveConf extends Configurat
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
     HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
     HIVE_MAP_GROUPBY_SORT("hive.map.groupby.sorted", false),
+    HIVE_MAP_GROUPBY_SORT_TESTMODE("hive.map.groupby.sorted.testmode", false),
     HIVE_GROUPBY_ORDERBY_POSITION_ALIAS("hive.groupby.orderby.position.alias", false),
     HIVE_NEW_JOB_GROUPING_SET_CARDINALITY("hive.new.job.grouping.set.cardinality", 30),
 
@@ -765,7 +766,7 @@ public class HiveConf extends Configurat
     // ptf partition constants
     HIVE_PTF_PARTITION_PERSISTENCE_CLASS("hive.ptf.partition.persistence",
       "org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList"),
-    HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize", 
+    HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize",
       (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
     ;
 

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Apr  4 03:49:52 2013
@@ -534,6 +534,15 @@
 </property>
 
 <property>
+  <name>hive.map.groupby.sorted.testmode</name>
+  <value>false</value>
+  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to
+    perform the group by in the mapper by using BucketizedHiveInputFormat. If the test mode is set, the plan
+    is not converted, but a query property is set to indicate that the plan would have been converted.
+  </description>
+</property>
+
+<property>
   <name>hive.new.job.grouping.set.cardinality</name>
   <value>30</value>
   <description>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Thu Apr  4 03:49:52 2013
@@ -46,6 +46,7 @@ public class QueryProperties {
   boolean hasDistributeBy = false;
   boolean hasClusterBy = false;
   boolean mapJoinRemoved = false;
+  boolean hasMapGroupBy = false;
 
   public boolean hasJoin() {
     return hasJoin;
@@ -134,4 +135,12 @@ public class QueryProperties {
   public void setMapJoinRemoved(boolean mapJoinRemoved) {
     this.mapJoinRemoved = mapJoinRemoved;
   }
+
+  public boolean isHasMapGroupBy() {
+    return hasMapGroupBy;
+  }
+
+  public void setHasMapGroupBy(boolean hasMapGroupBy) {
+    this.hasMapGroupBy = hasMapGroupBy;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Thu Apr  4 03:49:52 2013
@@ -178,7 +178,7 @@ public class GroupByOptimizer implements
       // Dont remove the operator for distincts
       if (useMapperSort && !groupByOp.getConf().isDistinct() &&
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        convertGroupByMapSideSortedGroupBy(groupByOp, depth);
+        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
       }
       else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
@@ -455,7 +455,14 @@ public class GroupByOptimizer implements
 
     // Convert the group by to a map-side group by
     // The operators specified by depth and removed from the tree.
-    protected void convertGroupByMapSideSortedGroupBy(GroupByOperator groupByOp, int depth) {
+    protected void convertGroupByMapSideSortedGroupBy(
+        HiveConf conf, GroupByOperator groupByOp, int depth) {
+      // In test mode, don't change the query plan. However, set up a query property
+      pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+        return;
+      }
+
       if (groupByOp.removeChildren(depth)) {
         // Use bucketized hive input format - that makes sure that one mapper reads the entire file
         groupByOp.setUseBucketizedHiveInputFormat(true);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Thu Apr  4 03:49:52 2013
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -113,6 +114,7 @@ public class ParseContext {
   private List<Task<? extends Serializable>> rootTasks;
 
   private FetchTask fetchTask;
+  private QueryProperties queryProperties;
 
   public ParseContext() {
   }
@@ -180,7 +182,8 @@ public class ParseContext {
       HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
       Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner,
       Map<String, ReadEntity> viewAliasToInput,
-      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
+      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
+      QueryProperties queryProperties) {
     this.conf = conf;
     this.qb = qb;
     this.ast = ast;
@@ -212,6 +215,7 @@ public class ParseContext {
     this.viewAliasToInput = viewAliasToInput;
     this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
         reduceSinkOperatorsAddedByEnforceBucketingSorting;
+    this.queryProperties = queryProperties;
   }
 
   /**
@@ -623,4 +627,12 @@ public class ParseContext {
   public Map<String, ReadEntity> getViewAliasToInput() {
     return viewAliasToInput;
   }
+
+  public QueryProperties getQueryProperties() {
+    return queryProperties;
+  }
+
+  public void setQueryProperties(QueryProperties queryProperties) {
+    this.queryProperties = queryProperties;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1464277&r1=1464276&r2=1464277&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Apr  4 03:49:52 2013
@@ -352,7 +352,8 @@ public class SemanticAnalyzer extends Ba
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput,
-        reduceSinkOperatorsAddedByEnforceBucketingSorting);
+        reduceSinkOperatorsAddedByEnforceBucketingSorting,
+        queryProperties);
   }
 
   @SuppressWarnings("nls")
@@ -8692,7 +8693,7 @@ public class SemanticAnalyzer extends Ba
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput,
-        reduceSinkOperatorsAddedByEnforceBucketingSorting);
+        reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
 
     // Generate table access stats if required
     if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_test_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_test_1.q?rev=1464277&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_test_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_test_1.q Thu Apr  4 03:49:52 2013
@@ -0,0 +1,21 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.map.groupby.sorted.testmode=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;

Added: hive/trunk/ql/src/test/results/clientpositive/groupby_sort_test_1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_sort_test_1.q.out?rev=1464277&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_sort_test_1.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_sort_test_1.q.out Thu Apr  4 03:49:52 2013
@@ -0,0 +1,127 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+PREHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@outputTbl1
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: -- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME outputTbl1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count(1)
+                bucketGroup: false
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: string
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: UDFToInteger(_col0)
+                  type: int
+                  expr: UDFToInteger(_col1)
+                  type: int
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.outputtbl1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.outputtbl1
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+