You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/24 03:33:01 UTC

[spark] branch branch-3.1 updated: [SPARK-34052][SQL][3.1] store SQL text for a temp view created using "CACHE TABLE .. AS SELECT ..."

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 57120b8  [SPARK-34052][SQL][3.1] store SQL text for a temp view created using "CACHE TABLE .. AS SELECT ..."
57120b8 is described below

commit 57120b80acf0f46ab03f35efa4b6cd66469b97a5
Author: Chao Sun <su...@apple.com>
AuthorDate: Sun Jan 24 12:31:56 2021 +0900

    [SPARK-34052][SQL][3.1] store SQL text for a temp view created using "CACHE TABLE .. AS SELECT ..."
    
    This is a backport of #31107 to branch-3.1.
    
    ### What changes were proposed in this pull request?
    
    This passes original SQL text to `CacheTableCommand` command in DSv1 so that it will be stored instead of the analyzed logical plan, similar to `CREATE VIEW` command.
    
    In addition, this changes the behavior of dropping temporary view to also invalidate dependent caches in a cascade, when the config `SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW` is false (which is the default value).
    
    ### Why are the changes needed?
    
    Currently, after creating a temporary view with `CACHE TABLE ... AS SELECT` command, the view can still be queried even after the source table is dropped or replaced (in v2). This can cause correctness issue.
    
    For instance, in the following:
    ```sql
    > CREATE TABLE t ...;
    > CACHE TABLE v AS SELECT * FROM t;
    > DROP TABLE t;
    > SELECT * FROM v;
    ```
    The last select query still returns the old (and stale) result instead of fail. Note that the cache is already invalidated as part of dropping table `t`, but the temporary view `v` still exist.
    
    On the other hand, the following:
    ```sql
    > CREATE TABLE t ...;
    > CREATE TEMPORARY VIEW v AS SELECT * FROM t;
    > CACHE TABLE v;
    > DROP TABLE t;
    > SELECT * FROM v;
    ```
    will throw "Table or view not found" error in the last select query.
    
    This is related to #30567 which aligns the behavior of temporary view and global view by storing the original SQL text for temporary view, as opposed to the analyzed logical plan. However, the PR only handles `CreateView` case but not the `CacheTableAsSelect` case.
    
    This also changes uncache logic and use cascade invalidation for temporary views created above. This is to align its behavior to how a permanent view is handled as of today, and also to avoid potential issues where a dependent view becomes invalid while its data is still kept in cache.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now when `SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW` is set to false (the default value), whenever a table/permanent view/temp view that a cached view depends on is dropped, the cached view itself will become invalid during analysis, i.e., user will get "Table or view not found" error. In addition, when the dependent is a temp view in the previous case, the cache itself will also be invalidated.
    
    ### How was this patch tested?
    
    Added new test cases. Also modified and enhanced some existing related tests.
    
    Closes #31300 from sunchao/SPARK-34052-branch-3.1.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/sql/execution/SparkSqlParser.scala       |   3 +-
 .../apache/spark/sql/execution/command/cache.scala |  17 ++-
 .../apache/spark/sql/execution/command/ddl.scala   |   4 +-
 .../apache/spark/sql/internal/CatalogImpl.scala    |  37 ++++---
 .../org/apache/spark/sql/CachedTableSuite.scala    | 119 +++++++++++++++++----
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |   6 +-
 .../spark/sql/execution/SparkSqlParserSuite.scala  |   6 +-
 .../thriftserver/HiveThriftServer2Suites.scala     |   3 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala  |   2 +-
 9 files changed, 152 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 7a31b0d..8ee3521 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -212,8 +212,9 @@ class SparkSqlAstBuilder extends AstBuilder {
         s"prefix ${catalogAndNamespace.quoted} to " +
         "the table name in CACHE TABLE AS SELECT", ctx)
     }
