Posted to commits@spark.apache.org by do...@apache.org on 2021/10/26 20:33:07 UTC

[spark] branch branch-3.0 updated: [SPARK-37098][SQL][3.0] Alter table properties should invalidate cache

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2186295  [SPARK-37098][SQL][3.0] Alter table properties should invalidate cache
2186295 is described below

commit 21862951eac0d0eb2df4670201ba6435fe427e7e
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Oct 26 13:29:15 2021 -0700

    [SPARK-37098][SQL][3.0] Alter table properties should invalidate cache
    
    This PR backports https://github.com/apache/spark/pull/34365 to branch-3.0.
    
    ### What changes were proposed in this pull request?
    
    Invalidate the table cache after altering table properties (both SET and UNSET).
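    
    As a minimal sketch of the pattern, not the exact command code (the real changes are in the diff below): `catalog` is a `SessionCatalog`, `name` is a `TableIdentifier`, and the property value is illustrative:
    ```scala
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog

    // Persist the altered metadata, then drop the stale cached relation plan
    // so the next resolution of the table sees the updated properties.
    def alterAndInvalidate(catalog: SessionCatalog, name: TableIdentifier): Unit = {
      val table = catalog.getTableMetadata(name)
      val newTable = table.copy(
        properties = table.properties + ("parquet.compression" -> "zstd"))
      catalog.alterTable(newTable)
      catalog.invalidateCachedTable(name)  // overload added by this PR
    }
    ```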
    
    ### Why are the changes needed?
    
    Table properties can change write behavior, e.g. a Parquet table with `parquet.compression` set.
    
    If you execute the following SQL, the INSERT writes the file with snappy compression rather than zstd, because it uses the stale cached table metadata:
    ```
    CREATE TABLE t (c int) STORED AS PARQUET;
    -- cache table metadata
    SELECT * FROM t;
    ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd');
    INSERT INTO TABLE t values(1);
    ```
    So we should invalidate the table cache after altering table properties.
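    
    A hedged way to observe which codec each write used (sketch only; the path below is a placeholder for the table's real location): Spark's Parquet output files embed the codec in their names, so listing the table directory shows the compression of each write:
    ```scala
    import java.io.File

    // Placeholder: substitute the table's actual LOCATION / warehouse path.
    val tableDir = new File("/path/to/warehouse/t")
    // Data files are named like "part-...-c000.snappy.parquet" or
    // "part-...-c000.zstd.parquet", so the suffix reveals the codec used.
    tableDir.listFiles().map(_.getName).filter(_.endsWith(".parquet")).foreach(println)
    ```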
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a bug fix: writes after `ALTER TABLE ... SET/UNSET TBLPROPERTIES` now pick up the new properties.
    
    ### How was this patch tested?
    
    Added a test in HiveParquetSuite.
    
    Closes #34379 from ulysses-you/SPARK-37098-3.0.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      |  7 +++++++
 .../apache/spark/sql/execution/command/ddl.scala   |  2 ++
 .../apache/spark/sql/hive/HiveParquetSuite.scala   | 24 ++++++++++++++++++++++
 3 files changed, 33 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 389e410..03ec106 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -158,6 +158,13 @@ class SessionCatalog(
     tableRelationCache.invalidate(key)
   }
 
+  /** This method discards any cached table relation plans for the given table identifier. */
+  def invalidateCachedTable(name: TableIdentifier): Unit = {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+    invalidateCachedTable(QualifiedTableName(dbName, tableName))
+  }
+
   /** This method provides a way to invalidate all the cached plans. */
   def invalidateAllCachedTables(): Unit = {
     tableRelationCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 4496b0c..3f44049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -278,6 +278,7 @@ case class AlterTableSetPropertiesCommand(
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
@@ -316,6 +317,7 @@ case class AlterTableUnsetPropertiesCommand(
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 470c6a3..fd9e63ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -106,4 +106,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
       }
     }
   }
+
+  test("SPARK-37098: Alter table properties should invalidate cache") {
+    // specify the compression explicitly in case the default changes in the future
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      withTempPath { dir =>
+        withTable("t") {
+          sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
+          // cache table metadata
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files1 = dir.listFiles().filter(_.getName.endsWith("gz.parquet"))
+          assert(files1.length == 1)
+
+          // cache table metadata again
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet"))
+          assert(files2.length == 1)
+        }
+      }
+    }
+  }
 }
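
For reference, a hedged usage sketch of the new `invalidateCachedTable(TableIdentifier)` overload. `SparkSession.sessionState` is `private[sql]`, so code like this would live inside Spark's own sources or tests; the table name here is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Drop the cached relation plan for default.t; the next query against the
// table re-resolves it and therefore sees any updated table properties.
spark.sessionState.catalog.invalidateCachedTable(TableIdentifier("t", Some("default")))
```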
