Posted to commits@spark.apache.org by do...@apache.org on 2020/12/15 06:19:05 UTC

[spark] branch branch-3.1 updated: [SPARK-33653][SQL][3.1] DSv2: REFRESH TABLE should recache the table itself

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5715322  [SPARK-33653][SQL][3.1] DSv2: REFRESH TABLE should recache the table itself
5715322 is described below

commit 5715322de3a44ad5d12adc663722168056b69957
Author: Chao Sun <su...@apple.com>
AuthorDate: Mon Dec 14 22:11:10 2020 -0800

    [SPARK-33653][SQL][3.1] DSv2: REFRESH TABLE should recache the table itself
    
    This is a backport of #30742 for branch-3.1
    
    ### What changes were proposed in this pull request?
    
    This changes DSv2 `REFRESH TABLE` semantics to also recache the target table itself.
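    
    Concretely, before uncaching, the strategy now looks up any existing cache entry for the table's relation and recreates it afterwards. A condensed sketch of the pattern (matching the `DataSourceV2Strategy` hunk in the diff below; `session` is the active `SparkSession` and `r` a `ResolvedTable`):
    
    ```scala
    val relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
    val cached = session.sharedState.cacheManager.lookupCachedData(relation)
    
    // Uncache first, invalidating dependent plans as well (cascade = true).
    session.sharedState.cacheManager.uncacheQuery(session, relation, cascade = true)
    
    // If the table was cached, cache it again under its original
    // cache name and storage level.
    cached.foreach { c =>
      val builder = c.cachedRepresentation.cacheBuilder
      session.sharedState.cacheManager.cacheQuery(
        Dataset.ofRows(session, relation), builder.tableName, builder.storageLevel)
    }
    ```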
    
    ### Why are the changes needed?
    
    Currently "REFRESH TABLE" in DSv2 only invalidate all caches referencing the table. With #30403 merged which adds support for caching a DSv2 table, we should also recache the target table itself to make the behavior consistent with DSv1.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, refreshing a table in DSv2 now also recaches the target table itself.
    
    ### How was this patch tested?
    
    Added coverage of the new behavior to the existing unit test for the v2 `REFRESH TABLE` command.
    
    Closes #30769 from sunchao/SPARK-33653-branch-3.1.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../datasources/v2/DataSourceV2Strategy.scala         | 16 +++++++++++++---
 .../execution/datasources/v2/RefreshTableExec.scala   |  1 -
 .../spark/sql/connector/DataSourceV2SQLSuite.scala    | 19 +++++++++++++++++++
 3 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 5289d35..97dab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -56,9 +56,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
 
-  private def invalidateCache(r: ResolvedTable)(): Unit = {
+  private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
     val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
+    val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
     session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+    if (recacheTable && cache.isDefined) {
+      // save the cache name and cache level for recreation
+      val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+      val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
+
+      // recache with the same name and cache level.
+      val ds = Dataset.ofRows(session, v2Relation)
+      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+    }
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -137,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case RefreshTable(r: ResolvedTable) =>
-      RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil
+      RefreshTableExec(r.catalog, r.identifier, invalidateCache(r, recacheTable = true)) :: Nil
 
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
index 994583c..e66f0a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -29,7 +29,6 @@ case class RefreshTableExec(
     catalog.invalidateTable(ident)
 
     // invalidate all caches referencing the given table
-    // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
     invalidateCache()
 
     Seq.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6838a76..a166c5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1748,6 +1748,25 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33653: REFRESH TABLE should recache the target table itself") {
+    val tblName = "testcat.ns.t"
+    withTable(tblName) {
+      sql(s"CREATE TABLE $tblName (id bigint) USING foo")
+
+      // if the table is not cached, refreshing it should not recache it
+      assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
+      sql(s"REFRESH TABLE $tblName")
+      assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
+
+      sql(s"CACHE TABLE $tblName")
+
+      // after caching and refreshing, the table should be recached
+      assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
+      sql(s"REFRESH TABLE $tblName")
+      assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
+    }
+  }
+
   test("REPLACE TABLE: v1 table") {
     val e = intercept[AnalysisException] {
       sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")

