You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/10/26 20:33:07 UTC
[spark] branch branch-3.0 updated: [SPARK-37098][SQL][3.0] Alter
table properties should invalidate cache
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2186295 [SPARK-37098][SQL][3.0] Alter table properties should invalidate cache
2186295 is described below
commit 21862951eac0d0eb2df4670201ba6435fe427e7e
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Oct 26 13:29:15 2021 -0700
[SPARK-37098][SQL][3.0] Alter table properties should invalidate cache
This PR backports https://github.com/apache/spark/pull/34365 to branch-3.0
### What changes were proposed in this pull request?
Invalidate the table cache after alter table properties (set and unset).
### Why are the changes needed?
The table properties can change the behavior of writing. e.g. the parquet table with `parquet.compression`.
If you execute the following SQL, we will get the file with snappy compression rather than zstd.
```
CREATE TABLE t (c int) STORED AS PARQUET;
// cache table metadata
SELECT * FROM t;
ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd');
INSERT INTO TABLE t values(1);
```
So we should invalidate the table cache after alter table properties.
### Does this PR introduce _any_ user-facing change?
yes, bug fix
### How was this patch tested?
Add test
Closes #34379 from ulysses-you/SPARK-37098-3.0.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../sql/catalyst/catalog/SessionCatalog.scala | 7 +++++++
.../apache/spark/sql/execution/command/ddl.scala | 2 ++
.../apache/spark/sql/hive/HiveParquetSuite.scala | 24 ++++++++++++++++++++++
3 files changed, 33 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 389e410..03ec106 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -158,6 +158,13 @@ class SessionCatalog(
tableRelationCache.invalidate(key)
}
+ /** This method discards any cached table relation plans for the given table identifier. */
+ def invalidateCachedTable(name: TableIdentifier): Unit = {
+ val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+ val tableName = formatTableName(name.table)
+ invalidateCachedTable(QualifiedTableName(dbName, tableName))
+ }
+
/** This method provides a way to invalidate all the cached plans. */
def invalidateAllCachedTables(): Unit = {
tableRelationCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 4496b0c..3f44049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -278,6 +278,7 @@ case class AlterTableSetPropertiesCommand(
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
+ catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}
@@ -316,6 +317,7 @@ case class AlterTableUnsetPropertiesCommand(
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
+ catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 470c6a3..fd9e63ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -106,4 +106,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
}
}
}
+
+ test("SPARK-37098: Alter table properties should invalidate cache") {
+ // specify the compression in case we change it in future
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ withTempPath { dir =>
+ withTable("t") {
+ sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
+ // cache table metadata
+ sql("SELECT * FROM t")
+ sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")
+ sql("INSERT INTO TABLE t values(1)")
+ val files1 = dir.listFiles().filter(_.getName.endsWith("gz.parquet"))
+ assert(files1.length == 1)
+
+ // cache table metadata again
+ sql("SELECT * FROM t")
+ sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
+ sql("INSERT INTO TABLE t values(1)")
+ val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet"))
+ assert(files2.length == 1)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org