You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/23 05:23:45 UTC

[spark] branch branch-3.0 updated: [SPARK-30494][SQL] Fix cached data leakage during replacing an existing view

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8c09160  [SPARK-30494][SQL] Fix cached data leakage during replacing an existing view
8c09160 is described below

commit 8c09160cb8ec750826932988e1c87385a496ba22
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Sun Mar 22 22:22:13 2020 -0700

    [SPARK-30494][SQL] Fix cached data leakage during replacing an existing view
    
    ### What changes were proposed in this pull request?
    
    The cached RDD for the plan "select 1" stays in memory until the session closes. This cached data can never be used again, because the view temp1 has been replaced by another plan, so it is a memory leak. This PR uncaches the data of an existing view before the view is replaced.
    
    The leak can be reproduced with the following commands:
    ```
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
          /_/
    
    Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.sql("create or replace temporary view temp1 as select 1")
    scala> spark.sql("cache table temp1")
    scala> spark.sql("create or replace temporary view temp1 as select 1, 2")
    scala> spark.sql("cache table temp1")
    scala> assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
    scala> assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
    ```
    
    ### Why are the changes needed?
    To fix the memory leak, which matters especially in long-running mode.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #27185 from LantaoJin/SPARK-30494.
    
    Authored-by: LantaoJin <ji...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 929b794e25ff5454dadde7da304e6df25526d60e)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/command/CommandUtils.scala |  8 +++++
 .../spark/sql/execution/command/tables.scala       | 13 ++-----
 .../apache/spark/sql/execution/command/views.scala | 15 ++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala    | 42 ++++++++++++++++++++++
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  2 +-
 5 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index b229b23..7e456a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -385,4 +385,12 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
+
+  def uncacheTableOrView(sparkSession: SparkSession, name: String): Unit = {
+    try {
+      sparkSession.catalog.uncacheTable(name)
+    } catch { // best-effort: a non-fatal failure is logged and not rethrown to the caller
+      case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d4de822..61955ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -196,11 +196,7 @@ case class AlterTableRenameCommand(
       // this can happen with Hive tables when the underlying catalog is in-memory.
       val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
       if (wasCached) {
-        try {
-          sparkSession.catalog.uncacheTable(oldName.unquotedString)
-        } catch {
-          case NonFatal(e) => log.warn(e.toString, e)
-        }
+        CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
@@ -230,12 +226,7 @@ case class AlterTableAddColumnsCommand(
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
 
-    try {
-      sparkSession.catalog.uncacheTable(table.quotedString)
-    } catch {
-      case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
-    }
+    CommandUtils.uncacheTableOrView(sparkSession, table.quotedString)
     catalog.refreshTable(table)
 
     SchemaUtils.checkColumnNameDuplication(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 38481dd..795f900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -108,9 +109,19 @@ case class CreateViewCommand(
     verifyTemporaryObjectsNotExists(catalog)
 
     if (viewType == LocalTempView) {
+      if (replace && catalog.getTempView(name.table).isDefined) {
+        logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+        CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
+      }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      if (replace && catalog.getGlobalTempView(name.table).isDefined) {
+        val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+        val globalTempView = TableIdentifier(name.table, Option(db))
+        logDebug(s"Try to uncache ${globalTempView.quotedString} before replacing.")
+        CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
+      }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
@@ -125,6 +136,10 @@ case class CreateViewCommand(
         val viewIdent = tableMetadata.identifier
         checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
 
+        // uncache the cached data before replacing an existing view
+        logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
+        CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
+
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
         // Nothing we need to retain from the old view, so just drop and create a new one
         catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 8189353..9baaaa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1122,4 +1122,46 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.isCached("t1"))
     }
   }
+
+  test("SPARK-30494 Fix the leak of cached data when replace an existing view") {
+    withTempView("tempView") {
+      spark.catalog.clearCache()
+      sql("create or replace temporary view tempView as select 1")
+      sql("cache table tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
+      sql("create or replace temporary view tempView as select 1, 2")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
+      sql("cache table tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
+    }
+
+    withGlobalTempView("tempGlobalTempView") {
+      spark.catalog.clearCache()
+      sql("create or replace global temporary view tempGlobalTempView as select 1")
+      sql("cache table global_temp.tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
+      sql("create or replace global temporary view tempGlobalTempView as select 1, 2")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
+      sql("cache table global_temp.tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
+    }
+
+    withView("view1") {
+      spark.catalog.clearCache()
+      sql("create or replace view view1 as select 1")
+      sql("cache table view1")
+      sql("create or replace view view1 as select 1, 2")
+      sql("cache table view1")
+      // The cached plan of a persisted view looks like the tree below,
+      // so we cannot use the same assertions as for the temp views.
+      // SubqueryAlias
+      //    |
+      //    + View
+      //        |
+      //        + Project[1 AS 1]
+      spark.sharedState.cacheManager.uncacheQuery(spark.table("view1"), cascade = false)
+      // make sure there is no cached data leak
+      assert(spark.sharedState.cacheManager.isEmpty)
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 801be64..19f4395 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -106,7 +106,7 @@ case class InsertIntoHiveTable(
     }
 
     // un-cache this table.
-    sparkSession.catalog.uncacheTable(table.identifier.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
     CommandUtils.updateTableStats(sparkSession, table)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org