Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/24 19:52:32 UTC

git commit: [SPARK-4050][SQL] Fix caching of temporary tables with projections.

Repository: spark
Updated Branches:
  refs/heads/master d60a9d440 -> 0e886610e


[SPARK-4050][SQL] Fix caching of temporary tables with projections.

Previously, cached data was located by `sameResult` plan matching on optimized plans.  This technique, however, fails to find the cached data when a temporary table containing a projection is queried with a further reduced projection.  The failure occurs because optimization collapses the projections, producing a plan that is no longer `sameResult` with the cached data (even though the cached data still subsumes the desired data).  For example, consider the following previously failing test case.

```scala
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
```

In this PR I change the matching to occur after analysis instead of optimization, so that in the case of temporary tables the plans will always match.  I think this should work in general; however, this bug does raise questions about whether we need more thorough subsumption checking when locating cached data.
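
To make the failure and the fix concrete, here is a minimal, self-contained toy model in plain Scala.  Nothing below is Spark code: `Plan`, `optimize`, and `subtrees` are simplified stand-ins for Catalyst's `LogicalPlan`, the optimizer, and plan traversal, and plain structural equality stands in for `sameResult`.

```scala
// Toy model of the bug and the fix; not Spark internals.
object CacheLookupDemo extends App {
  sealed trait Plan { def children: Seq[Plan] }
  case class Relation(name: String) extends Plan { def children: Seq[Plan] = Nil }
  case class Project(cols: Seq[String], child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  case class Aggregate(expr: String, child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  // Every subtree of a plan, analogous to searching a query plan for a
  // fragment that matches cached data.
  def subtrees(p: Plan): Seq[Plan] = p +: p.children.flatMap(subtrees)

  // Toy optimizer: COUNT(*) reads no columns, so the projection under the
  // aggregate is pruned away (the kind of rewrite that broke the match).
  def optimize(p: Plan): Plan = p match {
    case Aggregate(e, Project(_, child)) => Aggregate(e, optimize(child))
    case Project(cols, child)            => Project(cols, optimize(child))
    case other                           => other
  }

  val cachedPlan = Project(Seq("key"), Relation("testData")) // analyzed tempTable
  val query      = Aggregate("COUNT(1)", cachedPlan)         // analyzed SELECT COUNT(*)

  // Old behavior: match on optimized plans. The pruned query no longer
  // contains the cached shape, so the lookup misses.
  println(subtrees(optimize(query)).contains(optimize(cachedPlan))) // false

  // New behavior: match on analyzed plans. The resolved temp table subtree
  // is still intact, so the lookup hits.
  println(subtrees(query).contains(cachedPlan))                     // true
}
```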

Another question is what semantics we want to provide when uncaching data from temporary tables.  For example, consider the following sequence of commands:

```scala
testData.select('key).registerTempTable("tempTable1")
testData.select('key).registerTempTable("tempTable2")
cacheTable("tempTable1")

// This obviously works.
assertCached(sql("SELECT COUNT(*) FROM tempTable1"))

// It seems good that this works ...
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

// ... but is this valid?
uncacheTable("tempTable2")

// Should this still be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
```
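
For context on why `tempTable2` sees `tempTable1`'s cache entry at all: both names resolve to structurally identical analyzed plans, which a `sameResult`-based lookup cannot tell apart.  A toy illustration, using stand-in types rather than Spark's:

```scala
// Both registrations produce structurally identical analyzed plans, so a
// single cache entry serves both names and, on uncache, drops for both.
case class Relation(name: String)
case class Project(cols: Seq[String], child: Relation)

val tempTable1 = Project(Seq("key"), Relation("testData"))
val tempTable2 = Project(Seq("key"), Relation("testData"))
println(tempTable1 == tempTable2) // true: the lookup cannot distinguish them
```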

Author: Michael Armbrust <mi...@databricks.com>

Closes #2912 from marmbrus/cachingBug and squashes the following commits:

9c822d4 [Michael Armbrust] remove commented out code
5c72fb7 [Michael Armbrust] Add a test case / question about uncaching semantics.
63a23e4 [Michael Armbrust] Perform caching on analyzed instead of optimized plan.
03f1cfe [Michael Armbrust] Clean-up / add tests to SameResult suite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e886610
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e886610
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e886610

Branch: refs/heads/master
Commit: 0e886610eedd8ea24761cdcefa25ccedeca72dc8
Parents: d60a9d4
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Oct 24 10:52:25 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 24 10:52:25 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/plans/SameResultSuite.scala    | 10 ++++--
 .../org/apache/spark/sql/CacheManager.scala     | 10 +++---
 .../scala/org/apache/spark/sql/SQLContext.scala |  6 ++--
 .../org/apache/spark/sql/CachedTableSuite.scala | 34 +++++++++++++++++++-
 4 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e886610/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
index e8a793d..11e6831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 
 /**
- * Provides helper methods for comparing plans.
+ * Tests for the sameResult function of [[LogicalPlan]].
  */
 class SameResultSuite extends FunSuite {
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -52,11 +52,15 @@ class SameResultSuite extends FunSuite {
     assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b))
     assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a))
 
-    assertSameResult(testRelation, testRelation2.select('a), false)
-    assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false)
+    assertSameResult(testRelation, testRelation2.select('a), result = false)
+    assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), result = false)
   }
 
   test("filters") {
     assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
   }
+
+  test("sorts") {
+    assertSameResult(testRelation.orderBy('a.asc), testRelation2.orderBy('a.asc))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e886610/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 5ab2b53..3ced11a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -82,7 +82,7 @@ private[sql] trait CacheManager {
   private[sql] def cacheQuery(
       query: SchemaRDD,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
-    val planToCache = query.queryExecution.optimizedPlan
+    val planToCache = query.queryExecution.analyzed
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
@@ -96,8 +96,8 @@ private[sql] trait CacheManager {
 
   /** Removes the data for the given SchemaRDD from the cache */
   private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
-    val planToCache = query.queryExecution.optimizedPlan
-    val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
+    val planToCache = query.queryExecution.analyzed
+    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
     cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
     cachedData.remove(dataIndex)
@@ -106,12 +106,12 @@ private[sql] trait CacheManager {
 
   /** Optionally returns cached data for the given SchemaRDD */
   private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
-    lookupCachedData(query.queryExecution.optimizedPlan)
+    lookupCachedData(query.queryExecution.analyzed)
   }
 
   /** Optionally returns cached data for the given LogicalPlan. */
   private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.find(_.plan.sameResult(plan))
+    cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */

http://git-wip-us.apache.org/repos/asf/spark/blob/0e886610/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0e4a9ca..590dbf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,13 +374,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def logical: LogicalPlan
 
     lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
-    lazy val optimizedPlan = optimizer(analyzed)
-    lazy val withCachedData = useCachedData(optimizedPlan)
+    lazy val withCachedData = useCachedData(analyzed)
+    lazy val optimizedPlan = optimizer(withCachedData)
 
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = {
       SparkPlan.currentContext.set(self)
-      planner(withCachedData).next()
+      planner(optimizedPlan).next()
     }
     // executedPlan should not be used to initialize any SparkPlan. It should be
     // only used for execution.

http://git-wip-us.apache.org/repos/asf/spark/blob/0e886610/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 444bc95..da5a358 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -53,17 +53,47 @@ class CachedTableSuite extends QueryTest {
     sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
   }
 
+  test("cache temp table") {
+    testData.select('key).registerTempTable("tempTable")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+    cacheTable("tempTable")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+    uncacheTable("tempTable")
+  }
+
+  test("cache table as select") {
+    sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+    uncacheTable("tempTable")
+  }
+
+  test("uncaching temp table") {
+    testData.select('key).registerTempTable("tempTable1")
+    testData.select('key).registerTempTable("tempTable2")
+    cacheTable("tempTable1")
+
+    assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
+    assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
+
+    // Is this valid?
+    uncacheTable("tempTable2")
+
+    // Should this be cached?
+    assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
+  }
+
   test("too big for memory") {
     val data = "*" * 10000
     sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
     table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
     assert(table("bigData").count() === 200000L)
-    table("bigData").unpersist()
+    table("bigData").unpersist(blocking = true)
   }
 
   test("calling .cache() should use in-memory columnar caching") {
     table("testData").cache()
     assertCached(table("testData"))
+    table("testData").unpersist(blocking = true)
   }
 
   test("calling .unpersist() should drop in-memory columnar cache") {
@@ -108,6 +138,8 @@ class CachedTableSuite extends QueryTest {
         case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
       }.size
     }
+
+    uncacheTable("testData")
   }
 
   test("read from cached table and uncache") {

