Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2021/03/19 01:18:00 UTC
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304561#comment-17304561 ]
Hyukjin Kwon commented on SPARK-34780:
--------------------------------------
cc [~sunchao] [~maxgekk] FYI
> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.4, 3.1.1
> Reporter: Michael Chen
> Priority: Major
>
> When a DataFrame is cached, its logical plan can contain copies of the Spark session, meaning the SQLConfs are captured as well. If a different DataFrame later replaces part of its logical plan with that cached logical plan, the captured SQLConfs are used when evaluating the cached plan. This happens because HadoopFsRelation ignores sparkSession in equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsoluteFile + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     // I expect these stats to be the same because file compression factor is the same
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, we can see the file compression factor is being cached and used along with
>     // the logical plan
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>
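The mechanism can be illustrated outside of Spark. The sketch below is a hypothetical, self-contained analogue (Session, Relation, and StaleCacheDemo are illustrative names, not Spark APIs): a relation captures a session holding a config, but excludes that session from case-class equality (mirroring how HadoopFsRelation ignores sparkSession). A cache keyed on the relation then returns stats computed under the old config.

```scala
// Standalone sketch, NOT Spark code: a case class's second (curried)
// parameter list is excluded from the generated equals/hashCode, so two
// relations built under different sessions compare equal.
case class Session(compressionFactor: Double)

case class Relation(path: String)(val session: Session) {
  // Size estimate scales with the session's compression factor,
  // loosely analogous to spark.sql.sources.fileCompressionFactor.
  def estimatedSize(rawBytes: Long): Long =
    (rawBytes * session.compressionFactor).toLong
}

object StaleCacheDemo {
  private val cache = scala.collection.mutable.Map.empty[Relation, Long]

  // Keyed on the relation: a "new" relation that is equal to a cached one
  // silently reuses the stats computed under the old session's config.
  def computeStats(rel: Relation, rawBytes: Long): Long =
    cache.getOrElseUpdate(rel, rel.estimatedSize(rawBytes))

  def main(args: Array[String]): Unit = {
    val oldRel = Relation("/tmp/table")(Session(compressionFactor = 10.0))
    println(computeStats(oldRel, 100)) // computed with factor 10 -> 1000

    // Same path, fresh session with factor 1: equality matches the cached
    // entry, so the stale estimate (1000) is returned instead of 100.
    val freshRel = Relation("/tmp/table")(Session(compressionFactor = 1.0))
    println(computeStats(freshRel, 100))
  }
}
```

Because the session sits in the second parameter list, `oldRel == freshRel` is true, which is exactly the trade-off SPARK-17358 made for HadoopFsRelation: plan reuse works across sessions, but any config captured alongside the plan is reused too.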
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org