You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/06/23 22:24:40 UTC

git commit: [SPARK-1669][SQL] Made cacheTable idempotent

Repository: spark
Updated Branches:
  refs/heads/master 853a2b951 -> a4bc442ca


[SPARK-1669][SQL]  Made cacheTable idempotent

JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669)

Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table.

Before:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))))
```

After:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))
```

Author: Cheng Lian <li...@gmail.com>

Closes #1183 from liancheng/spark-1669 and squashes the following commits:

68f8a20 [Cheng Lian] Removed an unused import
51bae90 [Cheng Lian] Made cacheTable idempotent


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4bc442c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4bc442c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4bc442c

Branch: refs/heads/master
Commit: a4bc442ca2c35444de8a33376b6f27c6c2a9003d
Parents: 853a2b9
Author: Cheng Lian <li...@gmail.com>
Authored: Mon Jun 23 13:24:33 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jun 23 13:24:33 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala | 13 +++++++++----
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 20 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4bc442c/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c60af28..0bcfbf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /** Caches the specified table in-memory. */
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
-    val useCompression =
-      sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
-    val asInMemoryRelation =
-      InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+    val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
+      case _: InMemoryRelation =>
+        currentTable.logicalPlan
+
+      case _ =>
+        val useCompression =
+          sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
+        InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+    }
 
     catalog.registerTable(None, tableName, asInMemoryRelation)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4bc442c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e9360b0..cca58c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.test._
 
 /* Implicits */
@@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest {
     clear()
   }
 
+  test("SPARK-1669: cacheTable should be idempotent") {
+    assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
+
+    cacheTable("testData")
+    EliminateAnalysisOperators(table("testData").logicalPlan) match {
+      case _: InMemoryRelation =>
+      case _ =>
+        fail("testData should be cached")
+    }
+
+    cacheTable("testData")
+    EliminateAnalysisOperators(table("testData").logicalPlan) match {
+      case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
+        fail("cacheTable is not idempotent")
+
+      case _ =>
+    }
+  }
 }