You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/21 23:48:46 UTC

spark git commit: [SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used

Repository: spark
Updated Branches:
  refs/heads/master 03fd92167 -> 6265cba00


[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used

https://issues.apache.org/jira/browse/SPARK-6969

Author: Yin Huai <yh...@databricks.com>

Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:

1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6265cba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6265cba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6265cba0

Branch: refs/heads/master
Commit: 6265cba00f6141575b4be825735d77d4cea500ab
Parents: 03fd921
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Apr 21 14:48:42 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Apr 21 14:48:42 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/ddl.scala      | 17 +++++++
 .../spark/sql/hive/CachedTableSuite.scala       | 50 +++++++++++++++++++-
 2 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 2e861b8..78d4941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Refresh the given table's metadata first.
     sqlContext.catalog.refreshTable(databaseName, tableName)
+
+    // If this table is cached as a InMemoryColumnarRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+    // Use lookupCachedData directly since RefreshTable also takes databaseName.
+    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      // TODO: Use uncacheTable once it supports database name.
+      val df = DataFrame(sqlContext, logicalPlan)
+      // Uncache the logicalPlan.
+      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+    }
+
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index c188264..fc6c3c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
 import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Utils
 
 class CachedTableSuite extends QueryTest {
 
@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
     assertCached(table("udfTest"))
     uncacheTable("udfTest")
   }
+
+  test("REFRESH TABLE also needs to recache the data (data source tables)") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql("REFRESH TABLE refreshTable")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. REFRESH TABLE command should not make a uncached
+    // table cached.
+    sql("REFRESH TABLE refreshTable")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org