Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2022/08/26 04:36:00 UTC

[jira] [Resolved] (SPARK-40206) Spark SQL Predicate Pushdown for Hive Bucketed Table

     [ https://issues.apache.org/jira/browse/SPARK-40206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-40206.
----------------------------------
    Resolution: Invalid

> Spark SQL Predicate Pushdown for Hive Bucketed Table
> -----------------------------------------------------
>
>                 Key: SPARK-40206
>                 URL: https://issues.apache.org/jira/browse/SPARK-40206
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.3.0
>            Reporter: Raymond Tang
>            Priority: Minor
>              Labels: hive, hive-buckets, spark, spark-sql
>
> Hi team,
> I was testing out the Hive bucketed table feature.  One of the benefits, as most documentation suggests, is that a bucketed Hive table can be used for query filter/predicate pushdown to improve query performance.
> However, through my exploration, that doesn't seem to be true. *Can you please help clarify whether Spark SQL supports query optimizations when using a Hive bucketed table?*
>  
> How to reproduce the issue:
> Create a Hive 3 table using the following DDL:
> {code:sql}
> create table test_db.bucket_table(user_id int, key string) 
> comment 'A bucketed table' 
> partitioned by(country string) 
> clustered by(user_id) sorted by (key) into 10 buckets
> stored as ORC;{code}
> And then insert into this table using the following PySpark script:
> {code:python}
> from pyspark.sql import SparkSession
> appName = "PySpark Hive Bucketing Example"
> master = "local"
> # Create a Spark session with Hive support enabled.
> spark = SparkSession.builder \
>     .appName(appName) \
>     .master(master) \
>     .enableHiveSupport() \
>     .getOrCreate()
> # prepare sample data for inserting into hive table
> data = []
> countries = ['CN', 'AU']
> for i in range(0, 1000):
>     data.append([int(i),  'U'+str(i), countries[i % 2]])
> df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
> df.show()
> # Save df to Hive table test_db.bucket_table
> df.write.mode('append').insertInto('test_db.bucket_table') {code}
> Then query the table using the following script:
> {code:python}
> from pyspark.sql import SparkSession
> appName = "PySpark Hive Bucketing Example"
> master = "local"
> # Create a Spark session with Hive support enabled.
> spark = SparkSession.builder \
>     .appName(appName) \
>     .master(master) \
>     .enableHiveSupport() \
>     .getOrCreate()
> df = spark.sql("""select * from test_db.bucket_table
> where country='AU' and user_id=101
> """)
> df.show()
> df.explain(extended=True) {code}
> I was expecting Spark to read from only one bucket file in HDFS, but instead it scanned all bucket files in the partition folder country=AU.
> {code}
> == Parsed Logical Plan ==
> 'Project [*]
>  - 'Filter (('country = AU) AND ('t1.user_id = 101))
>     - 'SubqueryAlias t1
>        - 'UnresolvedRelation [test_db, bucket_table], [], false
> == Analyzed Logical Plan ==
> user_id: int, key: string, country: string
> Project [user_id#20, key#21, country#22]
>  - Filter ((country#22 = AU) AND (user_id#20 = 101))
>     - SubqueryAlias t1
>        - SubqueryAlias spark_catalog.test_db.bucket_table
>           - Relation test_db.bucket_table[user_id#20,key#21,country#22] orc
> == Optimized Logical Plan ==
> Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
>  - Relation test_db.bucket_table[user_id#20,key#21,country#22] orc
> == Physical Plan ==
> *(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
>  - *(1) ColumnarToRow
>     - FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., PartitionFilters: [isnotnull(country#22), (country#22 = AU)], PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct<user_id:int,key:string>   {code}
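> Note also that the FileScan node above reports no bucket-pruning information (when bucket pruning does kick in, the physical plan typically includes a SelectedBucketsCount entry). As a side note for anyone reproducing this, a minimal sanity check (assuming the same spark session and table name as in the scripts above) is to confirm that Spark even sees the bucketing metadata from the Hive metastore:
> {code:python}
> # Inspect the table metadata as Spark sees it. If the bucketing spec was
> # picked up from the Hive metastore, the output should include rows such
> # as "Num Buckets" and "Bucket Columns"; without that metadata, bucket
> # pruning cannot happen at all.
> spark.sql("DESCRIBE FORMATTED test_db.bucket_table").show(200, truncate=False)
> {code}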
> *Am I doing something wrong, or is it that Spark doesn't support it? Your guidance and help will be appreciated.*
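> For comparison, Spark SQL's bucket pruning does apply to tables written with Spark's own bucketing API (bucketBy plus saveAsTable) rather than Hive DDL. Below is a minimal, untested sketch (the table name spark_bucket_table is made up for illustration, and it reuses the df from the insert script above); with it, an equality filter on the bucket column should surface as bucket pruning in the physical plan:
> {code:python}
> # Write the same data as a Spark-native bucketed table. bucketBy/sortBy
> # only work together with saveAsTable, not with insertInto or save().
> df.write \
>     .mode('overwrite') \
>     .partitionBy('country') \
>     .bucketBy(10, 'user_id') \
>     .sortBy('key') \
>     .format('orc') \
>     .saveAsTable('test_db.spark_bucket_table')
>
> # With spark.sql.sources.bucketing.enabled left at its default (true),
> # an equality filter on the bucket column lets the scan read a single
> # bucket, reported as "SelectedBucketsCount: 1 out of 10" in the plan.
> spark.sql("""select * from test_db.spark_bucket_table
> where country='AU' and user_id=101
> """).explain()
> {code}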
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org