You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "zzzzming95 (Jira)" <ji...@apache.org> on 2022/10/02 16:27:00 UTC

[jira] [Comment Edited] (SPARK-40610) Spark fall back to use getPartitions instead of getPartitionsByFilter when date_add functions used in where clause

    [ https://issues.apache.org/jira/browse/SPARK-40610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612135#comment-17612135 ] 

zzzzming95 edited comment on SPARK-40610 at 10/2/22 4:26 PM:
-------------------------------------------------------------

Spark will perform partition pruning when executing the sql of hive (that is, obtain the required sql partition from hive).
{code:java}
spark-sql> show partitions test_dt;
dt=2022-01-10
dt=2022-01-14
dt=2022-01-15
dt=2022-01-20
dt=2022-01-21
dt=2022-02-11
dt=2022-02-12
dt=2022-02-13
dt=2022-02-14
Time taken: 0.13 seconds, Fetched 9 row(s)
spark-sql> select * from test_dt where dt > '2022-02-11';
1    2022-02-12
1    2022-02-13
1    2022-02-14
Time taken: 0.343 seconds, Fetched 3 row(s) {code}
In my metastore log , i print the partition result, it just return the required partition :
{code:java}
2022-10-03 00:21:54,746 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_MS
2022-10-03 00:21:54,767 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_partitions_by_filter
2022-10-03 00:21:54,767 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 startFunction: get_partitions_by_filter: db=default tbl=test_dt par=[2022-02-12,2022-02-13,2022-02-14] {code}
 

When the partition cannot be trimmed (for example, the partition predicate contains udf: dt>=date_add ('2022-09-22 ', - 3)). Spark will get all partitions from hive and then filter the required partitions in memory.
{code:java}
spark-sql>  select * from test_dt where dt > date_add('2022-09-22',-3 );
Time taken: 0.187 seconds
spark-sql> {code}
metastore log :
{code:java}
2022-10-03 00:23:45,357 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_partitions
2022-10-03 00:23:45,357 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 startFunction: get_partitions: db=default tbl=test_dt par=[2022-01-10,2022-01-14,2022-01-15,2022-01-20,2022-01-21,2022-02-11,2022-02-12,2022-02-13,2022-02-14] {code}
We can see that hive will return all partitions.

 

I tested spark 2.4 and spark 3. When using this condition: dt>=date_ add('2022-09-22',-3 )。 Spark will get all partitions from hive and then filter the required partitions in memory. When the number of partitions is too large, it is easy to cause Read timed out de


was (Author: zing):
Spark will perform partition pruning when executing the sql of hive (that is, obtain the required sql partition from hive).
{code:java}
spark-sql> show partitions test_dt;
dt=2022-01-10
dt=2022-01-14
dt=2022-01-15
dt=2022-01-20
dt=2022-01-21
dt=2022-02-11
dt=2022-02-12
dt=2022-02-13
dt=2022-02-14
Time taken: 0.13 seconds, Fetched 9 row(s)
spark-sql> select * from test_dt where dt > '2022-02-11';
1    2022-02-12
1    2022-02-13
1    2022-02-14
Time taken: 0.343 seconds, Fetched 3 row(s) {code}
In my metastore log , i print the partition result:
{code:java}
2022-10-03 00:21:54,746 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_MS
2022-10-03 00:21:54,767 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_partitions_by_filter
2022-10-03 00:21:54,767 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 startFunction: get_partitions_by_filter: db=default tbl=test_dt par=[2022-02-12,2022-02-13,2022-02-14] {code}
 

When the partition cannot be trimmed (for example, the partition predicate contains udf: dt>=date_add ('2022-09-22 ', - 3)). Spark will get all partitions from hive and then filter the required partitions in memory.
{code:java}
spark-sql>  select * from test_dt where dt > date_add('2022-09-22',-3 );
Time taken: 0.187 seconds
spark-sql> {code}
metastore log :
{code:java}
2022-10-03 00:23:45,357 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 endFunction: get_partitions
2022-10-03 00:23:45,357 INFO metastore.HiveMetaStore: 7: source:127.0.0.1 startFunction: get_partitions: db=default tbl=test_dt par=[2022-01-10,2022-01-14,2022-01-15,2022-01-20,2022-01-21,2022-02-11,2022-02-12,2022-02-13,2022-02-14] {code}
We can see that hive will return all partitions.

 

I tested spark 2.4 and spark 3. When using this condition: dt>=date_ add('2022-09-22',-3 )。 Spark will get all partitions from hive and then filter the required partitions in memory. When the number of partitions is too large, it is easy to cause Read timed out de

> Spark fall back to use getPartitions instead of getPartitionsByFilter when date_add functions used in where clause 
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40610
>                 URL: https://issues.apache.org/jira/browse/SPARK-40610
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>         Environment: edw.tmp_test_metastore_usage_source is a big table with 1000 partitions and hundreds of columns
>            Reporter: icyjhl
>            Priority: Major
>         Attachments: image-2022-10-03-00-21-24-506.png, spark_error.log, spark_sql.sql, sql_in_mysql.sql
>
>
> When I run a insert overwrite statement, I got error saying:
>  
> {code:java}
> MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. listPartitions {code}
>  
> It's weird as I only selected for about 3 partitions, so I rerun the sql and checked the metastore, then I found it's fetching all columns in all partitions:
>  
> {code:java}
> select "CD_ID", "COMMENT", "COLUMN_NAME", "TYPE_NAME" from "COLUMNS_V2" where "CD_ID" 
> in (675384,675393,675385,675394,675396,675397,675395,675398,675399,675401,675402,675400,675406……){code}
>  
>  
> After testing, I found the problem is with the date_add function in where clause, if remove it ,sql works fine, else metastore would fetch all columns in all partitions.
>  
>  
> {code:java}
> insert overwrite table test.tmp_test_metastore_usage
> SELECT userid
>     ,SUBSTR(sendtime,1,10) AS creation_date
>     ,cast(json_bh_esdate_deltadays_max as DECIMAL(38,2)) AS bh_esdate_deltadays_max
>     ,json_bh_qiye_industryphyname AS bh_qiye_industryphyname
>     ,cast(json_bh_esdate_deltadays_min as DECIMAL(38,2)) AS bh_esdate_deltadays_min
>     ,cast(json_bh_subconam_min as DECIMAL(38,2)) AS bh_subconam_min
>     ,cast(json_bh_qiye_regcap_min as DECIMAL(38,2)) AS bh_qiye_regcap_min
>     ,json_bh_industryphyname AS bh_industryphyname
>     ,cast(json_bh_subconam_mean as DECIMAL(38,2)) AS bh_subconam_mean
>     ,cast(json_bh_industryphyname_nunique as DECIMAL(38,2)) AS bh_industryphyname_nunique
>     ,cast(current_timestamp() as string) as dw_cre_date
>     ,cast(current_timestamp() as string) as dw_upd_date
> FROM (
>     SELECT userid
>         ,sendtime
>         ,json_bh_esdate_deltadays_max
>         ,json_bh_qiye_industryphyname
>         ,json_bh_esdate_deltadays_min
>         ,json_bh_subconam_min
>         ,json_bh_qiye_regcap_min
>         ,json_bh_industryphyname
>         ,json_bh_subconam_mean
>         ,json_bh_industryphyname_nunique
>         ,row_number() OVER (
>             PARTITION BY userid,dt ORDER BY sendtime DESC
>             ) rn
>     FROM edw.tmp_test_metastore_usage_source
>     WHERE dt >= date_add('2022-09-22',-3 )
>         AND json_bizid IN ('6101')
>         AND json_dingid IN ('611')
>     ) t
> WHERE rn = 1 {code}
>  
>  By the way 2.4.7 works good.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org