You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/15 06:19:05 UTC
[spark] branch branch-3.1 updated: [SPARK-33653][SQL][3.1] DSv2:
REFRESH TABLE should recache the table itself
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5715322 [SPARK-33653][SQL][3.1] DSv2: REFRESH TABLE should recache the table itself
5715322 is described below
commit 5715322de3a44ad5d12adc663722168056b69957
Author: Chao Sun <su...@apple.com>
AuthorDate: Mon Dec 14 22:11:10 2020 -0800
[SPARK-33653][SQL][3.1] DSv2: REFRESH TABLE should recache the table itself
This is a backport of #30742 for branch-3.1
### What changes were proposed in this pull request?
This changes DSv2 refresh table semantics to also recache the target table itself.
### Why are the changes needed?
Currently, "REFRESH TABLE" in DSv2 only invalidates the caches that reference the table. Now that #30403, which adds support for caching a DSv2 table, has been merged, we should also recache the target table itself to make the behavior consistent with DSv1.
### Does this PR introduce _any_ user-facing change?
Yes, refreshing a table in DSv2 now also recaches the target table itself.
### How was this patch tested?
Added coverage of this new behavior in the existing UT for v2 refresh table command.
Closes #30769 from sunchao/SPARK-33653-branch-3.1.
Authored-by: Chao Sun <su...@apple.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../datasources/v2/DataSourceV2Strategy.scala | 16 +++++++++++++---
.../execution/datasources/v2/RefreshTableExec.scala | 1 -
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 19 +++++++++++++++++++
3 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 5289d35..97dab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -56,9 +56,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
session.sharedState.cacheManager.recacheByPlan(session, r)
}
- private def invalidateCache(r: ResolvedTable)(): Unit = {
+ private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
+ val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+ if (recacheTable && cache.isDefined) {
+ // save the cache name and cache level for recreation
+ val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
+ val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
+
+ // recache with the same name and cache level.
+ val ds = Dataset.ofRows(session, v2Relation)
+ session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+ }
}
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -137,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
case RefreshTable(r: ResolvedTable) =>
- RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil
+ RefreshTableExec(r.catalog, r.identifier, invalidateCache(r, recacheTable = true)) :: Nil
case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
index 994583c..e66f0a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -29,7 +29,6 @@ case class RefreshTableExec(
catalog.invalidateTable(ident)
// invalidate all caches referencing the given table
- // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
invalidateCache()
Seq.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6838a76..a166c5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1748,6 +1748,25 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-33653: REFRESH TABLE should recache the target table itself") {
+ val tblName = "testcat.ns.t"
+ withTable(tblName) {
+ sql(s"CREATE TABLE $tblName (id bigint) USING foo")
+
+ // if the table is not cached, refreshing it should not recache it
+ assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
+ sql(s"REFRESH TABLE $tblName")
+ assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
+
+ sql(s"CACHE TABLE $tblName")
+
+ // after caching & refreshing the table should be recached
+ assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
+ sql(s"REFRESH TABLE $tblName")
+ assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
+ }
+ }
+
test("REPLACE TABLE: v1 table") {
val e = intercept[AnalysisException] {
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org