You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/22 02:07:34 UTC

[spark] branch branch-3.3 updated: [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ace3c69ef18 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
ace3c69ef18 is described below

commit ace3c69ef18d648bb15855c40c8b7e44987dede4
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Nov 22 10:07:17 2022 +0800

    [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
    
    Backport of https://github.com/apache/spark/pull/38687 to branch-3.3.
    
    ### What changes were proposed in this pull request?
    
    Add the TimeTravelSpec to the key of the relation cache in AnalysisContext.
    
    ### Why are the changes needed?
    
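    Fix relation resolution when the same table is referenced with different TimeTravelSpecs in a
    single query. The relation cache was keyed by the qualified table name only, so a relation
    cached for one time travel spec could be reused for a reference with a different spec.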
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    Added a test to DataSourceV2SQLSuite.
    
    Closes #38741 from ulysses-you/time-travel-spec-3.3.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 11 ++++++-----
 .../spark/sql/connector/DataSourceV2SQLSuite.scala      | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2a2fe6f2957..0c68dd8839d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -112,9 +112,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog {
  * @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
  *                        depth of nested views.
  * @param maxNestedViewDepth The maximum allowed depth of nested view resolution.
- * @param relationCache A mapping from qualified table names to resolved relations. This can ensure
- *                      that the table is resolved only once if a table is used multiple times
- *                      in a query.
+ * @param relationCache A mapping from qualified table names and time travel spec to resolved
+ *                      relations. This can ensure that the table is resolved only once if a table
+ *                      is used multiple times in a query.
  * @param referredTempViewNames All the temp view names referred by the current view we are
  *                              resolving. It's used to make sure the relation resolution is
  *                              consistent between view creation and view resolution. For example,
@@ -128,7 +128,8 @@ case class AnalysisContext(
     catalogAndNamespace: Seq[String] = Nil,
     nestedViewDepth: Int = 0,
     maxNestedViewDepth: Int = -1,
-    relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
+    relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] =
+      mutable.Map.empty,
     referredTempViewNames: Seq[Seq[String]] = Seq.empty,
     // 1. If we are resolving a view, this field will be restored from the view metadata,
     //    by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
@@ -1188,7 +1189,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
         expandIdentifier(u.multipartIdentifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
-            val key = catalog.name +: ident.namespace :+ ident.name
+            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
             AnalysisContext.get.relationCache.get(key).map(_.transform {
               case multi: MultiInstanceRelation =>
                 val newRelation = multi.newInstance()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 44f97f55713..7470911c9e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2576,6 +2576,23 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-41154: Incorrect relation caching for queries with time travel spec") {
+    sql("use testcat")
+    val t1 = "testcat.t1"
+    val t2 = "testcat.t2"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c")
+      sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c")
+      assert(
+        sql("""
+              |SELECT * FROM t VERSION AS OF '1'
+              |UNION ALL
+              |SELECT * FROM t VERSION AS OF '2'
+              |""".stripMargin
+        ).collect() === Array(Row(1), Row(2)))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org