Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/20 22:12:29 UTC

spark git commit: [SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name

Repository: spark
Updated Branches:
  refs/heads/master 15cacc812 -> abf29187f


[SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name

This PR makes the Web UI storage tab show the in-memory table name, rather than a cryptic query plan string, as the name of the in-memory columnar RDD.

Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In that case, only the name of the first cached table is shown. For example:

```sql
CACHE TABLE first AS SELECT * FROM src;
CACHE TABLE second AS SELECT * FROM src;
```

The Web UI then shows only "In-memory table first".
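
The renaming itself reduces to a one-line fallback rule in `InMemoryRelation` (see the diff below): use the table name when one is known, otherwise keep the plan string. A minimal self-contained sketch of that rule; the plan string here is illustrative, not real output:

```scala
// Naming rule applied by this patch: prefer "In-memory table <name>",
// otherwise fall back to the query plan's string representation.
def rddName(tableName: Option[String], planString: String): String =
  tableName.map(n => s"In-memory table $n").getOrElse(planString)

rddName(Some("first"), "InMemoryColumnarTableScan [key,value]")
// => "In-memory table first"
rddName(None, "InMemoryColumnarTableScan [key,value]")
// => "InMemoryColumnarTableScan [key,value]" (the old behavior)
```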


Author: Cheng Lian <li...@databricks.com>

Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits:

071907f [Cheng Lian] Fixes tests
12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name
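
In short, the patch threads an optional table name from `cacheTable` through `cacheQuery` into `InMemoryRelation`, which uses it in `RDD.setName`. Because the new parameter is an `Option` defaulted to `None`, existing `cacheQuery` call sites keep compiling unchanged. A self-contained sketch of that signature pattern, with plain strings standing in for `SchemaRDD` and `StorageLevel`:

```scala
// Adding a defaulted Option parameter keeps old call sites
// source-compatible while letting new callers pass a name.
def cacheQuery(
    query: String,                         // stands in for SchemaRDD
    tableName: Option[String] = None,
    storageLevel: String = "MEMORY_AND_DISK"): String =
  tableName.map(n => s"In-memory table $n").getOrElse(query)

cacheQuery("SELECT * FROM src")                 // old-style call: still compiles
cacheQuery("SELECT * FROM src", Some("first"))  // new: name threaded through
```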


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abf29187
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abf29187
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abf29187

Branch: refs/heads/master
Commit: abf29187f0342b607fcefe269391d4db58d2a957
Parents: 15cacc8
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Nov 20 13:12:24 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Nov 20 13:12:24 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CacheManager.scala    |  9 +++++++--
 .../main/scala/org/apache/spark/sql/SchemaRDD.scala  |  2 +-
 .../sql/columnar/InMemoryColumnarTableScan.scala     | 15 +++++++++------
 .../org/apache/spark/sql/execution/commands.scala    |  5 ++---
 .../org/apache/spark/sql/CachedTableSuite.scala      |  2 +-
 .../sql/columnar/InMemoryColumnarQuerySuite.scala    |  6 +++---
 6 files changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 2e7abac..3c9439b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -46,7 +46,7 @@ private[sql] trait CacheManager {
   def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty
 
   /** Caches the specified table in-memory. */
-  def cacheTable(tableName: String): Unit = cacheQuery(table(tableName))
+  def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName))
 
   /** Removes the specified table from the in-memory cache. */
   def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName))
@@ -81,6 +81,7 @@ private[sql] trait CacheManager {
    */
   private[sql] def cacheQuery(
       query: SchemaRDD,
+      tableName: Option[String] = None,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
     val planToCache = query.queryExecution.analyzed
     if (lookupCachedData(planToCache).nonEmpty) {
@@ -90,7 +91,11 @@ private[sql] trait CacheManager {
         CachedData(
           planToCache,
           InMemoryRelation(
-            useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan))
+            useCompression,
+            columnBatchSize,
+            storageLevel,
+            query.queryExecution.executedPlan,
+            tableName))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index fbec2f9..904a276 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -475,7 +475,7 @@ class SchemaRDD(
   }
 
   override def persist(newLevel: StorageLevel): this.type = {
-    sqlContext.cacheQuery(this, newLevel)
+    sqlContext.cacheQuery(this, None, newLevel)
     this
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 881d32b..0cebe82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -36,8 +36,9 @@ private[sql] object InMemoryRelation {
       useCompression: Boolean,
       batchSize: Int,
       storageLevel: StorageLevel,
-      child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
+      child: SparkPlan,
+      tableName: Option[String]): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
 }
 
 private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
@@ -47,7 +48,8 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan)(
+    child: SparkPlan,
+    tableName: Option[String])(
     private var _cachedColumnBuffers: RDD[CachedBatch] = null,
     private var _statistics: Statistics = null)
   extends LogicalPlan with MultiInstanceRelation {
@@ -137,13 +139,13 @@ private[sql] case class InMemoryRelation(
       }
     }.persist(storageLevel)
 
-    cached.setName(child.toString)
+    cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
     _cachedColumnBuffers = cached
   }
 
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
-      newOutput, useCompression, batchSize, storageLevel, child)(
+      newOutput, useCompression, batchSize, storageLevel, child, tableName)(
       _cachedColumnBuffers, statisticsToBePropagated)
   }
 
@@ -155,7 +157,8 @@ private[sql] case class InMemoryRelation(
       useCompression,
       batchSize,
       storageLevel,
-      child)(
+      child,
+      tableName)(
       _cachedColumnBuffers,
       statisticsToBePropagated).asInstanceOf[this.type]
   }
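
One subtlety worth noting: `tableName` is added to the first (case-class) parameter list rather than the curried second list, so it becomes part of the generated extractor, which is why the pattern match in `CachedTableSuite` below gains an extra `_`. A self-contained sketch of the Scala rule, using a simplified stand-in for `InMemoryRelation`:

```scala
// Only a case class's first parameter list participates in the synthesized
// unapply; the curried second list is invisible to pattern matching.
case class Rel(output: Int, child: String, tableName: Option[String])(
    cached: AnyRef = null)

Rel(1, "plan", Some("t"))() match {
  case Rel(_, _, Some(name)) => println(s"In-memory table $name")
  case Rel(_, _, None)       => println("unnamed")
}
```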

http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f23b9c4..afe3f3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -160,12 +160,11 @@ case class CacheTableCommand(
     import sqlContext._
 
     plan.foreach(_.registerTempTable(tableName))
-    val schemaRDD = table(tableName)
-    schemaRDD.cache()
+    cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      schemaRDD.count()
+      table(tableName).count()
     }
 
     Seq.empty[Row]
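
The command now routes through `cacheTable` so the name is recorded, and the eager path simply re-looks up the table for the materializing `count()`. The `isLazy` flag corresponds to the parser's `LAZY` keyword: the default form builds the columnar buffers immediately, while the lazy form defers them to first use. A self-contained sketch of that control flow, with function parameters standing in for the real SQLContext methods:

```scala
// Mirrors the run() logic above: always register the named cache entry,
// and force materialization with a count only in eager mode.
def runCacheCommand(
    tableName: String,
    isLazy: Boolean,
    cacheTable: String => Unit,   // stand-in for sqlContext.cacheTable
    count: String => Long         // stand-in for table(tableName).count()
  ): Unit = {
  cacheTable(tableName)
  if (!isLazy) count(tableName)   // eager caching, as in the diff above
}

runCacheCommand("src", isLazy = false,
  t => println(s"caching $t"),
  t => { println(s"counting $t"); 0L })
```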

http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 765fa82..0422101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -123,7 +123,7 @@ class CachedTableSuite extends QueryTest {
     cacheTable("testData")
     assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
       table("testData").queryExecution.withCachedData.collect {
-        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
+        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
       }.size
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abf29187/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 15903d0..fc95dcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -29,7 +29,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
   }
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("projection") {
     val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -53,7 +53,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)

