You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/10/25 23:31:02 UTC
[spark] branch branch-2.4 updated: [SPARK-33228][SQL] Don't uncache
data when replacing a view having the same logical plan
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new a85d690 [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
a85d690 is described below
commit a85d69049c5ad994b3fb6cde1b3d739f3da336d4
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sun Oct 25 16:15:55 2020 -0700
[SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
### What changes were proposed in this pull request?
SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. But this change drops the cache even when replacing a view having the same logical plan. A sequence of queries to reproduce this is as follows;
```
// Spark v2.4.6+
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
// If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache is swept away
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
// Until v2.4.6
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
```
### Why are the changes needed?
bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests.
Closes #30140 from maropu/FixBugInReplaceView.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 87b498462b82fce02dd50286887092cf7858d2e8)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/execution/command/views.scala | 10 +++++----
.../org/apache/spark/sql/CachedTableSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 5d9f2c3..9cb4003 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -143,17 +143,19 @@ case class CreateViewCommand(
val catalog = sparkSession.sessionState.catalog
if (viewType == LocalTempView) {
- if (replace && catalog.getTempView(name.table).isDefined) {
- logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+ if (replace && catalog.getTempView(name.table).isDefined &&
+ !catalog.getTempView(name.table).get.sameResult(child)) {
+ logInfo(s"Try to uncache ${name.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
- if (replace && catalog.getGlobalTempView(name.table).isDefined) {
+ if (replace && catalog.getGlobalTempView(name.table).isDefined &&
+ !catalog.getGlobalTempView(name.table).get.sameResult(child)) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val globalTempView = TableIdentifier(name.table, Option(db))
- logDebug(s"Try to uncache ${globalTempView.quotedString} before replacing.")
+ logInfo(s"Try to uncache ${globalTempView.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 5c8c857..28c0fa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -907,4 +907,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(spark.sharedState.cacheManager.isEmpty)
}
}
+
+ test("SPARK-33228: Don't uncache data when replacing an existing view having the same plan") {
+ withTempView("tempView") {
+ spark.catalog.clearCache()
+ val df = spark.range(1).selectExpr("id a", "id b")
+ df.cache()
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceTempView("tempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceTempView("tempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ }
+
+ withTempView("tempGlobalTempView") {
+ spark.catalog.clearCache()
+ val df = spark.range(1).selectExpr("id a", "id b")
+ df.cache()
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceGlobalTempView("tempGlobalTempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceGlobalTempView("tempGlobalTempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org