+    val queryText = Option(ctx.query).map(source(_))
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTableCommand(tableName, query, ctx.LAZY != null, options)
+    CacheTableCommand(tableName, query, queryText, ctx.LAZY != null, options)
   }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3f0945d..6ebf9f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.LocalTempView
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -29,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 case class CacheTableCommand(
     multipartIdentifier: Seq[String],
     plan: Option[LogicalPlan],
+    originalText: Option[String],
     isLazy: Boolean,
     options: Map[String, String]) extends RunnableCommand {
   require(plan.isEmpty || multipartIdentifier.length == 1,
@@ -39,7 +42,19 @@ case class CacheTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableName = multipartIdentifier.quoted
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableName)
+      Dataset.ofRows(sparkSession,
+        CreateViewCommand(
+          name = TableIdentifier(tableName),
+          userSpecifiedColumns = Nil,
+          comment = None,
+          properties = Map.empty,
+          originalText = originalText,
+          child = logicalPlan,
+          allowExisting = false,
+          replace = false,
+          viewType = LocalTempView
+        )
+      )
     }
 
     val storageLevelKey = "storagelevel"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3380d5a..7fc7c7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -237,8 +237,10 @@ case class DropTableCommand(
 
     if (isTempView || catalog.tableExists(tableName)) {
       try {
+        val hasViewText = isTempView &&
+          catalog.getTempViewOrPermanentTableMetadata(tableName).viewText.isDefined
         sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName), cascade = !isTempView)
+          sparkSession.table(tableName), cascade = !isTempView || hasViewText)
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 125a597..c2e9165 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
@@ -396,13 +396,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
-      try {
-        val plan = sparkSession.sessionState.executePlan(viewDef)
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession, plan.analyzed, cascade = false)
-      } catch {
-        case NonFatal(_) => // ignore
-      }
+      uncacheView(viewDef)
       sessionCatalog.dropTempView(viewName)
     }
   }
@@ -417,17 +411,30 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropGlobalTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
-      try {
-        val plan = sparkSession.sessionState.executePlan(viewDef)
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession, plan.analyzed, cascade = false)
-      } catch {
-        case NonFatal(_) => // ignore
-      }
+      uncacheView(viewDef)
       sessionCatalog.dropGlobalTempView(viewName)
     }
   }
 
