You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/02/20 09:20:15 UTC
spark git commit: [SPARK-5909][SQL] Add a clearCache command to Spark
SQL's cache manager
Repository: spark
Updated Branches:
refs/heads/master 3be92cdac -> 70bfb5c72
[SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager
JIRA: https://issues.apache.org/jira/browse/SPARK-5909
Author: Yin Huai <yh...@databricks.com>
Closes #4694 from yhuai/clearCache and squashes the following commits:
397ecc4 [Yin Huai] Address comments.
a2702fc [Yin Huai] Update parser.
3a54506 [Yin Huai] add isEmpty to CacheManager.
6d14460 [Yin Huai] Python clearCache.
f7b8dbd [Yin Huai] Add clear cache command.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70bfb5c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70bfb5c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70bfb5c7
Branch: refs/heads/master
Commit: 70bfb5c7282df84e76eba01f59bf1b8551583c33
Parents: 3be92cd
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Feb 20 16:20:02 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Feb 20 16:20:02 2015 +0800
----------------------------------------------------------------------
python/pyspark/sql/context.py | 4 ++++
.../scala/org/apache/spark/sql/CacheManager.scala | 6 ++++++
.../scala/org/apache/spark/sql/SQLContext.scala | 5 +++++
.../scala/org/apache/spark/sql/SparkSQLParser.scala | 11 +++++++----
.../org/apache/spark/sql/execution/commands.scala | 15 +++++++++++++++
.../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++
6 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 2e2309f..3f168f7 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -687,6 +687,10 @@ class SQLContext(object):
"""Removes the specified table from the in-memory cache."""
self._ssql_ctx.uncacheTable(tableName)
+ def clearCache(self):
+ """Removes all cached tables from the in-memory cache."""
+ self._ssql_ctx.clearCache()
+
class HiveContext(SQLContext):
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index f1949aa..ca4a127 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
}
}
+ /** Clears all cached tables. */
private[sql] def clearCache(): Unit = writeLock {
cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
cachedData.clear()
}
+ /** Checks if the cache is empty. */
+ private[sql] def isEmpty: Boolean = readLock {
+ cachedData.isEmpty
+ }
+
/**
* Caches the data produced by the logical representation of the given schema rdd. Unlike
* `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a6cf3cd..4bdaa02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
+ /**
+ * Removes all cached tables from the in-memory cache.
+ */
+ def clearCache(): Unit = cacheManager.clearCache()
+
// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 00e19da..5921eaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand}
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.types.StringType
@@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
+ protected val CLEAR = Keyword("CLEAR")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
@@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
}
private lazy val uncache: Parser[LogicalPlan] =
- UNCACHE ~ TABLE ~> ident ^^ {
- case tableName => UncacheTableCommand(tableName)
- }
+ ( UNCACHE ~ TABLE ~> ident ^^ {
+ case tableName => UncacheTableCommand(tableName)
+ }
+ | CLEAR ~ CACHE ^^^ ClearCacheCommand
+ )
private lazy val set: Parser[LogicalPlan] =
SET ~> restInput ^^ {
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 7c92e9f..a112321 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -176,6 +176,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
/**
* :: DeveloperApi ::
+ * Clear all cached data from the in-memory cache.
+ */
+@DeveloperApi
+case object ClearCacheCommand extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
+ sqlContext.clearCache()
+ Seq.empty[Row]
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(
http://git-wip-us.apache.org/repos/asf/spark/blob/70bfb5c7/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e70e866..c240f2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest {
assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
assert(!isCached("t2"))
}
+
+ test("Clear all cache") {
+ sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+ sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+ cacheTable("t1")
+ cacheTable("t2")
+ clearCache()
+ assert(cacheManager.isEmpty)
+
+ sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+ sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+ cacheTable("t1")
+ cacheTable("t2")
+ sql("Clear CACHE")
+ assert(cacheManager.isEmpty)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org