Posted to commits@spark.apache.org by li...@apache.org on 2015/02/20 09:20:15 UTC

spark git commit: [SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager

Repository: spark
Updated Branches:
  refs/heads/master 3be92cdac -> 70bfb5c72


[SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager

JIRA: https://issues.apache.org/jira/browse/SPARK-5909

Author: Yin Huai <yh...@databricks.com>

Closes #4694 from yhuai/clearCache and squashes the following commits:

397ecc4 [Yin Huai] Address comments.
a2702fc [Yin Huai] Update parser.
3a54506 [Yin Huai] add isEmpty to CacheManager.
6d14460 [Yin Huai] Python clearCache.
f7b8dbd [Yin Huai] Add clear cache command.
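
Before the diff, a quick usage sketch of what this commit adds (hedged: it assumes an existing SQLContext named sqlContext and an already-registered temp table "t1"; only the clearCache pieces are new here):

    // Programmatic form; an equivalent clearCache() is also added to PySpark.
    sqlContext.cacheTable("t1")
    sqlContext.clearCache()        // unpersists every cached table at once

    // SQL form, routed by SparkSQLParser to the new ClearCacheCommand.
    sqlContext.sql("CLEAR CACHE")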


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70bfb5c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70bfb5c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70bfb5c7

Branch: refs/heads/master
Commit: 70bfb5c7282df84e76eba01f59bf1b8551583c33
Parents: 3be92cd
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Feb 20 16:20:02 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Feb 20 16:20:02 2015 +0800

----------------------------------------------------------------------
 python/pyspark/sql/context.py                       |  4 ++++
 .../scala/org/apache/spark/sql/CacheManager.scala   |  6 ++++++
 .../scala/org/apache/spark/sql/SQLContext.scala     |  5 +++++
 .../scala/org/apache/spark/sql/SparkSQLParser.scala | 11 +++++++----
 .../org/apache/spark/sql/execution/commands.scala   | 15 +++++++++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala     | 16 ++++++++++++++++
 6 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 2e2309f..3f168f7 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -687,6 +687,10 @@ class SQLContext(object):
         """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
+    def clearCache(self):
+        """Removes all cached tables from the in-memory cache. """
+        self._ssql_ctx.clearCache()
+
 
 class HiveContext(SQLContext):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index f1949aa..ca4a127 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     }
   }
 
+  /** Clears all cached tables. */
   private[sql] def clearCache(): Unit = writeLock {
     cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
     cachedData.clear()
   }
 
+  /** Checks if the cache is empty. */
+  private[sql] def isEmpty: Boolean = readLock {
+    cachedData.isEmpty
+  }
+
   /**
    * Caches the data produced by the logical representation of the given schema rdd.  Unlike
    * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
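
The readLock/writeLock guards wrapping clearCache and isEmpty are defined outside this hunk. For readers unfamiliar with the pattern, here is a minimal self-contained sketch of such guards built on a ReentrantReadWriteLock (illustrative names, not CacheManager's actual fields):

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable.ArrayBuffer

    class GuardedBuffer[A] {
      private val lock = new ReentrantReadWriteLock
      private val data = new ArrayBuffer[A]

      // Shared lock: many concurrent readers, but never alongside a writer.
      private def readLock[T](f: => T): T = {
        val l = lock.readLock()
        l.lock()
        try f finally l.unlock()
      }

      // Exclusive lock: a writer blocks all readers and other writers.
      private def writeLock[T](f: => T): T = {
        val l = lock.writeLock()
        l.lock()
        try f finally l.unlock()
      }

      def clear(): Unit = writeLock { data.clear() }   // mutation: exclusive
      def isEmpty: Boolean = readLock { data.isEmpty } // read-only: shared
    }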

http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a6cf3cd..4bdaa02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
 
+  /**
+   * Removes all cached tables from the in-memory cache.
+   */
+  def clearCache(): Unit = cacheManager.clearCache()
+
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 00e19da..5921eaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.types.StringType
 
 
@@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   protected val AS      = Keyword("AS")
   protected val CACHE   = Keyword("CACHE")
+  protected val CLEAR   = Keyword("CLEAR")
   protected val IN      = Keyword("IN")
   protected val LAZY    = Keyword("LAZY")
   protected val SET     = Keyword("SET")
@@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
     }
 
   private lazy val uncache: Parser[LogicalPlan] =
-    UNCACHE ~ TABLE ~> ident ^^ {
-      case tableName => UncacheTableCommand(tableName)
-    }
+    ( UNCACHE ~ TABLE ~> ident ^^ {
+        case tableName => UncacheTableCommand(tableName)
+      }
+    | CLEAR ~ CACHE ^^^ ClearCacheCommand
+    )
 
   private lazy val set: Parser[LogicalPlan] =
     SET ~> restInput ^^ {
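
Two combinators do the work in this hunk: ^^^ replaces a successful match with a constant value (here the ClearCacheCommand singleton, since the command takes no arguments), and | tries alternatives in order. Keywords defined via Keyword match case-insensitively, which is what lets the mixed-case "Clear CACHE" in the test suite below parse. A toy, self-contained illustration using scala-parser-combinators (not Spark's actual grammar):

    import scala.util.parsing.combinator.RegexParsers

    object ToyCacheParser extends RegexParsers {
      sealed trait Cmd
      case object ClearCache extends Cmd
      case class Uncache(table: String) extends Cmd

      // Case-insensitive keyword, loosely mimicking Spark's Keyword.
      private def kw(s: String): Parser[String] = ("(?i)" + s).r

      private val ident: Parser[String] = "[a-zA-Z_][a-zA-Z0-9_]*".r

      val command: Parser[Cmd] =
        ( kw("UNCACHE") ~ kw("TABLE") ~> ident ^^ { name => Uncache(name) }
        | kw("CLEAR") ~ kw("CACHE") ^^^ ClearCache
        )
    }

    // ToyCacheParser.parse(ToyCacheParser.command, "Clear CACHE").get == ClearCache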

http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 7c92e9f..a112321 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -176,6 +176,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
 /**
  * :: DeveloperApi ::
+ * Clear all cached data from the in-memory cache.
+ */
+@DeveloperApi
+case object ClearCacheCommand extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    sqlContext.clearCache()
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * :: DeveloperApi ::
  */
 @DeveloperApi
 case class DescribeCommand(
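
ClearCacheCommand follows the standard RunnableCommand shape: run() performs its side effect eagerly on the driver and returns the command's result rows, while output declares their schema; clearing the cache yields no rows, so both are empty. A hedged usage sketch (assumes an active SQLContext named sqlContext):

    // The SQL form routes through ClearCacheCommand.run, which delegates
    // to sqlContext.clearCache() and returns an empty result.
    val rows = sqlContext.sql("CLEAR CACHE").collect()
    assert(rows.isEmpty)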

http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e70e866..c240f2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest {
     assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
     assert(!isCached("t2"))
   }
+
+  test("Clear all cache") {
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    clearCache()
+    assert(cacheManager.isEmpty)
+
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    sql("Clear CACHE")
+    assert(cacheManager.isEmpty)
+  }
 }
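
Note what the test checks: CLEAR CACHE drops the cache entries and unpersists their column buffers, but the temp tables themselves survive and are simply recomputed on their next use. User code cannot reach cacheManager (it is private[sql]), so outside the suite the same condition is checked per table. A sketch, assuming the tables registered above:

    sqlContext.cacheTable("t1")
    sqlContext.sql("CLEAR CACHE")
    assert(!sqlContext.isCached("t1"))             // cache entry is gone
    sqlContext.sql("SELECT key FROM t1").collect() // still queryable, recomputed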

