You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/11 14:36:04 UTC

[spark] branch branch-3.1 updated: [SPARK-34060][SQL][3.1] Fix Hive table caching while updating stats by `ALTER TABLE .. DROP PARTITION`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9cb3cbf  [SPARK-34060][SQL][3.1] Fix Hive table caching while updating stats by `ALTER TABLE .. DROP PARTITION`
9cb3cbf is described below

commit 9cb3cbfffc0fa9302f39cd61f0fb4648d4bbd60b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 11 14:35:24 2021 +0000

    [SPARK-34060][SQL][3.1] Fix Hive table caching while updating stats by `ALTER TABLE .. DROP PARTITION`
    
    ### What changes were proposed in this pull request?
    Fix canonicalisation of `HiveTableRelation` by normalisation of `CatalogTable`, and exclude table stats and temporary fields from the canonicalized plan.
    
    ### Why are the changes needed?
    This fixes the issue demonstrated by the example below:
    ```scala
    scala> spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", true)
    scala> sql(s"CREATE TABLE tbl (id int, part int) USING hive PARTITIONED BY (part)")
    scala> sql("INSERT INTO tbl PARTITION (part=0) SELECT 0")
    scala> sql("INSERT INTO tbl PARTITION (part=1) SELECT 1")
    scala> sql("CACHE TABLE tbl")
    scala> sql("SELECT * FROM tbl").show(false)
    +---+----+
    |id |part|
    +---+----+
    |0  |0   |
    |1  |1   |
    +---+----+
    
    scala> spark.catalog.isCached("tbl")
    scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
    scala> spark.catalog.isCached("tbl")
    res19: Boolean = false
    ```
    `ALTER TABLE .. DROP PARTITION` must keep the table in the cache.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the drop partition command keeps the table in the cache while updating table stats:
    ```scala
    scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
    scala> spark.catalog.isCached("tbl")
    res19: Boolean = true
    ```
    
    ### How was this patch tested?
    By running new UT:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowCreateTableSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
    ```
    
    Authored-by: Max Gekk <max.gekkgmail.com>
    Signed-off-by: Wenchen Fan <wenchendatabricks.com>
    (cherry picked from commit d97e99157e8d3b7434610fd78af90911c33662c9)
    Signed-off-by: Max Gekk <max.gekkgmail.com>
    
    Closes #31124 from MaxGekk/fix-caching-hive-table-2-3.1.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     | 29 +++++++++++++++++++---
 .../apache/spark/sql/ShowCreateTableSuite.scala    | 25 +------------------
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 29 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0c40afa..7c519ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -472,6 +472,30 @@ object CatalogTable {
 
   val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
   val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
+
+  def normalize(table: CatalogTable): CatalogTable = {
+    val nondeterministicProps = Set(
+      "CreateTime",
+      "transient_lastDdlTime",
+      "grantTime",
+      "lastUpdateTime",
+      "last_modified_by",
+      "last_modified_time",
+      "Owner:",
+      // The following are hive specific schema parameters which we do not need to match exactly.
+      "totalNumberFiles",
+      "maxFileSize",
+      "minFileSize"
+    )
+
+    table.copy(
+      createTime = 0L,
+      lastAccessTime = 0L,
+      properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      stats = None,
+      ignoredProperties = Map.empty
+    )
+  }
 }
 
 /**
@@ -738,10 +762,7 @@ case class HiveTableRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def doCanonicalize(): HiveTableRelation = copy(
-    tableMeta = tableMeta.copy(
-      storage = CatalogStorageFormat.empty,
-      createTime = -1
-    ),
+    tableMeta = CatalogTable.normalize(tableMeta),
     dataCols = dataCols.zipWithIndex.map {
       case (attr, index) => attr.withExprId(ExprId(index))
     },
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
index 92d306c..5ce5d36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
@@ -223,29 +223,6 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
-    def normalize(table: CatalogTable): CatalogTable = {
-      val nondeterministicProps = Set(
-        "CreateTime",
-        "transient_lastDdlTime",
-        "grantTime",
-        "lastUpdateTime",
-        "last_modified_by",
-        "last_modified_time",
-        "Owner:",
-        // The following are hive specific schema parameters which we do not need to match exactly.
-        "totalNumberFiles",
-        "maxFileSize",
-        "minFileSize"
-      )
-
-      table.copy(
-        createTime = 0L,
-        lastAccessTime = 0L,
-        properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
-        stats = None,
-        ignoredProperties = Map.empty
-      )
-    }
-    assert(normalize(actual) == normalize(expected))
+    assert(CatalogTable.normalize(actual) == CatalogTable.normalize(expected))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index a7c8b61..b221a72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.RDDBlockId
@@ -498,4 +499,32 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
     }
   }
+
+  test("SPARK-34060: update stats of cached table") {
+    withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+      def checkTableSize(expected: String): Unit = {
+        val stats =
+          sql("DESCRIBE TABLE EXTENDED t")
+            .select("data_type")
+            .where("col_name = 'Statistics'")
+            .first()
+            .getString(0)
+        assert(stats.contains(expected))
+      }
+
+      sql("CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      sql("INSERT INTO t PARTITION (part=1) SELECT 1")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1)))
+      checkTableSize("4 bytes")
+
+      sql("ALTER TABLE t DROP PARTITION (part=0)")
+      assert(spark.catalog.isCached("t"))
+      checkTableSize("2 bytes")
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org