You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/01 08:20:01 UTC
spark git commit: [SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
Repository: spark
Updated Branches:
refs/heads/master 4913c92c2 -> 38e783534
[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
## What changes were proposed in this pull request?
`Catalog.refreshByPath` is supposed to refresh the cache entry and the associated metadata for all DataFrames (if any) that contain the given data source path.
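However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It removes entries from `cachedData` while iterating over that same buffer with `foreach`, so some matching entries can be skipped. This causes the strange behavior reported in SPARK-15678. The fix first collects the matching entries with `filter`, and only then unpersists, removes, and re-caches each of them.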
## How was this patch tested?
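Jenkins tests, plus a new test in `CachedTableSuite`. The test caches two plans over the same Parquet path, overwrites the data, calls `refreshByPath`, and checks that both plans see the new data.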
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #17064 from viirya/fix-refreshByPath.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e78353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e78353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e78353
Branch: refs/heads/master
Commit: 38e7835347a2e1803b1df5e73cf8b749951b11b2
Parents: 4913c92
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Mar 1 00:19:57 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Mar 1 00:19:57 2017 -0800
----------------------------------------------------------------------
.../spark/sql/execution/CacheManager.scala | 19 ++++++++++---------
.../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++
2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4ca1347..8013851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
(fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
- cachedData.foreach {
- case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
- val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
- if (dataIndex >= 0) {
- data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
- cachedData.remove(dataIndex)
- }
- sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
- case _ => // Do Nothing
+ cachedData.filter {
+ case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true
+ case _ => false
+ }.foreach { data =>
+ val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+ if (dataIndex >= 0) {
+ data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+ cachedData.remove(dataIndex)
+ }
+ sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1af1a36..2a0e088 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(getNumInMemoryRelations(cachedPlan2) == 4)
}
}
+
+ test("refreshByPath should refresh all cached plans with the specified path") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath()
+
+ spark.range(10).write.mode("overwrite").parquet(path)
+ spark.read.parquet(path).cache()
+ spark.read.parquet(path).filter($"id" > 4).cache()
+ assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)
+
+ spark.range(20).write.mode("overwrite").parquet(path)
+ spark.catalog.refreshByPath(path)
+ assert(spark.read.parquet(path).count() == 20)
+ assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org