You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/12 02:14:04 UTC

svn commit: r1586801 [1/7] - in /hive/trunk: data/conf/tez/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/ ql/src/test/results/clientpositive/tez/

Author: sershe
Date: Sat Apr 12 00:14:03 2014
New Revision: 1586801

URL: http://svn.apache.org/r1586801
Log:
HIVE-6883 : Dynamic partitioning optimization does not honor sort order or order by (Prasanth J, reviewed by Vikram Dixit K)

Modified:
    hive/trunk/data/conf/tez/hive-site.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
    hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
    hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_optimization.q.out
    hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_analyze.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_analyze.q.out

Modified: hive/trunk/data/conf/tez/hive-site.xml
URL: http://svn.apache.org/viewvc/hive/trunk/data/conf/tez/hive-site.xml?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
Binary files - no diff available.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Sat Apr 12 00:14:03 2014
@@ -349,6 +349,15 @@ public class SortedDynPartitionOptimizer
       }
       newSortOrder.addAll(sortOrder);
 
+      String orderStr = "";
+      for (Integer i : newSortOrder) {
+        if(i.intValue() == 1) {
+          orderStr += "+";
+        } else {
+          orderStr += "-";
+        }
+      }
+
       ArrayList<ExprNodeDesc> newPartCols = Lists.newArrayList();
 
       // we will clone here as RS will update bucket column key with its
@@ -366,9 +375,16 @@ public class SortedDynPartitionOptimizer
         newPartCols.add(newValueCols.get(idx).clone());
       }
 
-      String orderStr = "";
-      for (int i = 0; i < newKeyCols.size(); i++) {
-        orderStr += "+";
+      // In the absence of a SORTED BY clause, the sorted dynamic partition insert
+      // should honor the record ordering requested by ORDER BY in the SELECT statement.
+      ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
+          ReduceSinkOperator.class);
+      if (parentRSOp != null) {
+        String parentRSOpOrder = parentRSOp.getConf().getOrder();
+        if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
+          newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
+          orderStr += parentRSOpOrder;
+        }
       }
 
       // Create Key/Value TableDesc. When the operator plan is split into MR tasks,
@@ -389,13 +405,9 @@ public class SortedDynPartitionOptimizer
           outValColNames, 0, "");
       TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
       List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
-      int numDistributionKeys = newPartCols.size();
-      if (bucketColumns != null && !bucketColumns.isEmpty()) {
-        numDistributionKeys += 1;
-      }
 
       // Number of reducers is set to default (-1)
