Posted to issues@spark.apache.org by "Nick Hryhoriev (Jira)" <ji...@apache.org> on 2021/05/06 07:14:00 UTC

[jira] [Commented] (SPARK-34204) When use input_file_name() func all column from file appeared in physical plan of query, not only projection.

    [ https://issues.apache.org/jira/browse/SPARK-34204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340038#comment-17340038 ] 

Nick Hryhoriev commented on SPARK-34204:
----------------------------------------

I wrote some very simple code to work around it. It is a little bit hacky, but it works for me.


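{code:scala}
import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StringType

// Implicit classes must be nested in an object; the name FilePathSyntax is arbitrary.
object FilePathSyntax {

  implicit class EnrichWithFilePathAndModificationTime(df: DataFrame) {

    // Appends a string column holding the path of the file each row was read from,
    // as a replacement for input_file_name().
    def withFilePath(fileColumn: String)(implicit spark: SparkSession): DataFrame = {
      // Map every Spark partition index to the single file it reads.
      // Only valid directly on a file scan: other partition types throw a MatchError.
      val existingFilesByPartition = df.rdd
        .partitions
        .map {
          case partition: FilePartition =>
            // Spark must be configured to read one file per partition.
            assert(partition.files.length == 1)
            partition.index -> partition.files.head.filePath
        }.toMap

      // Encoder for the original schema plus the new file-path column.
      val partIdRowEncoder = RowEncoder(df.schema.add(fileColumn, StringType))

      df.mapPartitions { it =>
        val sparkPartitionId = TaskContext.get().partitionId()
        val file = existingFilesByPartition(sparkPartitionId)
        it.map(r => Row.fromSeq(r.toSeq :+ file))
      }(partIdRowEncoder)
    }
  }
}
{code}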
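This snippet only works together with the following setting, which makes Spark put exactly one file into each partition:
{code:scala}
// Do not change: our custom logic requires exactly one file per Spark partition.
.set("spark.sql.files.openCostInBytes", Int.MaxValue.toString)
{code}
This may not suit use cases with a very large number of small files, because every file then becomes a separate partition (and task).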
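Why does this setting give one file per partition? Below is a rough, pure-Scala model of the split-packing arithmetic that, as far as I understand it, Spark 2.4's `FileSourceScanExec` uses for non-bucketed file scans. It is a simplified sketch, not Spark's code: it assumes every file is splittable, ignores partition directories and bucketing, and `FilePackingModel` / `pack` are hypothetical names. The key point is that each file adds its length plus `openCostInBytes` to the running partition size, so with `Int.MaxValue` the next file never fits.

```scala
// Simplified model (an assumption, not Spark's actual code) of how Spark 2.4
// packs file chunks into read partitions for a non-bucketed file scan.
object FilePackingModel {

  /** Returns, for each simulated partition, the (fileName, chunkLength) pairs it reads. */
  def pack(
      files: Seq[(String, Long)], // (file name, file length in bytes)
      maxPartitionBytes: Long,    // models spark.sql.files.maxPartitionBytes
      openCostInBytes: Long,      // models spark.sql.files.openCostInBytes
      defaultParallelism: Int): Seq[Seq[(String, Long)]] = {
    // Every file is charged its length plus the "open cost".
    val totalBytes = files.map(_._2 + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))

    // Split every file into chunks of at most maxSplitBytes, largest chunks first.
    val chunks = files.flatMap { case (name, len) =>
      (0L until len by maxSplitBytes).map(off => name -> math.min(maxSplitBytes, len - off))
    }.sortBy(-_._2)

    // "Next Fit Decreasing": close the current partition when the next chunk does not fit.
    val partitions = Seq.newBuilder[Seq[(String, Long)]]
    var current = Vector.empty[(String, Long)]
    var currentSize = 0L
    chunks.foreach { chunk =>
      if (currentSize + chunk._2 > maxSplitBytes && current.nonEmpty) {
        partitions += current
        current = Vector.empty
        currentSize = 0L
      }
      // The open cost is added after the fit check, so a huge value blocks the next chunk.
      currentSize += chunk._2 + openCostInBytes
      current :+= chunk
    }
    if (current.nonEmpty) partitions += current
    partitions.result()
  }
}
```

In this model, with `defaultParallelism = 1`, three 10 MB files are packed into a single partition under the default 4 MB open cost, but into three single-file partitions with `Int.MaxValue`. A file larger than `spark.sql.files.maxPartitionBytes` (128 MB by default) is still split across several partitions, each reading one chunk of that one file; the workaround tolerates this, since it only needs one file per partition, not one partition per file.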
Please advise if anyone knows a better way.



 

> When use input_file_name() func all column from file appeared in physical plan of query, not only projection.
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34204
>                 URL: https://issues.apache.org/jira/browse/SPARK-34204
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.7
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> The input_file_name() function prevents column pruning (projection pushdown) from being applied to the physical plan of the query.
> If this function is used to add a new column, column-oriented formats like Parquet and ORC read all columns in the physical plan.
> Without it, only the selected columns are read.
> In my case, the performance impact is 30x.
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> object TestSize {
>   def main(args: Array[String]): Unit = {
>     implicit val spark: SparkSession = SparkSession.builder()
>       .master("local")
>       .config("spark.sql.shuffle.partitions", "5")
>       .getOrCreate()
>     import spark.implicits._
>     val query1 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa", input_file_name().as("fileName"))
>       .distinct()
>       .count()
>     val query2 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa")
>       .distinct()
>       .count()
>     Thread.sleep(10000000000L) // keep the driver (and Spark UI) alive to inspect the plans
>   }
> }
> {code}
> `query1` reads all columns in its physical plan, while `query2` reads only the two selected ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
