You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/03 19:40:28 UTC
spark git commit: [SPARK-22178][SQL] Refresh Persistent Views by
REFRESH TABLE Command
Repository: spark
Updated Branches:
refs/heads/master 4c5158eec -> e65b6b7ca
[SPARK-22178][SQL] Refresh Persistent Views by REFRESH TABLE Command
## What changes were proposed in this pull request?
The underlying tables of a persistent view are not refreshed when users issue the REFRESH TABLE command against that view.
## How was this patch tested?
Added a test case.
Author: gatorsmile <ga...@gmail.com>
Closes #19405 from gatorsmile/refreshView.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e65b6b7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e65b6b7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e65b6b7c
Branch: refs/heads/master
Commit: e65b6b7ca1a7cff1b91ad2262bb7941e6bf057cd
Parents: 4c5158e
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Oct 3 12:40:22 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Oct 3 12:40:22 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/CatalogImpl.scala | 15 +++++++++++----
.../spark/sql/hive/HiveMetadataCacheSuite.scala | 14 +++++++++++---
2 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e65b6b7c/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 142b005..fdd2533 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -474,13 +474,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
- // Non-temp tables: refresh the metadata cache.
- sessionCatalog.refreshTable(tableIdent)
+ val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
+ val table = sparkSession.table(tableIdent)
+
+ if (tableMetadata.tableType == CatalogTableType.VIEW) {
+ // Temp or persistent views: refresh (or invalidate) any metadata/data cached
+ // in the plan recursively.
+ table.queryExecution.analyzed.foreach(_.refresh())
+ } else {
+ // Non-temp tables: refresh the metadata cache.
+ sessionCatalog.refreshTable(tableIdent)
+ }
// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
- val table = sparkSession.table(tableIdent)
if (isCached(table)) {
// Uncache the logicalPlan.
sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
http://git-wip-us.apache.org/repos/asf/spark/blob/e65b6b7c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 0c28a1b..e71aba7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -31,14 +31,22 @@ import org.apache.spark.sql.test.SQLTestUtils
class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("SPARK-16337 temporary view refresh") {
- withTempView("view_refresh") {
+ checkRefreshView(isTemp = true)
+ }
+
+ test("view refresh") {
+ checkRefreshView(isTemp = false)
+ }
+
+ private def checkRefreshView(isTemp: Boolean) {
+ withView("view_refresh") {
withTable("view_table") {
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.saveAsTable("view_table")
- // Read the table in
- spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
+ val temp = if (isTemp) "TEMPORARY" else ""
+ spark.sql(s"CREATE $temp VIEW view_refresh AS SELECT * FROM view_table WHERE id > -1")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
// Delete a file using the Hadoop file system interface since the path returned by
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org