Posted to issues@spark.apache.org by "Michael Chen (Jira)" <ji...@apache.org> on 2021/03/17 20:00:12 UTC

[jira] [Created] (SPARK-34780) Cached Table (parquet) with old Configs Used

Michael Chen created SPARK-34780:
------------------------------------

             Summary: Cached Table (parquet) with old Configs Used
                 Key: SPARK-34780
                 URL: https://issues.apache.org/jira/browse/SPARK-34780
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.4
            Reporter: Michael Chen


When a DataFrame is cached, its logical plan can contain copies of the SparkSession, which means the SQLConf values active at caching time are stored with it. If another DataFrame's logical plan is later partially replaced by the cached logical plan, the cached SQLConf values are used to evaluate that part of the plan. This happens because HadoopFsRelation ignores the sparkSession in its equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
    val tableDir = dir.getAbsolutePath + "/table"
    val df = Seq("a").toDF("key")
    df.write.parquet(tableDir)
    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1Stats = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
      case l: LogicalRelation => l
      case m: InMemoryRelation => m
    }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
    val df2 = spark.read.parquet(tableDir).select("key")
    df2.cache()
    val compression10Stats = df2.queryExecution.optimizedPlan.collect {
      case l: LogicalRelation => l
      case m: InMemoryRelation => m
    }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
      case l: LogicalRelation => l
      case m: InMemoryRelation => m
    }.map(_.computeStats())

    // These stats should match because the file compression factor is 1 in both cases
    assert(compression1Stats == compression1StatsWithCache)
    // Instead, the compression factor active at cache time (10) is reused along with
    // the cached logical plan
    assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
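The mechanism can be illustrated outside Spark with a minimal sketch (the Relation type and the cache here are hypothetical stand-ins, not Spark's actual classes): when cache lookups are keyed on an equality that ignores a configuration field, a query under a new configuration still hits the entry computed under the old one.
{code:java}
// Minimal sketch (hypothetical types, not Spark's) of why equality that
// ignores configuration leads to stale cache hits.
case class Relation(path: String, compressionFactor: Double) {
  // Mirrors HadoopFsRelation ignoring sparkSession: equality looks only at path.
  override def equals(other: Any): Boolean = other match {
    case r: Relation => r.path == path
    case _           => false
  }
  override def hashCode: Int = path.hashCode

  def sizeInBytes(rawSize: Long): Long = (rawSize * compressionFactor).toLong
}

object StaleCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = scala.collection.mutable.Map.empty[Relation, Long]

    // Stats cached while compressionFactor = 10 was in effect.
    val cachedRel = Relation("/table", 10.0)
    cache(cachedRel) = cachedRel.sizeInBytes(100) // 1000

    // A new query under compressionFactor = 1 still matches the cache entry,
    // because equality ignores the config field.
    val freshRel = Relation("/table", 1.0)
    val stats = cache.getOrElse(freshRel, freshRel.sizeInBytes(100))

    println(stats) // prints 1000, not the expected 100
  }
}
{code}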
 


