Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/10 03:45:36 UTC
[spark] branch branch-3.1 updated: [SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5ed725c [SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache
5ed725c is described below
commit 5ed725cbd11856cc0641d5c7b536495952768b93
Author: Chao Sun <su...@apple.com>
AuthorDate: Sun Jan 10 12:45:01 2021 +0900
[SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache
### What changes were proposed in this pull request?
This is a backport of #31081 to branch-3.1.
This changes `ReplaceTableExec`/`AtomicReplaceTableExec` to uncache the target table before it is dropped. In addition, it includes some refactoring: the `uncacheTable` method is moved to `DataSourceV2Strategy` so that we no longer need to pass a Spark session to the v2 execs.
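In condensed form, the new wiring looks like the sketch below (extracted from the diff further down; imports are elided and the table-creation tail of `run()` is reconstructed from the surrounding file, so treat it as illustrative rather than verbatim). The strategy, which holds the `SparkSession`, defines the invalidation logic once and hands it to the exec node as a plain function value:

```scala
// In DataSourceV2Strategy, which owns the SparkSession:
private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
  val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
  session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
}

// In ReplaceTableExec, which now takes the callback instead of a SparkSession:
case class ReplaceTableExec(
    catalog: TableCatalog,
    ident: Identifier,
    tableSchema: StructType,
    partitioning: Seq[Transform],
    tableProperties: Map[String, String],
    orCreate: Boolean,
    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    if (catalog.tableExists(ident)) {
      val table = catalog.loadTable(ident)
      invalidateCache(catalog, table, ident) // uncache before the old table is dropped
      catalog.dropTable(ident)
    } else if (!orCreate) {
      throw new CannotReplaceMissingTableException(ident)
    }
    catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava)
    Seq.empty
  }
}
```

Passing a function value keeps the exec nodes decoupled from the session, which is what allows dropping the `session` constructor parameter from `ReplaceTableAsSelectExec`/`AtomicReplaceTableAsSelectExec`.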
### Why are the changes needed?
Similar to SPARK-33492 (#30429). When a table is replaced, the caches associated with the old table should be invalidated to avoid potentially incorrect results.
### Does this PR introduce _any_ user-facing change?
Yes. Now, when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated when the table is replaced.
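Concretely, the behavior looks like the sketch below, which mirrors the new unit test near the end of the diff (`testcat`, `source`, and the `foo` provider come from that suite's setup):

```scala
spark.sql("CREATE TABLE testcat.ns.t USING foo AS SELECT id, data FROM source")
spark.sql("CACHE TABLE v AS SELECT id FROM testcat.ns.t")

// Replacing the table now uncaches every plan that depends on it (cascade = true),
// so the cached view can no longer serve rows read from the dropped table.
spark.sql("REPLACE TABLE testcat.ns.t (a bigint) USING foo")
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("v")).isEmpty)
```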
### How was this patch tested?
Added a new unit test.
Closes #31100 from sunchao/SPARK-34039-branch-3.1.
Authored-by: Chao Sun <su...@apple.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../datasources/v2/DataSourceV2Strategy.scala | 21 ++++++++++++++-------
.../datasources/v2/ReplaceTableExec.scala | 14 +++++++++++---
.../datasources/v2/WriteToDataSourceV2Exec.scala | 22 ++++++----------------
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++++++++++++++++
4 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 34bc80c..dffe0a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, Table, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -78,6 +78,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
}
+ private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+ val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+ session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+ }
+
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters,
relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
@@ -161,10 +166,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(
- staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+ staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+ invalidateCache) :: Nil
case _ =>
ReplaceTableExec(
- catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+ catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+ invalidateCache) :: Nil
}
case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
@@ -173,7 +180,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
- session,
staging,
ident,
parts,
@@ -181,10 +187,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
planLater(query),
propsWithOwner,
writeOptions,
- orCreate = orCreate) :: Nil
+ orCreate = orCreate,
+ invalidateCache) :: Nil
case _ =>
ReplaceTableAsSelectExec(
- session,
catalog,
ident,
parts,
@@ -192,7 +198,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
planLater(query),
propsWithOwner,
writeOptions,
- orCreate = orCreate) :: Nil
+ orCreate = orCreate,
+ invalidateCache) :: Nil
}
case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 1f3bcf2..10c09f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -33,10 +33,13 @@ case class ReplaceTableExec(
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
- orCreate: Boolean) extends V2CommandExec {
+ orCreate: Boolean,
+ invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
+ val table = catalog.loadTable(ident)
+ invalidateCache(catalog, table, ident)
catalog.dropTable(ident)
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
@@ -54,9 +57,14 @@ case class AtomicReplaceTableExec(
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
- orCreate: Boolean) extends V2CommandExec {
+ orCreate: Boolean,
+ invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
+ if (catalog.tableExists(identifier)) {
+ val table = catalog.loadTable(identifier)
+ invalidateCache(catalog, table, identifier)
+ }
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index b0aff4a..a41b048 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -131,7 +130,6 @@ case class AtomicCreateTableAsSelectExec(
* ReplaceTableAsSelectStagingExec.
*/
case class ReplaceTableAsSelectExec(
- session: SparkSession,
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
@@ -139,7 +137,8 @@ case class ReplaceTableAsSelectExec(
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
- orCreate: Boolean) extends TableWriteExecHelper {
+ orCreate: Boolean,
+ invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
// Note that this operation is potentially unsafe, but these are the strict semantics of
@@ -152,7 +151,7 @@ case class ReplaceTableAsSelectExec(
// 3. The table returned by catalog.createTable doesn't support writing.
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
- uncacheTable(session, catalog, table, ident)
+ invalidateCache(catalog, table, ident)
catalog.dropTable(ident)
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
@@ -177,7 +176,6 @@ case class ReplaceTableAsSelectExec(
* is left untouched.
*/
case class AtomicReplaceTableAsSelectExec(
- session: SparkSession,
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
@@ -185,13 +183,14 @@ case class AtomicReplaceTableAsSelectExec(
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
- orCreate: Boolean) extends TableWriteExecHelper {
+ orCreate: Boolean,
+ invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
- uncacheTable(session, catalog, table, ident)
+ invalidateCache(catalog, table, ident)
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
@@ -393,15 +392,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
Nil
}
-
- protected def uncacheTable(
- session: SparkSession,
- catalog: TableCatalog,
- table: Table,
- ident: Identifier): Unit = {
- val plan = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
- session.sharedState.cacheManager.uncacheQuery(session, plan, cascade = true)
- }
}
object DataWritingSparkTask extends Logging {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 92edd2b..f0f6e7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -785,6 +785,23 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-34039: ReplaceTable (atomic or non-atomic) should invalidate cache") {
+ Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
+ val view = "view"
+ withTable(t) {
+ withTempView(view) {
+ sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
+ sql(s"CACHE TABLE $view AS SELECT id FROM $t")
+ checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
+ checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
+
+ sql(s"REPLACE TABLE $t (a bigint) USING foo")
+ assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+ }
+ }
+ }
+ }
+
test("SPARK-33492: ReplaceTableAsSelect (atomic or non-atomic) should invalidate cache") {
Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
val view = "view"