You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/22 02:07:34 UTC
[spark] branch branch-3.3 updated: [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ace3c69ef18 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
ace3c69ef18 is described below
commit ace3c69ef18d648bb15855c40c8b7e44987dede4
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Nov 22 10:07:17 2022 +0800
[SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
Backport of https://github.com/apache/spark/pull/38687 to branch-3.3.
### What changes were proposed in this pull request?
Add TimeTravelSpec to the key of relation cache in AnalysisContext.
### Why are the changes needed?
Correct the relation resolution for the same table but different TimeTravelSpec.
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix.
### How was this patch tested?
Added a unit test.
Closes #38741 from ulysses-you/time-travel-spec-3.3.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 11 ++++++-----
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++++++++++++++++
2 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2a2fe6f2957..0c68dd8839d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -112,9 +112,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog {
* @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
* depth of nested views.
* @param maxNestedViewDepth The maximum allowed depth of nested view resolution.
- * @param relationCache A mapping from qualified table names to resolved relations. This can ensure
- * that the table is resolved only once if a table is used multiple times
- * in a query.
+ * @param relationCache A mapping from qualified table names and time travel spec to resolved
+ * relations. This can ensure that the table is resolved only once if a table
+ * is used multiple times in a query.
* @param referredTempViewNames All the temp view names referred by the current view we are
* resolving. It's used to make sure the relation resolution is
* consistent between view creation and view resolution. For example,
@@ -128,7 +128,8 @@ case class AnalysisContext(
catalogAndNamespace: Seq[String] = Nil,
nestedViewDepth: Int = 0,
maxNestedViewDepth: Int = -1,
- relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
+ relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] =
+ mutable.Map.empty,
referredTempViewNames: Seq[Seq[String]] = Seq.empty,
// 1. If we are resolving a view, this field will be restored from the view metadata,
// by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
@@ -1188,7 +1189,7 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
- val key = catalog.name +: ident.namespace :+ ident.name
+ val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
AnalysisContext.get.relationCache.get(key).map(_.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 44f97f55713..7470911c9e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2576,6 +2576,23 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-41154: Incorrect relation caching for queries with time travel spec") {
+ sql("use testcat")
+ val t1 = "testcat.t1"
+ val t2 = "testcat.t2"
+ withTable(t1, t2) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c")
+ sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c")
+ assert(
+ sql("""
+ |SELECT * FROM t VERSION AS OF '1'
+ |UNION ALL
+ |SELECT * FROM t VERSION AS OF '2'
+ |""".stripMargin
+ ).collect() === Array(Row(1), Row(2)))
+ }
+ }
+
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org