You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/12 02:14:04 UTC
svn commit: r1586801 [1/7] - in /hive/trunk: data/conf/tez/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
ql/src/test/results/clientpositive/tez/
Author: sershe
Date: Sat Apr 12 00:14:03 2014
New Revision: 1586801
URL: http://svn.apache.org/r1586801
Log:
HIVE-6883 : Dynamic partitioning optimization does not honor sort order or order by (Prasanth J, reviewed by Vikram Dixit K)
Modified:
hive/trunk/data/conf/tez/hive-site.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_optimization.q.out
hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_analyze.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_analyze.q.out
Modified: hive/trunk/data/conf/tez/hive-site.xml
URL: http://svn.apache.org/viewvc/hive/trunk/data/conf/tez/hive-site.xml?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
Binary files - no diff available.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Sat Apr 12 00:14:03 2014
@@ -349,6 +349,15 @@ public class SortedDynPartitionOptimizer
}
newSortOrder.addAll(sortOrder);
+ String orderStr = "";
+ for (Integer i : newSortOrder) {
+ if(i.intValue() == 1) {
+ orderStr += "+";
+ } else {
+ orderStr += "-";
+ }
+ }
+
ArrayList<ExprNodeDesc> newPartCols = Lists.newArrayList();
// we will clone here as RS will update bucket column key with its
@@ -366,9 +375,16 @@ public class SortedDynPartitionOptimizer
newPartCols.add(newValueCols.get(idx).clone());
}
- String orderStr = "";
- for (int i = 0; i < newKeyCols.size(); i++) {
- orderStr += "+";
+ // in the absence of SORTED BY clause, the sorted dynamic partition insert
+ // should honor the ordering of records provided by ORDER BY in SELECT statement
+ ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
+ ReduceSinkOperator.class);
+ if (parentRSOp != null) {
+ String parentRSOpOrder = parentRSOp.getConf().getOrder();
+ if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
+ newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
+ orderStr += parentRSOpOrder;
+ }
}
// Create Key/Value TableDesc. When the operator plan is split into MR tasks,
@@ -389,13 +405,9 @@ public class SortedDynPartitionOptimizer
outValColNames, 0, "");
TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
- int numDistributionKeys = newPartCols.size();
- if (bucketColumns != null && !bucketColumns.isEmpty()) {
- numDistributionKeys += 1;
- }
// Number of reducers is set to default (-1)
- ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, numDistributionKeys, newValueCols,
+ ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, newKeyCols.size(), newValueCols,
outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
valueTable);
rsConf.setBucketCols(bucketColumns);
Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q Sat Apr 12 00:14:03 2014
@@ -56,12 +56,12 @@ create table over1k_part_buck_sort_orc(
sorted by (f) into 4 buckets stored as orc;
-- map-only jobs converted to map-reduce job by hive.optimize.sort.dynamic.partition optimization
-explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
explain insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
explain insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
explain insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
-insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
@@ -70,12 +70,12 @@ set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
-- map-reduce jobs modified by hive.optimize.sort.dynamic.partition optimization
-explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
explain insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
explain insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
explain insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
-insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by si;
insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
@@ -93,3 +93,69 @@ select count(*) from over1k_part_orc;
select count(*) from over1k_part_limit_orc;
select count(*) from over1k_part_buck_orc;
select count(*) from over1k_part_buck_sort_orc;
+
+-- tests for HIVE-6883
+create table over1k_part2_orc(
+ si smallint,
+ i int,
+ b bigint,
+ f float)
+ partitioned by (ds string, t tinyint);
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+desc formatted over1k_part2_orc partition(ds="foo",t=27);
+desc formatted over1k_part2_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2_orc;
+select count(*) from over1k_part2_orc;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from over1k_orc where t is null or t=27 order by i;
+
+desc formatted over1k_part2_orc partition(ds="foo",t=27);
+desc formatted over1k_part2_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2_orc;
+select count(*) from over1k_part2_orc;
+
+-- hadoop-1 does not honor the number of reducers in local mode; there is always exactly 1 reducer, irrespective of the number of buckets.
+-- Hence all records go to one bucket and all other buckets are empty (similar to HIVE-6867). However, hadoop-2 honors the number
+-- of reducers, and records are spread across all reducers. To avoid this inconsistency, we set the number of buckets to 1 for this test.
+create table over1k_part_buck_sort2_orc(
+ si smallint,
+ i int,
+ b bigint,
+ f float)
+ partitioned by (t tinyint)
+ clustered by (si)
+ sorted by (f) into 1 buckets;
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2_orc partition(t=27);
+desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2_orc;
+select count(*) from over1k_part_buck_sort2_orc;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part_buck_sort2_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2_orc partition(t=27);
+desc formatted over1k_part_buck_sort2_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2_orc;
+select count(*) from over1k_part_buck_sort2_orc;
Modified: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q Sat Apr 12 00:14:03 2014
@@ -87,3 +87,69 @@ select count(*) from over1k_part;
select count(*) from over1k_part_limit;
select count(*) from over1k_part_buck;
select count(*) from over1k_part_buck_sort;
+
+-- tests for HIVE-6883
+create table over1k_part2(
+ si smallint,
+ i int,
+ b bigint,
+ f float)
+ partitioned by (ds string, t tinyint);
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+desc formatted over1k_part2 partition(ds="foo",t=27);
+desc formatted over1k_part2 partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2;
+select count(*) from over1k_part2;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part2 partition(ds="foo",t) select si,i,b,f,t from over1k where t is null or t=27 order by i;
+
+desc formatted over1k_part2 partition(ds="foo",t=27);
+desc formatted over1k_part2 partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part2;
+select count(*) from over1k_part2;
+
+-- hadoop-1 does not honor the number of reducers in local mode; there is always exactly 1 reducer, irrespective of the number of buckets.
+-- Hence all records go to one bucket and all other buckets are empty (similar to HIVE-6867). However, hadoop-2 honors the number
+-- of reducers, and records are spread across all reducers. To avoid this inconsistency, we set the number of buckets to 1 for this test.
+create table over1k_part_buck_sort2(
+ si smallint,
+ i int,
+ b bigint,
+ f float)
+ partitioned by (t tinyint)
+ clustered by (si)
+ sorted by (f) into 1 buckets;
+
+set hive.optimize.sort.dynamic.partition=false;
+explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+set hive.optimize.sort.dynamic.partition=true;
+explain insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+set hive.optimize.sort.dynamic.partition=false;
+insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2 partition(t=27);
+desc formatted over1k_part_buck_sort2 partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2;
+select count(*) from over1k_part_buck_sort2;
+
+set hive.optimize.sort.dynamic.partition=true;
+insert overwrite table over1k_part_buck_sort2 partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+desc formatted over1k_part_buck_sort2 partition(t=27);
+desc formatted over1k_part_buck_sort2 partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select * from over1k_part_buck_sort2;
+select count(*) from over1k_part_buck_sort2;
Modified: hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q?rev=1586801&r1=1586800&r2=1586801&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q Sat Apr 12 00:14:03 2014
@@ -24,7 +24,7 @@ CREATE TABLE orc_create_people (
state string)
STORED AS orc;
-INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging ORDER BY id;
set hive.stats.autogather = true;
analyze table orc_create_people compute statistics partialscan;
@@ -44,7 +44,7 @@ CREATE TABLE orc_create_people (
state string)
STORED AS orc;
-INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging ORDER BY id;
desc formatted orc_create_people;
@@ -64,7 +64,7 @@ PARTITIONED BY (state string)
STORED AS orc;
INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
- SELECT * FROM orc_create_people_staging;
+ SELECT * FROM orc_create_people_staging ORDER BY id;
set hive.stats.autogather = true;
analyze table orc_create_people partition(state) compute statistics partialscan;
@@ -86,7 +86,7 @@ PARTITIONED BY (state string)
STORED AS orc;
INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
- SELECT * FROM orc_create_people_staging;
+ SELECT * FROM orc_create_people_staging ORDER BY id;
desc formatted orc_create_people partition(state="Ca");
desc formatted orc_create_people partition(state="Or");
@@ -110,7 +110,7 @@ into 4 buckets
STORED AS orc;
INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
- SELECT * FROM orc_create_people_staging;
+ SELECT * FROM orc_create_people_staging ORDER BY id;
set hive.stats.autogather = true;
analyze table orc_create_people partition(state) compute statistics partialscan;
@@ -135,7 +135,7 @@ into 4 buckets
STORED AS orc;
INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
- SELECT * FROM orc_create_people_staging;
+ SELECT * FROM orc_create_people_staging ORDER BY id;
desc formatted orc_create_people partition(state="Ca");
desc formatted orc_create_people partition(state="Or");
@@ -157,7 +157,7 @@ PARTITIONED BY (state string)
STORED AS orc;
INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
- SELECT * FROM orc_create_people_staging;
+ SELECT * FROM orc_create_people_staging ORDER BY id;
-- set the table to text format
ALTER TABLE orc_create_people SET SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe';