You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/30 14:10:40 UTC
[spark] branch master updated: [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c02823b49a [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace
8c02823b49a is described below
commit 8c02823b49a6f28005236e4965a25e664d73ebea
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Jun 30 22:10:18 2022 +0800
[SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace
### What changes were proposed in this pull request?
Make `Catalog.refreshTable` compatible with the 3-layer namespace (`catalog.database.table`).
### Why are the changes needed?
This is part of the effort to make the Catalog API support the 3-layer namespace.
### Does this PR introduce _any_ user-facing change?
Yes. `refreshTable` now accepts 3-layer (catalog-qualified) table names while remaining backward compatible with existing `table` and `database.table` names.
### How was this patch tested?
Unit test (a new test case in `CatalogSuite`).
Closes #36983 from amaliujia/refreshtable.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/internal/CatalogImpl.scala | 23 +++++++++++++++-------
.../apache/spark/sql/internal/CatalogSuite.scala | 21 ++++++++++++++++++++
2 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 97226736691..98220f3b229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
@@ -700,17 +701,25 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = sparkSession.table(tableIdent).queryExecution.analyzed
+ val relation = sparkSession.table(tableName).queryExecution.analyzed
relation.refresh()
// Temporary and global temporary views are not supposed to be put into the relation cache
- // since they are tracked separately.
- if (!sessionCatalog.isTempView(tableIdent)) {
- sessionCatalog.invalidateCachedTable(tableIdent)
+ // since they are tracked separately. V1 and V2 plans are cache invalidated accordingly.
+ relation match {
+ case SubqueryAlias(_, v: View) if !v.isTempView =>
+ sessionCatalog.invalidateCachedTable(v.desc.identifier)
+ case SubqueryAlias(_, r: LogicalRelation) =>
+ sessionCatalog.invalidateCachedTable(r.catalogTable.get.identifier)
+ case SubqueryAlias(_, h: HiveTableRelation) =>
+ sessionCatalog.invalidateCachedTable(h.tableMeta.identifier)
+ case SubqueryAlias(_, r: DataSourceV2Relation) =>
+ r.catalog.get.asTableCatalog.invalidateTable(r.identifier.get)
+ case SubqueryAlias(_, v: View) if v.isTempView =>
+ case _ =>
+ throw QueryCompilationErrors.unexpectedTypeOfRelationError(relation, tableName)
}
-
// Re-caches the logical plan of the relation.
// Note this is a no-op for the relation itself if it's not cached, but will clear all
// caches referencing this relation. If this relation is cached as an InMemoryRelation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 13f6965a8e8..07c21fff712 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
import java.io.File
+import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.{AnalysisException, DataFrame}
@@ -795,4 +796,24 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.catalog.currentCatalog().equals("spark_catalog"))
assert(spark.catalog.listCatalogs().collect().map(c => c.name).toSet == Set("testcat"))
}
+
+ test("SPARK-39583: Make RefreshTable be compatible with 3 layer namespace") {
+ withTempDir { dir =>
+ val tableName = "spark_catalog.default.my_table"
+ // TEXT table whose data files live under the temp dir, referenced by its 3-part name.
+ sql(s"""
+ | CREATE TABLE ${tableName}(col STRING) USING TEXT
+ | LOCATION '${dir.getAbsolutePath}'
+ |""".stripMargin)
+ sql(s"""INSERT INTO ${tableName} SELECT 'abc'""".stripMargin)
+ spark.catalog.cacheTable(tableName)
+ assert(spark.table(tableName).collect().length == 1)
+ // Delete the underlying files directly; the cached data still serves the inserted row.
+ FileUtils.deleteDirectory(dir)
+ assert(spark.table(tableName).collect().length == 1)
+ // Refreshing via the 3-part name must drop the stale cached data, so the row is gone.
+ spark.catalog.refreshTable(tableName)
+ assert(spark.table(tableName).collect().length == 0)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org