You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/21 23:48:46 UTC
spark git commit: [SPARK-6969][SQL] Refresh the cached table when
REFRESH TABLE is used
Repository: spark
Updated Branches:
refs/heads/master 03fd92167 -> 6265cba00
[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used
https://issues.apache.org/jira/browse/SPARK-6969
Author: Yin Huai <yh...@databricks.com>
Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:
1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6265cba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6265cba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6265cba0
Branch: refs/heads/master
Commit: 6265cba00f6141575b4be825735d77d4cea500ab
Parents: 03fd921
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Apr 21 14:48:42 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Apr 21 14:48:42 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/sources/ddl.scala | 17 +++++++
.../spark/sql/hive/CachedTableSuite.scala | 50 +++++++++++++++++++-
2 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 2e861b8..78d4941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
+ // Refresh the given table's metadata first.
sqlContext.catalog.refreshTable(databaseName, tableName)
+
+ // If this table is cached as an InMemoryColumnarRelation, drop the original
+ // cached version and make the new version cached lazily.
+ val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+ // Use lookupCachedData directly since RefreshTable also takes databaseName.
+ val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+ if (isCached) {
+ // Create a data frame to represent the table.
+ // TODO: Use uncacheTable once it supports database name.
+ val df = DataFrame(sqlContext, logicalPlan)
+ // Uncache the logicalPlan.
+ sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+ // Cache it again.
+ sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+ }
+
Seq.empty[Row]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index c188264..fc6c3c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.hive
+import java.io.File
+
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Utils
class CachedTableSuite extends QueryTest {
@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
assertCached(table("udfTest"))
uncacheTable("udfTest")
}
+
+ test("REFRESH TABLE also needs to recache the data (data source tables)") {
+ val tempPath: File = Utils.createTempDir()
+ tempPath.delete()
+ table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+ sql("DROP TABLE IF EXISTS refreshTable")
+ createExternalTable("refreshTable", tempPath.toString, "parquet")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Cache the table.
+ sql("CACHE TABLE refreshTable")
+ assertCached(table("refreshTable"))
+ // Append new data.
+ table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+ // We are still using the old data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Refresh the table.
+ sql("REFRESH TABLE refreshTable")
+ // We are using the new data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").unionAll(table("src")).collect())
+
+ // Drop the table and create it again.
+ sql("DROP TABLE refreshTable")
+ createExternalTable("refreshTable", tempPath.toString, "parquet")
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+ // Refresh the table. REFRESH TABLE command should not make an uncached
+ // table cached.
+ sql("REFRESH TABLE refreshTable")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").unionAll(table("src")).collect())
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+ sql("DROP TABLE refreshTable")
+ Utils.deleteRecursively(tempPath)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org