-      ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, numDistributionKeys, newValueCols,
+      ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, newKeyCols.size(), newValueCols,
           outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
           valueTable);
       rsConf.setBucketCols(bucketColumns);

Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q Sat Apr 12 00:14:03 2014
@@ -56,12 +56,12 @@ create table over1k_part_buck_sort_orc(
        sorted by (f) into 4 buckets stored as orc;
 
 -- map-only jobs converted to map-reduce job by hive.optimize.sort.dynamic.partition optimization
-explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
 explain insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
 explain insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 explain insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 
-insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
 insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
 insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
@@ -70,12 +70,12 @@ set hive.enforce.bucketing=true;
 set hive.enforce.sorting=true;
 
 -- map-reduce jobs modified by hive.optimize.sort.dynamic.partition optimization
-explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
 explain insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
 explain insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 explain insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 
-insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
 insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
 insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
 insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
@@ -93,3 +93,69 @@ select count(*) from over1k_part_orc;
 select count(*) from over1k_part_limit_orc;
 select count(*) from over1k_part_buck_orc;
 select count(*) from over1k_part_buck_sort_orc;
+
+-- tests for HIVE-6883
+create table over1k_part2_orc(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (ds string, t tinyint);
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+desc formatted over1k_part2_orc partition(ds="foo",t=27);
+desc formatted over1k_part2_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2_orc;
+select count(*) from over1k_part2_orc;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+desc formatted over1k_part2_orc partition(ds="foo",t=27);
+desc formatted over1k_part2_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2_orc;
+select count(*) from over1k_part2_orc;
+
+-- hadoop-1 does not honor number of reducers in local mode. There is always only 1 reducer irrespective of the number of buckets.
+-- Hence all records go to one bucket and all other buckets will be empty. Similar to HIVE-6867. However, hadoop-2 honors number
+-- of reducers and records are spread across all reducers. To avoid this inconsistency we will make number of buckets to 1 for this test.
+create table over1k_part_buck_sort2_orc(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si)
+       sorted by (f) into 1 buckets;
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2_orc partition(t=27);
+desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2_orc;
+select count(*) from over1k_part_buck_sort2_orc;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2_orc partition(t=27);
+desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2_orc;
+select count(*) from over1k_part_buck_sort2_orc;

Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q Sat Apr 12 00:14:03 2014
@@ -87,3 +87,69 @@ select count(*) from over1k_part;
 select count(*) from over1k_part_limit;
 select count(*) from over1k_part_buck;
 select count(*) from over1k_part_buck_sort;
+
+-- tests for HIVE-6883
+create table over1k_part2(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (ds string, t tinyint);
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+desc formatted over1k_part2 partition(ds="foo",t=27);
+desc formatted over1k_part2 partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2;
+select count(*) from over1k_part2;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+desc formatted over1k_part2 partition(ds="foo",t=27);
+desc formatted over1k_part2 partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2;
+select count(*) from over1k_part2;
+
+-- hadoop-1 does not honor number of reducers in local mode. There is always only 1 reducer irrespective of the number of buckets.
+-- Hence all records go to one bucket and all other buckets will be empty. Similar to HIVE-6867. However, hadoop-2 honors number
+-- of reducers and records are spread across all reducers. To avoid this inconsistency we will make number of buckets to 1 for this test.
+create table over1k_part_buck_sort2(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si)
+       sorted by (f) into 1 buckets;
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2 partition(t=27);
+desc formatted over1k_part_buck_sort2 partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2;
+select count(*) from over1k_part_buck_sort2;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2 partition(t=27);
+desc formatted over1k_part_buck_sort2 partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2;
+select count(*) from over1k_part_buck_sort2;

Modified: hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q Sat Apr 12 00:14:03 2014
@@ -24,7 +24,7 @@ CREATE TABLE orc_create_people (
   state string)
 STORED AS orc;
 
-INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging ORDER BY id;
 
 set hive.stats.autogather = true;
 analyze table orc_create_people compute statistics partialscan;
@@ -44,7 +44,7 @@ CREATE TABLE orc_create_people (
   state string)
 STORED AS orc;
 
-INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging ORDER BY id;
 
 desc formatted orc_create_people;
 
@@ -64,7 +64,7 @@ PARTITIONED BY (state string)
 STORED AS orc;
 
 INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
-  SELECT * FROM orc_create_people_staging;
+  SELECT * FROM orc_create_people_staging ORDER BY id;
 
 set hive.stats.autogather = true;
 analyze table orc_create_people partition(state) compute statistics partialscan;
@@ -86,7 +86,7 @@ PARTITIONED BY (state string)
 STORED AS orc;
 
 INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
-  SELECT * FROM orc_create_people_staging;
+  SELECT * FROM orc_create_people_staging ORDER BY id;
 
 desc formatted orc_create_people partition(state="Ca");
 desc formatted orc_create_people partition(state="Or");
@@ -110,7 +110,7 @@ into 4 buckets
 STORED AS orc;
 
 INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
-  SELECT * FROM orc_create_people_staging;
+  SELECT * FROM orc_create_people_staging ORDER BY id;
 
 set hive.stats.autogather = true;
 analyze table orc_create_people partition(state) compute statistics partialscan;
@@ -135,7 +135,7 @@ into 4 buckets
 STORED AS orc;
 
 INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
-  SELECT * FROM orc_create_people_staging;
+  SELECT * FROM orc_create_people_staging ORDER BY id;
 
 desc formatted orc_create_people partition(state="Ca");
 desc formatted orc_create_people partition(state="Or");
@@ -157,7 +157,7 @@ PARTITIONED BY (state string)
 STORED AS orc;
 
 INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
-  SELECT * FROM orc_create_people_staging;
+  SELECT * FROM orc_create_people_staging ORDER BY id;
 
 -- set the table to text format
 ALTER TABLE orc_create_people SET SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe';