You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hive.apache.org by "Mostafa Mokhtar (JIRA)" <ji...@apache.org> on 2015/02/11 00:57:15 UTC

[jira] [Updated] (HIVE-9647) Discrepancy in CBO between partitioned and un-partitioned tables

     [ https://issues.apache.org/jira/browse/HIVE-9647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mostafa Mokhtar updated HIVE-9647:
----------------------------------
    Description: 
High-level summary
HiveRelMdSelectivity.computeInnerJoinSelectivity relies on per column number of distinct value to estimate join selectivity.
The way statistics are aggregated for partitioned tables results in discrepancy in number of distinct values which results in different plans between partitioned and un-partitioned schemas.

The table below summarizes the NDVs in computeInnerJoinSelectivity which are used to estimate selectivity of joins.

||Column	||Partitioned count distincts|| 	Un-Partitioned count distincts 
|sr_customer_sk	|71,245	|1,415,625|
|sr_item_sk	|38,846|	62,562|
|sr_ticket_number	|71,245	|34,931,085|
|ss_customer_sk	|88,476|	1,415,625|
|ss_item_sk	|38,846|	62,562|
|ss_ticket_number|	100,756	|56,256,175|

	
The discrepancy is because NDV calculation for a partitioned table assumes that the NDV range is contained within each partition and is calculates as "select max(NUM_DISTINCTS) from PART_COL_STATS” .
This is problematic for columns like ticket number which are naturally increasing with the partitioned date column ss_sold_date_sk.
Suggestions
Use Hyper Log Log as suggested by Gopal, there is an HLL implementation for HBASE co-porccessors which we can use as a reference here 
Using the global stats from TAB_COL_STATS and the per partition stats from PART_COL_STATS extrapolate the NDV for the qualified partitions as in :
Max ( (NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of Partitions), max(NUM_DISTINCTS) from PART_COL_STATS))
More details
While doing TPC-DS Partitioned vs. Un-Partitioned runs I noticed that many of the plans are different, then I dumped the CBO logical plan and I found that join estimates are drastically different

Unpartitioned schema :
{code}
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2821
{code}

Partitioned schema :
{code}
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
              HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2656
{code}

This was puzzling knowing that the stats for both tables are “identical” in TAB_COL_STATS.

Column statistics from TAB_COL_STATS, notice how the column statistics are identical in both cases.
{code}
DB_NAME	COLUMN_NAME	COLUMN_TYPE	NUM_NULLS	LONG_HIGH_VALUE	LONG_LOW_VALUE	MAX_COL_LEN	NUM_DISTINCTS
tpcds_bin_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
tpcds_bin_partitioned_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
tpcds_bin_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
tpcds_bin_partitioned_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
tpcds_bin_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
tpcds_bin_partitioned_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
tpcds_bin_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_partitioned_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
tpcds_bin_partitioned_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
tpcds_bin_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
tpcds_bin_partitioned_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
tpcds_bin_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_partitioned_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
tpcds_bin_partitioned_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
tpcds_bin_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
tpcds_bin_partitioned_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
{code}

For partitioned tables we get the statistics using get_aggr_stats_for which eventually issues the query below

{code}
select 
    COLUMN_NAME,
    COLUMN_TYPE,
    …
    max(NUM_DISTINCTS),
    …
from
    PART_COL_STATS
Where
where
    DB_NAME = 
        and TABLE_NAME = 
        and COLUMN_NAME in 
        and PARTITION_NAME in (1 … N)
group by COLUMN_NAME , COLUMN_TYPE;
{code}
 …

  was:
High-level summary
HiveRelMdSelectivity.computeInnerJoinSelectivity relies on per column number of distinct value to estimate join selectivity.
The way statistics are aggregated for partitioned tables results in discrepancy in number of distinct values which results in different plans between partitioned and un-partitioned schemas.

The table below summarizes the NDVs in computeInnerJoinSelectivity which are used to estimate selectivity of joins.
{code}
Column	Partitioned count distincts 	Un-Partitioned count distincts 
sr_customer_sk	71,245	1,415,625
sr_item_sk	38,846	62,562
sr_ticket_number	71,245	34,931,085
ss_customer_sk	88,476	1,415,625
ss_item_sk	38,846	62,562
ss_ticket_number	100,756	56,256,175
{code}
	
The discrepancy is because NDV calculation for a partitioned table assumes that the NDV range is contained within each partition and is calculates as "select max(NUM_DISTINCTS) from PART_COL_STATS” .
This is problematic for columns like ticket number which are naturally increasing with the partitioned date column ss_sold_date_sk.
Suggestions
Use Hyper Log Log as suggested by Gopal, there is an HLL implementation for HBASE co-porccessors which we can use as a reference here 
Using the global stats from TAB_COL_STATS and the per partition stats from PART_COL_STATS extrapolate the NDV for the qualified partitions as in :
Max ( (NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of Partitions), max(NUM_DISTINCTS) from PART_COL_STATS))
More details
While doing TPC-DS Partitioned vs. Un-Partitioned runs I noticed that many of the plans are different, then I dumped the CBO logical plan and I found that join estimates are drastically different

Unpartitioned schema :
{code}
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2821
{code}

Partitioned schema :
{code}
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
              HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2656
{code}

This was puzzling knowing that the stats for both tables are “identical” in TAB_COL_STATS.

Column statistics from TAB_COL_STATS, notice how the column statistics are identical in both cases.
{code}
DB_NAME	COLUMN_NAME	COLUMN_TYPE	NUM_NULLS	LONG_HIGH_VALUE	LONG_LOW_VALUE	MAX_COL_LEN	NUM_DISTINCTS
tpcds_bin_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
tpcds_bin_partitioned_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
tpcds_bin_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
tpcds_bin_partitioned_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
tpcds_bin_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
tpcds_bin_partitioned_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
tpcds_bin_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_partitioned_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
tpcds_bin_partitioned_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
tpcds_bin_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
tpcds_bin_partitioned_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
tpcds_bin_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_partitioned_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
tpcds_bin_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
tpcds_bin_partitioned_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
tpcds_bin_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
tpcds_bin_partitioned_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
{code}

