Posted to issues@spark.apache.org by "Nick Hryhoriev (Jira)" <ji...@apache.org> on 2021/05/06 07:14:00 UTC
[jira] [Commented] (SPARK-34204) When use input_file_name() func
all column from file appeared in physical plan of query, not only
projection.
[ https://issues.apache.org/jira/browse/SPARK-34204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340038#comment-17340038 ]
Nick Hryhoriev commented on SPARK-34204:
----------------------------------------
I wrote some very simple code to work around it. It is a little hacky, but it works for me.
{code:java}
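// Imports assumed for Spark 2.4.x / 3.x; package locations may differ in other versions.
import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StringType

implicit class EnrichWithFilePathAndModificationTime(df: DataFrame) {
  def withFilePath(fileColumn: String)(implicit spark: SparkSession): DataFrame = {
    // Driver side: map each Spark partition index to the single file it reads.
    val existingFilesByPartition = df.rdd
      .partitions
      .map {
        case partition: FilePartition =>
          assert(partition.files.length == 1) // Spark must be configured to read one file per partition.
          partition.index -> partition.files.head.filePath
      }.toMap
    // Encoder for the original schema plus the new file path column.
    val partIdRowEncoder = RowEncoder.apply(
      df.schema
        .add(fileColumn, StringType)
    )
    // Executor side: append the file path of the current partition to every row.
    df.mapPartitions { it =>
      val sparkPartitionId = TaskContext.get().partitionId()
      val file = existingFilesByPartition(sparkPartitionId)
      it.map(r => Row.fromSeq(r.toSeq ++ Seq(file)))
    }(partIdRowEncoder)
  }
}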
{code}
This snippet only works together with the following setting:
{code:java}
// Do not change: our custom logic requires exactly one file per Spark partition.
.set("spark.sql.files.openCostInBytes", Int.MaxValue.toString){code}
This may not suit use cases with a very large number of small files.
Please advise if anyone knows a better way.
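
For context, here is a rough sketch of why that setting leads to one file per partition. It is a simplified model of how Spark appears to pack file chunks into read partitions, written from memory of the Spark source rather than copied from it. The names PackingSketch and packFiles are made up for illustration, and the model assumes a splittable file format.

```scala
object PackingSketch {
  // Simplified model (an assumption, not Spark's actual code) of how file
  // chunks are packed into read partitions. Input: file sizes in bytes.
  // Output: one inner Seq per partition, holding the chunk sizes assigned to it.
  def packFiles(
      fileSizes: Seq[Long],
      maxPartitionBytes: Long,
      openCostInBytes: Long,
      parallelism: Int): Seq[Seq[Long]] = {
    // Every file is charged its length plus the open cost.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / parallelism
    // The target split size never exceeds maxPartitionBytes.
    val maxSplitBytes =
      math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))

    // Cut each file into chunks of at most maxSplitBytes, largest chunks first.
    val chunks = fileSizes.flatMap { size =>
      (0L until size by maxSplitBytes).map(offset => math.min(maxSplitBytes, size - offset))
    }.sortBy(-_)

    val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
    var current = Vector.empty[Long]
    var currentSize = 0L
    chunks.foreach { chunk =>
      // Close the current partition when the next chunk would overflow it.
      if (currentSize + chunk > maxSplitBytes && current.nonEmpty) {
        partitions += current
        current = Vector.empty
        currentSize = 0L
      }
      // The open cost is added on top of the chunk length.
      currentSize += chunk + openCostInBytes
      current :+= chunk
    }
    if (current.nonEmpty) partitions += current
    partitions.toSeq
  }
}
```

In this model, with openCostInBytes set to Int.MaxValue and maxPartitionBytes at 128 MB, the running size exceeds the split size as soon as one chunk has been added, so every following chunk starts a new partition. With a small open cost and low parallelism, several small files can land in the same partition, which is the case the assert in the snippet above guards against.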
> When use input_file_name() func all column from file appeared in physical plan of query, not only projection.
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-34204
> URL: https://issues.apache.org/jira/browse/SPARK-34204
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.7
> Reporter: Nick Hryhoriev
> Priority: Major
>
> The input_file_name() function breaks column pruning (projection) in the physical plan of the query.
> If this function is used to add a new column, column-oriented formats such as Parquet and ORC put all columns into the physical plan.
> Without it, only the selected columns are read.
> In my case, the performance impact is about 30x.
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> object TestSize {
>   def main(args: Array[String]): Unit = {
>     implicit val spark: SparkSession = SparkSession.builder()
>       .master("local")
>       .config("spark.sql.shuffle.partitions", "5")
>       .getOrCreate()
>     import spark.implicits._
>     val query1 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa", input_file_name().as("fileName"))
>       .distinct()
>       .count()
>     val query2 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa")
>       .distinct()
>       .count()
>     Thread.sleep(10000000000L)
>   }
> }
> {code}
> `query1` has all columns in the physical plan, while `query2` has only two.