[spark] branch master updated: [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 956b52b1 [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
956b52b1 is described below
commit 956b52b1670985a67e49b938ac1499ae65c79f6e
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Wed Mar 27 21:01:36 2019 +0900
[SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
## What changes were proposed in this pull request?
To make the blocking behaviour consistent, this PR makes the catalog table/view `uncacheQuery` calls non-blocking by default. Once this PR is merged, all uncache operations in Spark will be non-blocking by default.
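
For illustration, a minimal sketch of the resulting user-facing behaviour, assuming a local `SparkSession` (the `blocking = true` overload of `Dataset.unpersist` is pre-existing public API):

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a local session; any SparkSession works.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("key", "value")
df.createOrReplaceTempView("t")
spark.catalog.cacheTable("t")

// After this change, every uncache path is non-blocking by default:
// the call returns immediately while cached blocks are removed
// asynchronously.
spark.catalog.uncacheTable("t")   // was blocking before this follow-up
df.unpersist()                    // already non-blocking since SPARK-26771
df.unpersist(blocking = true)     // callers can still opt in to blocking
```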
## How was this patch tested?
Pass Jenkins.
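
For tests that need deterministic cleanup, the suites in this patch add a blocking `uncacheTable` helper that calls the internal `CacheManager` directly. A condensed, runnable sketch of that pattern follows; it assumes the code is compiled in the `org.apache.spark.sql` package, as the test suites are, so that the `private[sql]` member `Dataset.logicalPlan` is accessible:

```scala
// Declared in the org.apache.spark.sql package, mirroring the test
// suites, so private[sql] members resolve.
package org.apache.spark.sql

object BlockingUncacheExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    Seq(1, 2, 3).toDF("id").createOrReplaceTempView("t")
    spark.catalog.cacheTable("t")

    // Mirrors the uncacheTable helper added in the test suites below:
    // temporary views are uncached without cascading, and blocking = true
    // waits until all cached blocks are actually removed, which keeps
    // assertions about materialization deterministic.
    val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier("t")
    val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
    spark.sharedState.cacheManager.uncacheQuery(
      spark,
      spark.table("t").logicalPlan,
      cascade = cascade,
      blocking = true)

    spark.stop()
  }
}
```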
Closes #24212 from maropu/SPARK-26771-FOLLOWUP.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 3 +-
.../apache/spark/sql/execution/CacheManager.scala | 8 +--
.../sql/execution/columnar/InMemoryRelation.scala | 2 +-
.../apache/spark/sql/internal/CatalogImpl.scala | 6 +-
.../org/apache/spark/sql/CachedTableSuite.scala | 69 +++++++++++++---------
.../apache/spark/sql/hive/CachedTableSuite.scala | 15 ++++-
6 files changed, 62 insertions(+), 41 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2accb32..69c2f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2956,7 +2956,8 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def unpersist(blocking: Boolean): this.type = {
- sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
+ sparkSession.sharedState.cacheManager.uncacheQuery(
+ sparkSession, logicalPlan, cascade = false, blocking)
this
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 0145478..d1f096b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -98,13 +98,11 @@ class CacheManager extends Logging {
* @param query The [[Dataset]] to be un-cached.
* @param cascade If true, un-cache all the cache entries that refer to the given
* [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
- * @param blocking Whether to block until all blocks are deleted.
*/
def uncacheQuery(
query: Dataset[_],
- cascade: Boolean,
- blocking: Boolean = true): Unit = {
- uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
+ cascade: Boolean): Unit = {
+ uncacheQuery(query.sparkSession, query.logicalPlan, cascade)
}
/**
@@ -119,7 +117,7 @@ class CacheManager extends Logging {
spark: SparkSession,
plan: LogicalPlan,
cascade: Boolean,
- blocking: Boolean): Unit = {
+ blocking: Boolean = false): Unit = {
val shouldRemove: LogicalPlan => Boolean =
if (cascade) {
_.find(_.sameResult(plan)).isDefined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1e4453f..1af5033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -65,7 +65,7 @@ case class CachedRDDBuilder(
_cachedColumnBuffers
}
- def clearCache(blocking: Boolean = true): Unit = {
+ def clearCache(blocking: Boolean = false): Unit = {
if (_cachedColumnBuffers != null) {
synchronized {
if (_cachedColumnBuffers != null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4698e8a..5e7d17b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -365,7 +365,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def dropTempView(viewName: String): Boolean = {
sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
sparkSession.sharedState.cacheManager.uncacheQuery(
- sparkSession, viewDef, cascade = false, blocking = true)
+ sparkSession, viewDef, cascade = false)
sessionCatalog.dropTempView(viewName)
}
}
@@ -381,7 +381,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def dropGlobalTempView(viewName: String): Boolean = {
sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
sparkSession.sharedState.cacheManager.uncacheQuery(
- sparkSession, viewDef, cascade = false, blocking = true)
+ sparkSession, viewDef, cascade = false)
sessionCatalog.dropGlobalTempView(viewName)
}
}
@@ -494,7 +494,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
// cached version and make the new version cached lazily.
if (isCached(table)) {
// Uncache the logicalPlan.
- sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, blocking = true)
+ sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
// Cache it again.
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 47e745f..4d63390 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -93,13 +93,24 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}.sum
}
+ // Blocking uncache table for tests
+ private def uncacheTable(tableName: String): Unit = {
+ val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+ val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
+ spark.sharedState.cacheManager.uncacheQuery(
+ spark,
+ spark.table(tableName).logicalPlan,
+ cascade = cascade,
+ blocking = true)
+ }
+
test("cache temp table") {
withTempView("tempTable") {
testData.select('key).createOrReplaceTempView("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
spark.catalog.cacheTable("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- spark.catalog.uncacheTable("tempTable")
+ uncacheTable("tempTable")
}
}
@@ -121,7 +132,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
withTempView("tempTable") {
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- spark.catalog.uncacheTable("tempTable")
+ uncacheTable("tempTable")
}
}
@@ -134,7 +145,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
// Is this valid?
- spark.catalog.uncacheTable("tempTable2")
+ uncacheTable("tempTable2")
// Should this be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
@@ -171,7 +182,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
case _ => false
})
- spark.catalog.uncacheTable("testData")
+ uncacheTable("testData")
assert(!spark.catalog.isCached("testData"))
assert(spark.table("testData").queryExecution.withCachedData match {
case _: InMemoryRelation => false
@@ -196,7 +207,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}.size
}
- spark.catalog.uncacheTable("testData")
+ uncacheTable("testData")
}
test("read from cached table and uncache") {
@@ -204,7 +215,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(spark.table("testData"), testData.collect().toSeq)
assertCached(spark.table("testData"))
- spark.catalog.uncacheTable("testData")
+ uncacheTable("testData")
checkAnswer(spark.table("testData"), testData.collect().toSeq)
assertCached(spark.table("testData"), 0)
}
@@ -215,7 +226,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
sql("SELECT * FROM selectStar WHERE key = 1"),
Seq(Row(1, "1")))
- spark.catalog.uncacheTable("selectStar")
+ uncacheTable("selectStar")
}
test("Self-join cached") {
@@ -225,7 +236,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
unCachedAnswer.toSeq)
- spark.catalog.uncacheTable("testData")
+ uncacheTable("testData")
}
test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
@@ -255,7 +266,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")
- spark.catalog.uncacheTable("testCacheTable")
+ uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -272,7 +283,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")
- spark.catalog.uncacheTable("testCacheTable")
+ uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -293,7 +304,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
isMaterialized(rddId),
"Lazily cached in-memory table should have been materialized")
- spark.catalog.uncacheTable("testData")
+ uncacheTable("testData")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -430,8 +441,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
spark.sparkContext.cleaner.get.attachListener(cleanerListener)
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
System.gc()
@@ -478,7 +489,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
sql("SELECT key, count(*) FROM orderedTable GROUP BY key ORDER BY key"),
sql("SELECT key, count(*) FROM testData3x GROUP BY key ORDER BY key").collect())
- spark.catalog.uncacheTable("orderedTable")
+ uncacheTable("orderedTable")
spark.catalog.dropTempView("orderedTable")
// Set up two tables distributed in the same way. Try this with the data distributed into
@@ -500,8 +511,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(sql("SELECT count(*) FROM t1 GROUP BY key"),
sql("SELECT count(*) FROM testData GROUP BY key"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
}
@@ -518,8 +529,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
// One side of join is not partitioned in the desired way. Need to shuffle one side.
@@ -535,8 +546,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
withTempView("t1", "t2") {
@@ -551,8 +562,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
// One side of join is not partitioned in the desired way. Since the number of partitions of
@@ -569,8 +580,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
// repartition's column ordering is different from group by column ordering.
@@ -584,7 +595,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
testData.distinct().select($"value", $"key"))
- spark.catalog.uncacheTable("t1")
+ uncacheTable("t1")
}
// repartition's column ordering is different from join condition's column ordering.
@@ -606,8 +617,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
checkAnswer(
query,
df1.join(df2, $"key" === $"a" && $"value" === $"b").select($"key", $"value", $"a", $"b"))
- spark.catalog.uncacheTable("t1")
- spark.catalog.uncacheTable("t2")
+ uncacheTable("t1")
+ uncacheTable("t2")
}
}
@@ -620,7 +631,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
selectStar,
Seq(Row(1, "1")))
- spark.catalog.uncacheTable("selectStar")
+ uncacheTable("selectStar")
checkAnswer(
selectStar,
Seq(Row(1, "1")))
@@ -698,7 +709,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
Utils.deleteRecursively(path)
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
- spark.catalog.uncacheTable("t")
+ uncacheTable("t")
assert(spark.table("t").select($"i").count() == 0)
assert(getNumInMemoryRelations(spark.table("t").select($"i")) == 0)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 9483a9c..3eca632 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -49,6 +49,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
maybeBlock.nonEmpty
}
+ // Blocking uncache table for tests
+ private def uncacheTable(tableName: String): Unit = {
+ val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+ val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
+ spark.sharedState.cacheManager.uncacheQuery(
+ spark,
+ spark.table(tableName).logicalPlan,
+ cascade = cascade,
+ blocking = true)
+ }
+
test("cache table") {
val preCacheResults = sql("SELECT * FROM src").collect().toSeq
@@ -106,7 +117,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
var e = intercept[AnalysisException](spark.table("nonexistentTable")).getMessage
assert(e.contains(expectedErrorMsg))
e = intercept[AnalysisException] {
- spark.catalog.uncacheTable("nonexistentTable")
+ uncacheTable("nonexistentTable")
}.getMessage
assert(e.contains(expectedErrorMsg))
e = intercept[AnalysisException] {
@@ -121,7 +132,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
withTable(tableName) {
sql(s"CREATE TABLE $tableName(a INT)")
// no error will be reported in the following three ways to uncache a table.
- spark.catalog.uncacheTable(tableName)
+ uncacheTable(tableName)
sql("UNCACHE TABLE newTable")
sparkSession.table(tableName).unpersist(blocking = true)
}