Posted to issues@spark.apache.org by "Chao Sun (Jira)" <ji...@apache.org> on 2021/04/07 21:50:00 UTC

[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

    [ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316714#comment-17316714 ] 

Chao Sun commented on SPARK-34780:
----------------------------------

Hi [~mikechen] (and sorry for the late reply again), thanks for providing another very useful code snippet! I'm not sure this qualifies as a correctness issue, though, since it looks (to me) more like different interpretations of malformed columns in CSV.

My previous statement about {{SessionState}} is incorrect. It seems the conf in {{SessionState}} is always the most up-to-date one. The only solution I can think of is to take the conf into account when checking equality of {{HadoopFsRelation}} (and potentially others), which means we'd need to define equality for {{SQLConf}}.
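
For illustration, a rough sketch of that direction (the {{ConfSignature}} helper and its conf list are hypothetical, not an existing Spark API; a real fix would need to enumerate every conf that actually affects scan planning):

{code:java}
import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper: reduce a SQLConf to just the entries that can change
// how a HadoopFsRelation is planned, so two relations only compare equal
// when those entries match.
object ConfSignature {
  // Hypothetical, deliberately incomplete selection of planning-relevant confs.
  private val relevantConfs: Seq[String] = Seq(
    SQLConf.FILE_COMPRESSION_FACTOR.key,
    SQLConf.IGNORE_CORRUPT_FILES.key
  )

  def of(conf: SQLConf): Map[String, String] =
    relevantConfs.map(k => k -> conf.getConfString(k, "")).toMap
}

// HadoopFsRelation.equals could then additionally compare
// ConfSignature.of(sparkSession.sessionState.conf) on both sides.
{code}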

> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.1.1
>            Reporter: Michael Chen
>            Priority: Major
>
> When a DataFrame is cached, its logical plan can contain copies of the SparkSession, meaning the SQLConfs are captured as well. If a different DataFrame can then replace parts of its logical plan with the cached logical plan, the cached SQLConfs will be used when evaluating that cached plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358); see the sketch after the test below.
> {code:java}
> // Assumes a Spark SQL test suite, e.g. one extending QueryTest with SharedSparkSession.
> import org.apache.spark.sql.execution.columnar.InMemoryRelation
> import org.apache.spark.sql.execution.datasources.LogicalRelation
> import org.apache.spark.sql.internal.SQLConf
>
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     // Write a small parquet table to scan below.
>     val tableDir = dir.getAbsolutePath + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>     // Plan stats for an uncached scan with fileCompressionFactor = 1.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     // Cache a scan over the same table with fileCompressionFactor = 10.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     // Switch back to fileCompressionFactor = 1; the new scan picks up the
>     // cached plan, which still carries the conf it was built with.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     // I expect these stats to be the same because the file compression factor is the same.
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, the file compression factor is cached and used along with the logical plan.
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  
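> The equality behavior above follows from how {{HadoopFsRelation}} is declared: it is a case class that takes {{sparkSession}} in a second parameter list, and Scala only generates {{equals}}/{{hashCode}} from the first parameter list. A minimal standalone sketch of that language behavior ({{Rel}} is a stand-in, not the real class):
> {code:java}
> // Stand-in for HadoopFsRelation: `session` sits in a second parameter list,
> // so the synthesized equals/hashCode ignore it entirely.
> case class Rel(path: String)(val session: String)
>
> object Demo extends App {
>   val a = Rel("/table")("session-with-compressionFactor-1")
>   val b = Rel("/table")("session-with-compressionFactor-10")
>   // Prints "true": the relations compare equal despite carrying different
>   // sessions, so a cache lookup keyed on plan equality matches both.
>   println(a == b)
> }
> {code}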



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org