Posted to issues@spark.apache.org by "Gabe Church (Jira)" <ji...@apache.org> on 2021/08/19 23:00:00 UTC

[jira] [Commented] (SPARK-32985) Decouple bucket filter pruning and bucket table scan

    [ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ] 

Gabe Church commented on SPARK-32985:
-------------------------------------

This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless if we cannot split the read tasks across multiple Spark workers and are stuck with one task per executor. I've manually bypassed this in my own work so that these bucketed tables are still useful, but making it configurable is an absolutely huge improvement, because we can then benefit from bucket pruning. In my example it can turn a 2hr query into a 2min query, even with the listFiles via the manual read. 

My example is below, and only works for individual partition reads. 

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
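// PartitionedFile.toString starts with "path: <file>, range: ..."; strip the prefix and keep only the path
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)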
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan
> ----------------------------------------------------
>
>                 Key: SPARK-32985
>                 URL: https://issues.apache.org/jira/browse/SPARK-32985
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Assignee: Cheng Su
>            Priority: Minor
>             Fix For: 3.2.0
>
>
> As a follow-up to the discussion in [https://github.com/apache/spark/pull/29804#discussion_r493100510]. Currently, in the data source v1 file scan `FileSourceScanExec`, bucket filter pruning only takes effect with bucketed table scan - [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]. However, this restriction is unnecessary, as bucket filter pruning can also happen when bucketed table scan is disabled. Decoupling the two lets queries benefit from bucket filter pruning, saving CPU/IO by not reading unnecessary bucket files, without being bound by bucketed table scan when task parallelism is a concern.