+  private def uncacheView(viewDef: LogicalPlan): Unit = {
+    try {
+      // If view text is defined, it means we are not storing analyzed logical plan for the view
+      // and instead its behavior follows that of a permanent view (see SPARK-33142 for more
+      // details). Therefore, when uncaching the view we should also do in a cascade fashion, the
+      // same way as how a permanent view is handled. This also avoids a potential issue where a
+      // dependent view becomes invalid because of the above while its data is still cached.
+      val viewText = viewDef match {
+        case v: View => v.desc.viewText
+        case _ => None
+      }
+      val plan = sparkSession.sessionState.executePlan(viewDef)
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession, plan.analyzed, cascade = viewText.isDefined)
+    } catch {
+      case NonFatal(_) => // ignore
+    }
+  }
+
   /**
    * Recovers all the partitions in the directory of a table and update the catalog.
    * Only works with a partitioned table, and not a temporary view.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index a3f4e9e..537379c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -928,33 +928,61 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }
 
-  test("SPARK-24596 Non-cascading Cache Invalidation - drop temporary view") {
-    withTempView("t1", "t2") {
-      sql("CACHE TABLE t1 AS SELECT * FROM testData WHERE key > 1")
-      sql("CACHE TABLE t2 as SELECT * FROM t1 WHERE value > 1")
+  test("SPARK-24596, SPARK-34052: cascading cache invalidation - drop temporary view") {
+    Seq(true, false).foreach { storeAnalyzed =>
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
+        withTempView("t1", "t2") {
+          sql("CACHE TABLE t1 AS SELECT * FROM testData WHERE key > 1")
+          sql("CACHE TABLE t2 as SELECT * FROM t1 WHERE value > 1")
 
-      assert(spark.catalog.isCached("t1"))
-      assert(spark.catalog.isCached("t2"))
-      sql("DROP VIEW t1")
-      assert(spark.catalog.isCached("t2"))
+          assert(spark.catalog.isCached("t1"))
+          assert(spark.catalog.isCached("t2"))
+
+          val oldView = spark.table("t2")
+          sql("DROP VIEW t1")
+
+          // dropping a temp view trigger cache invalidation on dependents iff the config is
+          // turned off
+          assert(storeAnalyzed ==
+            spark.sharedState.cacheManager.lookupCachedData(oldView).isDefined)
+          if (!storeAnalyzed) {
+            // t2 should become invalid after t1 is dropped
+            val e = intercept[AnalysisException](spark.catalog.isCached("t2"))
+            assert(e.message.contains(s"Table or view not found"))
+          }
+        }
+      }
     }
   }
 
-  test("SPARK-24596 Non-cascading Cache Invalidation - drop persistent view") {
-    withTable("t") {
-      spark.range(1, 10).toDF("key").withColumn("value", $"key" * 2)
-        .write.format("json").saveAsTable("t")
-      withView("t1") {
-        withTempView("t2") {
-          sql("CREATE VIEW t1 AS SELECT * FROM t WHERE key > 1")
+  test("SPARK-24596, SPARK-34052: cascading cache invalidation - drop persistent view") {
+    Seq(true, false).foreach { storeAnalyzed =>
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
+        withTable("t") {
+          spark.range(1, 10).toDF("key").withColumn("value", $"key" * 2)
+            .write.format("json").saveAsTable("t")
+          withView("t1") {
+            withTempView("t2") {
+              sql("CREATE VIEW t1 AS SELECT * FROM t WHERE key > 1")
 
-          sql("CACHE TABLE t1")
-          sql("CACHE TABLE t2 AS SELECT * FROM t1 WHERE value > 1")
+              sql("CACHE TABLE t1")
+              sql("CACHE TABLE t2 AS SELECT * FROM t1 WHERE value > 1")
 
-          assert(spark.catalog.isCached("t1"))
-          assert(spark.catalog.isCached("t2"))
-          sql("DROP VIEW t1")
-          assert(!spark.catalog.isCached("t2"))
+              assert(spark.catalog.isCached("t1"))
+              assert(spark.catalog.isCached("t2"))
+
+              val oldView = spark.table("t2")
+              sql("DROP VIEW t1")
+
+              // dropping a permanent view always trigger cache invalidation on dependents
+              assert(spark.sharedState.cacheManager.lookupCachedData(oldView).isEmpty)
+              if (!storeAnalyzed) {
+                // t2 should become invalid after t1 is dropped
+                val e = intercept[AnalysisException](spark.catalog.isCached("t2"))
+                assert(e.message.contains(s"Table or view not found"))
+              }
+            }
+          }
         }
       }
     }
@@ -1376,4 +1404,53 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       sql("ALTER TABLE t RECOVER PARTITIONS")
     }
   }
+
+  test("SPARK-34052: cascading cache invalidation - CatalogImpl.dropTempView") {
+    Seq(true, false).foreach { storeAnalyzed =>
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
+        withTempView("view1", "view2") {
+          sql("CREATE TEMPORARY VIEW view1 AS SELECT * FROM testData WHERE key > 1")
+          sql("CACHE TABLE view2 AS SELECT * FROM view1 WHERE value > 1")
+          assert(spark.catalog.isCached("view2"))
+
+          val oldView = spark.table("view2")
+          spark.catalog.dropTempView("view1")
+          assert(storeAnalyzed ==
+            spark.sharedState.cacheManager.lookupCachedData(oldView).isDefined)
+        }
+      }
+    }
+  }
+
+  test("SPARK-34052: cascading cache invalidation - CatalogImpl.dropGlobalTempView") {
+    Seq(true, false).foreach { storeAnalyzed =>
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
+        withGlobalTempView("view1") {
+          withTempView("view2") {
+            val db = spark.sharedState.globalTempViewManager.database
+            sql("CREATE GLOBAL TEMPORARY VIEW view1 AS SELECT * FROM testData WHERE key > 1")
+            sql(s"CACHE TABLE view2 AS SELECT * FROM ${db}.view1 WHERE value > 1")
+            assert(spark.catalog.isCached("view2"))
+
+            val oldView = spark.table("view2")
+            spark.catalog.dropGlobalTempView("view1")
+            assert(storeAnalyzed ==
+              spark.sharedState.cacheManager.lookupCachedData(oldView).isDefined)
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-34052: cached temp view should become invalid after the source table is dropped") {
+    val t = "t"
+    withTable(t) {
+      sql(s"CREATE TABLE $t USING parquet AS SELECT * FROM VALUES(1, 'a') AS $t(a, b)")
+      sql(s"CACHE TABLE v AS SELECT a FROM $t")
+      checkAnswer(sql("SELECT * FROM v"), Row(1) :: Nil)
+      sql(s"DROP TABLE $t")
+      val e = intercept[AnalysisException](sql("SELECT * FROM v"))
+      assert(e.message.contains(s"Table or view not found: $t"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f0f6e7c..42d92b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -779,8 +779,9 @@ class DataSourceV2SQLSuite
         checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
         checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
 
+        val oldView = spark.table(view)
         sql(s"DROP TABLE $t")
-        assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+        assert(spark.sharedState.cacheManager.lookupCachedData(oldView).isEmpty)
       }
     }
   }
@@ -795,8 +796,9 @@ class DataSourceV2SQLSuite
           checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
           checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
 
+          val oldView = spark.table(view)
           sql(s"REPLACE TABLE $t (a bigint) USING foo")
-          assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+          assert(spark.sharedState.cacheManager.lookupCachedData(oldView).isEmpty)
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 81ba09f..517d0a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -342,25 +342,27 @@ class SparkSqlParserSuite extends AnalysisTest {
   test("CACHE TABLE") {
     assertEqual(
       "CACHE TABLE a.b.c",
-      CacheTableCommand(Seq("a", "b", "c"), None, false, Map.empty))
+      CacheTableCommand(Seq("a", "b", "c"), None, None, false, Map.empty))
 
     assertEqual(
       "CACHE TABLE t AS SELECT * FROM testData",
       CacheTableCommand(
         Seq("t"),
         Some(Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData")))),
+        Some("SELECT * FROM testData"),
         false,
         Map.empty))
 
     assertEqual(
       "CACHE LAZY TABLE a.b.c",
-      CacheTableCommand(Seq("a", "b", "c"), None, true, Map.empty))
+      CacheTableCommand(Seq("a", "b", "c"), None, None, true, Map.empty))
 
     assertEqual(
       "CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')",
       CacheTableCommand(
         Seq("a", "b", "c"),
         None,
+        None,
         true,
         Map("storageLevel" -> "DISK_ONLY")))
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index bd0db74..40d9306 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -389,10 +389,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
           statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
         }
 
+        // The cached temporary table also shouldn't be used during query optimization
         val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
         plan.next()
         plan.next()
-        assert(plan.getString(1).contains("Scan In-memory table test_table"))
+        assert(!plan.getString(1).contains("Scan In-memory table test_table"))
 
         val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
         val buf = new collection.mutable.ArrayBuffer[Int]()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index e996f2c..8c0db2e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -597,7 +597,7 @@ private[hive] class TestHiveQueryExecution(
 
   override lazy val analyzed: LogicalPlan = sparkSession.withActive {
     val describedTables = logical match {
-      case CacheTableCommand(tbl, _, _, _) => tbl.asTableIdentifier :: Nil
+      case CacheTableCommand(tbl, _, _, _, _) => tbl.asTableIdentifier :: Nil
       case _ => Nil
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org