Posted to commits@spark.apache.org by ya...@apache.org on 2019/03/27 12:02:16 UTC

[spark] branch master updated: [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 956b52b1 [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
956b52b1 is described below

commit 956b52b1670985a67e49b938ac1499ae65c79f6e
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Wed Mar 27 21:01:36 2019 +0900

    [SPARK-26771][SQL][FOLLOWUP] Make all the uncache operations non-blocking by default
    
    ## What changes were proposed in this pull request?
    To make the blocking behaviour consistent, this PR makes the catalog table/view `uncacheQuery` calls non-blocking by default. Once this PR is merged, all uncache behaviours in Spark will be non-blocking by default.
    
    ## How was this patch tested?
    Passed Jenkins.
    
    Closes #24212 from maropu/SPARK-26771-FOLLOWUP.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  3 +-
 .../apache/spark/sql/execution/CacheManager.scala  |  8 +--
 .../sql/execution/columnar/InMemoryRelation.scala  |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala    |  6 +-
 .../org/apache/spark/sql/CachedTableSuite.scala    | 69 +++++++++++++---------
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 15 ++++-
 6 files changed, 62 insertions(+), 41 deletions(-)
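
For reference, the user-visible effect of this change can be sketched as follows. This is a minimal, illustrative example assuming a local SparkSession; the names `spark` and `df` are not taken from the commit itself:

    import org.apache.spark.sql.SparkSession

    // Sketch of the default behaviour after this commit.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(10).toDF("id")

    df.cache()
    df.count()                      // materializes the cached blocks

    df.unpersist()                  // now non-blocking by default
    df.unpersist(blocking = true)   // explicit blocking is still available

The CacheManager entry is still removed synchronously in both cases; only the deletion of the underlying RDD blocks on the executors becomes asynchronous when `blocking = false`.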

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2accb32..69c2f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2956,7 +2956,8 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(
+      sparkSession, logicalPlan, cascade = false, blocking)
     this
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 0145478..d1f096b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -98,13 +98,11 @@ class CacheManager extends Logging {
    * @param query     The [[Dataset]] to be un-cached.
    * @param cascade   If true, un-cache all the cache entries that refer to the given
    *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
-   * @param blocking  Whether to block until all blocks are deleted.
    */
   def uncacheQuery(
       query: Dataset[_],
-      cascade: Boolean,
-      blocking: Boolean = true): Unit = {
-    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
+      cascade: Boolean): Unit = {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade)
   }
 
   /**
@@ -119,7 +117,7 @@ class CacheManager extends Logging {
       spark: SparkSession,
       plan: LogicalPlan,
       cascade: Boolean,
-      blocking: Boolean): Unit = {
+      blocking: Boolean = false): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
         _.find(_.sameResult(plan)).isDefined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1e4453f..1af5033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -65,7 +65,7 @@ case class CachedRDDBuilder(
     _cachedColumnBuffers
   }
 
-  def clearCache(blocking: Boolean = true): Unit = {
+  def clearCache(blocking: Boolean = false): Unit = {
     if (_cachedColumnBuffers != null) {
       synchronized {
         if (_cachedColumnBuffers != null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4698e8a..5e7d17b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -365,7 +365,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   override def dropTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
       sparkSession.sharedState.cacheManager.uncacheQuery(
-        sparkSession, viewDef, cascade = false, blocking = true)
+        sparkSession, viewDef, cascade = false)
       sessionCatalog.dropTempView(viewName)
     }
   }
@@ -381,7 +381,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   override def dropGlobalTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
       sparkSession.sharedState.cacheManager.uncacheQuery(
-        sparkSession, viewDef, cascade = false, blocking = true)
+        sparkSession, viewDef, cascade = false)
       sessionCatalog.dropGlobalTempView(viewName)
     }
   }
@@ -494,7 +494,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // cached version and make the new version cached lazily.
     if (isCached(table)) {
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 47e745f..4d63390 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -93,13 +93,24 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }.sum
   }
 
+  // Blocking uncache table for tests
+  private def uncacheTable(tableName: String): Unit = {
+    val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
+    spark.sharedState.cacheManager.uncacheQuery(
+      spark,
+      spark.table(tableName).logicalPlan,
+      cascade = cascade,
+      blocking = true)
+  }
+
   test("cache temp table") {
     withTempView("tempTable") {
       testData.select('key).createOrReplaceTempView("tempTable")
       assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
       spark.catalog.cacheTable("tempTable")
       assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-      spark.catalog.uncacheTable("tempTable")
+      uncacheTable("tempTable")
     }
   }
 
@@ -121,7 +132,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     withTempView("tempTable") {
       sql("CACHE TABLE tempTable AS SELECT key FROM testData")
       assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-      spark.catalog.uncacheTable("tempTable")
+      uncacheTable("tempTable")
     }
   }
 
@@ -134,7 +145,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
 
     // Is this valid?
-    spark.catalog.uncacheTable("tempTable2")
+    uncacheTable("tempTable2")
 
     // Should this be cached?
     assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
@@ -171,7 +182,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       case _ => false
     })
 
-    spark.catalog.uncacheTable("testData")
+    uncacheTable("testData")
     assert(!spark.catalog.isCached("testData"))
     assert(spark.table("testData").queryExecution.withCachedData match {
       case _: InMemoryRelation => false
@@ -196,7 +207,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       }.size
     }
 
-    spark.catalog.uncacheTable("testData")
+    uncacheTable("testData")
   }
 
   test("read from cached table and uncache") {
@@ -204,7 +215,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     checkAnswer(spark.table("testData"), testData.collect().toSeq)
     assertCached(spark.table("testData"))
 
-    spark.catalog.uncacheTable("testData")
+    uncacheTable("testData")
     checkAnswer(spark.table("testData"), testData.collect().toSeq)
     assertCached(spark.table("testData"), 0)
   }
@@ -215,7 +226,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     checkAnswer(
       sql("SELECT * FROM selectStar WHERE key = 1"),
       Seq(Row(1, "1")))
-    spark.catalog.uncacheTable("selectStar")
+    uncacheTable("selectStar")
   }
 
   test("Self-join cached") {
@@ -225,7 +236,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     checkAnswer(
       sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
       unCachedAnswer.toSeq)
-    spark.catalog.uncacheTable("testData")
+    uncacheTable("testData")
   }
 
   test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
@@ -255,7 +266,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         isMaterialized(rddId),
         "Eagerly cached in-memory table should have already been materialized")
 
-      spark.catalog.uncacheTable("testCacheTable")
+      uncacheTable("testCacheTable")
       eventually(timeout(10 seconds)) {
         assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
       }
@@ -272,7 +283,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         isMaterialized(rddId),
         "Eagerly cached in-memory table should have already been materialized")
 
-      spark.catalog.uncacheTable("testCacheTable")
+      uncacheTable("testCacheTable")
       eventually(timeout(10 seconds)) {
         assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
       }
@@ -293,7 +304,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       isMaterialized(rddId),
       "Lazily cached in-memory table should have been materialized")
 
-    spark.catalog.uncacheTable("testData")
+    uncacheTable("testData")
     eventually(timeout(10 seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
@@ -430,8 +441,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
     spark.sparkContext.cleaner.get.attachListener(cleanerListener)
 
-    spark.catalog.uncacheTable("t1")
-    spark.catalog.uncacheTable("t2")
+    uncacheTable("t1")
+    uncacheTable("t2")
 
     System.gc()
 
@@ -478,7 +489,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     checkAnswer(
       sql("SELECT key, count(*) FROM orderedTable GROUP BY key ORDER BY key"),
       sql("SELECT key, count(*) FROM testData3x GROUP BY key ORDER BY key").collect())
-    spark.catalog.uncacheTable("orderedTable")
+    uncacheTable("orderedTable")
     spark.catalog.dropTempView("orderedTable")
 
     // Set up two tables distributed in the same way. Try this with the data distributed into
@@ -500,8 +511,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         checkAnswer(sql("SELECT count(*) FROM t1 GROUP BY key"),
           sql("SELECT count(*) FROM testData GROUP BY key"))
 
-        spark.catalog.uncacheTable("t1")
-        spark.catalog.uncacheTable("t2")
+        uncacheTable("t1")
+        uncacheTable("t2")
       }
     }
 
@@ -518,8 +529,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
+      uncacheTable("t1")
+      uncacheTable("t2")
     }
 
     // One side of join is not partitioned in the desired way. Need to shuffle one side.
@@ -535,8 +546,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
+      uncacheTable("t1")
+      uncacheTable("t2")
     }
 
     withTempView("t1", "t2") {
@@ -551,8 +562,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
+      uncacheTable("t1")
+      uncacheTable("t2")
     }
 
     // One side of join is not partitioned in the desired way. Since the number of partitions of
@@ -569,8 +580,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         testData.join(testData2, $"key" === $"a").select($"key", $"value", $"a", $"b"))
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
+      uncacheTable("t1")
+      uncacheTable("t2")
     }
 
     // repartition's column ordering is different from group by column ordering.
@@ -584,7 +595,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         testData.distinct().select($"value", $"key"))
-      spark.catalog.uncacheTable("t1")
+      uncacheTable("t1")
     }
 
     // repartition's column ordering is different from join condition's column ordering.
@@ -606,8 +617,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       checkAnswer(
         query,
         df1.join(df2, $"key" === $"a" && $"value" === $"b").select($"key", $"value", $"a", $"b"))
-      spark.catalog.uncacheTable("t1")
-      spark.catalog.uncacheTable("t2")
+      uncacheTable("t1")
+      uncacheTable("t2")
     }
   }
 
@@ -620,7 +631,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       selectStar,
       Seq(Row(1, "1")))
 
-    spark.catalog.uncacheTable("selectStar")
+    uncacheTable("selectStar")
     checkAnswer(
       selectStar,
       Seq(Row(1, "1")))
@@ -698,7 +709,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
 
         Utils.deleteRecursively(path)
         spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
-        spark.catalog.uncacheTable("t")
+        uncacheTable("t")
         assert(spark.table("t").select($"i").count() == 0)
         assert(getNumInMemoryRelations(spark.table("t").select($"i")) == 0)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 9483a9c..3eca632 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -49,6 +49,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     maybeBlock.nonEmpty
   }
 
+  // Blocking uncache table for tests
+  private def uncacheTable(tableName: String): Unit = {
+    val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
+    spark.sharedState.cacheManager.uncacheQuery(
+      spark,
+      spark.table(tableName).logicalPlan,
+      cascade = cascade,
+      blocking = true)
+  }
+
   test("cache table") {
     val preCacheResults = sql("SELECT * FROM src").collect().toSeq
 
@@ -106,7 +117,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     var e = intercept[AnalysisException](spark.table("nonexistentTable")).getMessage
     assert(e.contains(expectedErrorMsg))
     e = intercept[AnalysisException] {
-      spark.catalog.uncacheTable("nonexistentTable")
+      uncacheTable("nonexistentTable")
     }.getMessage
     assert(e.contains(expectedErrorMsg))
     e = intercept[AnalysisException] {
@@ -121,7 +132,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     withTable(tableName) {
       sql(s"CREATE TABLE $tableName(a INT)")
       // no error will be reported in the following three ways to uncache a table.
-      spark.catalog.uncacheTable(tableName)
+      uncacheTable(tableName)
       sql("UNCACHE TABLE newTable")
       sparkSession.table(tableName).unpersist(blocking = true)
     }
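
Because the public uncache paths above no longer wait for block removal, assertions on executor storage state should poll rather than check immediately after the call. A minimal sketch of that pattern, mirroring the `eventually` checks in the core suite above; it assumes ScalaTest's Eventually and a suite-local `isMaterialized(rddId)` helper like the one shown earlier (illustrative only):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    spark.catalog.uncacheTable("testData")   // returns before blocks are freed
    eventually(timeout(10.seconds)) {
      // isMaterialized/rddId come from the test suite above (hypothetical here)
      assert(!isMaterialized(rddId), "cached blocks should eventually be dropped")
    }

This is also why both test suites pin `blocking = true` in their private `uncacheTable` helpers: it lets existing assertions observe the post-uncache state deterministically without rewriting every check as a polling loop.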