For partitioned tables we get the statistics using get_aggr_stats_for which eventually issues the query below

{code}
select 
    COLUMN_NAME,
    COLUMN_TYPE,
    …
    max(NUM_DISTINCTS),
    …
from
    PART_COL_STATS
Where
where
    DB_NAME = 
        and TABLE_NAME = 
        and COLUMN_NAME in 
        and PARTITION_NAME in (1 … N)
group by COLUMN_NAME , COLUMN_TYPE;
{code}
 …


> Discrepancy in CBO between partitioned and un-partitioned tables 
> -----------------------------------------------------------------
>
>                 Key: HIVE-9647
>                 URL: https://issues.apache.org/jira/browse/HIVE-9647
>             Project: Hive
>          Issue Type: Bug
>          Components: CBO
>    Affects Versions: 0.14.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Gunther Hagleitner
>             Fix For: 1.2.0
>
>
> High-level summary
> HiveRelMdSelectivity.computeInnerJoinSelectivity relies on per column number of distinct value to estimate join selectivity.
> The way statistics are aggregated for partitioned tables results in discrepancy in number of distinct values which results in different plans between partitioned and un-partitioned schemas.
> The table below summarizes the NDVs in computeInnerJoinSelectivity which are used to estimate selectivity of joins.
> ||Column	||Partitioned count distincts|| 	Un-Partitioned count distincts 
> |sr_customer_sk	|71,245	|1,415,625|
> |sr_item_sk	|38,846|	62,562|
> |sr_ticket_number	|71,245	|34,931,085|
> |ss_customer_sk	|88,476|	1,415,625|
> |ss_item_sk	|38,846|	62,562|
> |ss_ticket_number|	100,756	|56,256,175|
> 	
> The discrepancy is because NDV calculation for a partitioned table assumes that the NDV range is contained within each partition and is calculates as "select max(NUM_DISTINCTS) from PART_COL_STATS” .
> This is problematic for columns like ticket number which are naturally increasing with the partitioned date column ss_sold_date_sk.
> Suggestions
> Use Hyper Log Log as suggested by Gopal, there is an HLL implementation for HBASE co-porccessors which we can use as a reference here 
> Using the global stats from TAB_COL_STATS and the per partition stats from PART_COL_STATS extrapolate the NDV for the qualified partitions as in :
> Max ( (NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of Partitions), max(NUM_DISTINCTS) from PART_COL_STATS))
> More details
> While doing TPC-DS Partitioned vs. Un-Partitioned runs I noticed that many of the plans are different, then I dumped the CBO logical plan and I found that join estimates are drastically different
> Unpartitioned schema :
> {code}
> 2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
> HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
>   HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
>     HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
>       HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
>         HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
>           HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
>             HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
>               HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
>             HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
>               HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
>           HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
>             HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
>               HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2821
> {code}
> Partitioned schema :
> {code}
> 2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
> HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
>   HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
>     HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
>       HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
>         HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
>           HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
>             HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
>           HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
>             HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
>               HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
>             HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
>               HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
>                 HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2656
> {code}
> This was puzzling knowing that the stats for both tables are “identical” in TAB_COL_STATS.
> Column statistics from TAB_COL_STATS, notice how the column statistics are identical in both cases.
> {code}
> DB_NAME	COLUMN_NAME	COLUMN_TYPE	NUM_NULLS	LONG_HIGH_VALUE	LONG_LOW_VALUE	MAX_COL_LEN	NUM_DISTINCTS
> tpcds_bin_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
> tpcds_bin_partitioned_orc_200	d_date_sk	int	0	2,488,070	2,415,022	NULL	65,332
> tpcds_bin_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
> tpcds_bin_partitioned_orc_200	d_quarter_name	string	0	NULL	NULL	6	721
> tpcds_bin_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
> tpcds_bin_partitioned_orc_200	sr_customer_sk	int	1,009,571	1,600,000	1	NULL	1,415,625
> tpcds_bin_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
> tpcds_bin_partitioned_orc_200	sr_item_sk	int	0	48,000	1	NULL	62,562
> tpcds_bin_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
> tpcds_bin_partitioned_orc_200	sr_ticket_number	int	0	48,000,000	1	NULL	34,931,085
> tpcds_bin_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
> tpcds_bin_partitioned_orc_200	ss_customer_sk	int	12,960,424	1,600,000	1	NULL	1,415,625
> tpcds_bin_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
> tpcds_bin_partitioned_orc_200	ss_item_sk	int	0	48,000	1	NULL	62,562
> tpcds_bin_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
> tpcds_bin_partitioned_orc_200	ss_sold_date_sk	int	0	2,452,642	2,450,816	NULL	2,226
> tpcds_bin_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
> tpcds_bin_partitioned_orc_200	ss_ticket_number	int	0	48,000,000	1	NULL	56,256,175
> {code}
> For partitioned tables we get the statistics using get_aggr_stats_for which eventually issues the query below
> {code}
> select 
>     COLUMN_NAME,
>     COLUMN_TYPE,
>     …
>     max(NUM_DISTINCTS),
>     …
> from
>     PART_COL_STATS
> Where
> where
>     DB_NAME = 
>         and TABLE_NAME = 
>         and COLUMN_NAME in 
>         and PARTITION_NAME in (1 … N)
> group by COLUMN_NAME , COLUMN_TYPE;
> {code}
>  …



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)