Posted to commits@spark.apache.org by we...@apache.org on 2022/06/30 14:10:40 UTC

[spark] branch master updated: [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c02823b49a [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace
8c02823b49a is described below

commit 8c02823b49a6f28005236e4965a25e664d73ebea
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Jun 30 22:10:18 2022 +0800

    [SPARK-39583][SQL] Make RefreshTable be compatible with 3 layer namespace
    
    ### What changes were proposed in this pull request?
    
    Make RefreshTable compatible with the 3-layer namespace (catalog.database.table).
    
    ### Why are the changes needed?
    
    This is part of the ongoing effort to make the Catalog API support the 3-layer namespace.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. refreshTable now accepts 3-layer (catalog.database.table) names while maintaining backward compatibility with existing 1-part and 2-part names.
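    
    A minimal usage sketch (the catalog, database, and table names are
    illustrative):
    
        // A catalog-qualified 3-part name previously failed to parse:
        spark.catalog.refreshTable("spark_catalog.default.my_table")
        // 1-part and 2-part names keep working as before:
        spark.catalog.refreshTable("my_table")
        spark.catalog.refreshTable("default.my_table")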
    
    ### How was this patch tested?
    
    Unit test added to CatalogSuite.
    
    Closes #36983 from amaliujia/refreshtable.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 23 +++++++++++++++-------
 .../apache/spark/sql/internal/CatalogSuite.scala   | 21 ++++++++++++++++++++
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 97226736691..98220f3b229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 
@@ -700,17 +701,25 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def refreshTable(tableName: String): Unit = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = sparkSession.table(tableIdent).queryExecution.analyzed
+    val relation = sparkSession.table(tableName).queryExecution.analyzed
 
     relation.refresh()
 
     // Temporary and global temporary views are not supposed to be put into the relation cache
-    // since they are tracked separately.
-    if (!sessionCatalog.isTempView(tableIdent)) {
-      sessionCatalog.invalidateCachedTable(tableIdent)
+    // since they are tracked separately. V1 and V2 plans are cache invalidated accordingly.
+    relation match {
+      case SubqueryAlias(_, v: View) if !v.isTempView =>
+        sessionCatalog.invalidateCachedTable(v.desc.identifier)
+      case SubqueryAlias(_, r: LogicalRelation) =>
+        sessionCatalog.invalidateCachedTable(r.catalogTable.get.identifier)
+      case SubqueryAlias(_, h: HiveTableRelation) =>
+        sessionCatalog.invalidateCachedTable(h.tableMeta.identifier)
+      case SubqueryAlias(_, r: DataSourceV2Relation) =>
+        r.catalog.get.asTableCatalog.invalidateTable(r.identifier.get)
+      case SubqueryAlias(_, v: View) if v.isTempView =>
+      case _ =>
+        throw QueryCompilationErrors.unexpectedTypeOfRelationError(relation, tableName)
     }
-
     // Re-caches the logical plan of the relation.
     // Note this is a no-op for the relation itself if it's not cached, but will clear all
     // caches referencing this relation. If this relation is cached as an InMemoryRelation,
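
The heart of the change is the name-resolution path: the removed code parsed
the name with parseTableIdentifier, which accepts at most a database.table
pair, so a catalog-qualified name failed to parse before the table was even
looked up. Resolving through sparkSession.table(tableName) uses the multi-part
identifier parser instead. A minimal sketch of the difference, mirroring the
Spark-internal calls with an illustrative table name:

    // Throws ParseException: TableIdentifier holds at most database.table.
    spark.sessionState.sqlParser.parseTableIdentifier("spark_catalog.default.t")

    // Returns Seq("spark_catalog", "default", "t").
    spark.sessionState.sqlParser.parseMultipartIdentifier("spark_catalog.default.t")

The pattern match then invalidates the cache entry appropriately for each
relation type; the empty case for a temporary View is deliberate, because temp
views are tracked outside the relation cache and only need the
relation.refresh() call above.
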
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 13f6965a8e8..07c21fff712 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import java.io.File
 
+import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.{AnalysisException, DataFrame}
@@ -795,4 +796,24 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(spark.catalog.currentCatalog().equals("spark_catalog"))
     assert(spark.catalog.listCatalogs().collect().map(c => c.name).toSet == Set("testcat"))
   }
+
+  test("SPARK-39583: Make RefreshTable be compatible with 3 layer namespace") {
+    withTempDir { dir =>
+      val tableName = "spark_catalog.default.my_table"
+
+      sql(s"""
+           | CREATE TABLE ${tableName}(col STRING) USING TEXT
+           | LOCATION '${dir.getAbsolutePath}'
+           |""".stripMargin)
+      sql(s"""INSERT INTO ${tableName} SELECT 'abc'""".stripMargin)
+      spark.catalog.cacheTable(tableName)
+      assert(spark.table(tableName).collect().length == 1)
+
+      FileUtils.deleteDirectory(dir)
+      assert(spark.table(tableName).collect().length == 1)
+
+      spark.catalog.refreshTable(tableName)
+      assert(spark.table(tableName).collect().length == 0)
+    }
+  }
 }
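
The new test leans on the re-caching behavior described in the CatalogImpl
comment above: after FileUtils.deleteDirectory, the cached InMemoryRelation
still serves the stale row, and refreshTable discards that stale data and
re-caches the plan lazily, so the next scan reads the now-empty location. The
sequence, with illustrative names:

    spark.catalog.cacheTable("spark_catalog.default.my_table")
    spark.table("spark_catalog.default.my_table").collect()  // materializes the cache
    // ... delete the underlying files ...
    spark.table("spark_catalog.default.my_table").collect()  // still the stale cached row
    spark.catalog.refreshTable("spark_catalog.default.my_table")
    spark.table("spark_catalog.default.my_table").collect()  // rescans: empty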


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org