You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/11 14:36:04 UTC
[spark] branch branch-3.1 updated: [SPARK-34060][SQL][3.1] Fix Hive
table caching while updating stats by `ALTER TABLE .. DROP PARTITION`
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9cb3cbf [SPARK-34060][SQL][3.1] Fix Hive table caching while updating stats by `ALTER TABLE .. DROP PARTITION`
9cb3cbf is described below
commit 9cb3cbfffc0fa9302f39cd61f0fb4648d4bbd60b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 11 14:35:24 2021 +0000
[SPARK-34060][SQL][3.1] Fix Hive table caching while updating stats by `ALTER TABLE .. DROP PARTITION`
### What changes were proposed in this pull request?
Fix the canonicalization of `HiveTableRelation` by normalizing its `CatalogTable`. The canonicalized plan now excludes table stats and nondeterministic fields such as creation/access times and timestamp-like table properties.
### Why are the changes needed?
This fixes the issue demonstrated by the example below:
```scala
scala> spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", true)
scala> sql(s"CREATE TABLE tbl (id int, part int) USING hive PARTITIONED BY (part)")
scala> sql("INSERT INTO tbl PARTITION (part=0) SELECT 0")
scala> sql("INSERT INTO tbl PARTITION (part=1) SELECT 1")
scala> sql("CACHE TABLE tbl")
scala> sql("SELECT * FROM tbl").show(false)
+---+----+
|id |part|
+---+----+
|0 |0 |
|1 |1 |
+---+----+
scala> spark.catalog.isCached("tbl")
scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
scala> spark.catalog.isCached("tbl")
res19: Boolean = false
```
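`ALTER TABLE .. DROP PARTITION` must keep the table in the cache. Previously, the canonicalized `HiveTableRelation` still contained the table stats. Updating the stats after dropping a partition therefore changed the canonicalized plan, so the cached entry was no longer found.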
### Does this PR introduce _any_ user-facing change?
Yes. After these changes, `ALTER TABLE .. DROP PARTITION` keeps the table in the cache while updating the table stats:
```scala
scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
scala> spark.catalog.isCached("tbl")
res19: Boolean = true
```
### How was this patch tested?
By running the updated and new unit tests:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowCreateTableSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
```
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit d97e99157e8d3b7434610fd78af90911c33662c9)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #31124 from MaxGekk/fix-caching-hive-table-2-3.1.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/catalog/interface.scala | 29 +++++++++++++++++++---
.../apache/spark/sql/ShowCreateTableSuite.scala | 25 +------------------
.../apache/spark/sql/hive/CachedTableSuite.scala | 29 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 28 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0c40afa..7c519ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -472,6 +472,30 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
+
+ def normalize(table: CatalogTable): CatalogTable = {
+ val nondeterministicProps = Set(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ // The following are hive specific schema parameters which we do not need to match exactly.
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+
+ table.copy(
+ createTime = 0L,
+ lastAccessTime = 0L,
+ properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+ stats = None,
+ ignoredProperties = Map.empty
+ )
+ }
}
/**
@@ -738,10 +762,7 @@ case class HiveTableRelation(
def isPartitioned: Boolean = partitionCols.nonEmpty
override def doCanonicalize(): HiveTableRelation = copy(
- tableMeta = tableMeta.copy(
- storage = CatalogStorageFormat.empty,
- createTime = -1
- ),
+ tableMeta = CatalogTable.normalize(tableMeta),
dataCols = dataCols.zipWithIndex.map {
case (attr, index) => attr.withExprId(ExprId(index))
},
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
index 92d306c..5ce5d36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala
@@ -223,29 +223,6 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils {
}
protected def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
- def normalize(table: CatalogTable): CatalogTable = {
- val nondeterministicProps = Set(
- "CreateTime",
- "transient_lastDdlTime",
- "grantTime",
- "lastUpdateTime",
- "last_modified_by",
- "last_modified_time",
- "Owner:",
- // The following are hive specific schema parameters which we do not need to match exactly.
- "totalNumberFiles",
- "maxFileSize",
- "minFileSize"
- )
-
- table.copy(
- createTime = 0L,
- lastAccessTime = 0L,
- properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
- stats = None,
- ignoredProperties = Map.empty
- )
- }
- assert(normalize(actual) == normalize(expected))
+ assert(CatalogTable.normalize(actual) == CatalogTable.normalize(expected))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index a7c8b61..b221a72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
@@ -498,4 +499,32 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
}
}
+
+ test("SPARK-34060: update stats of cached table") {
+ withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+ def checkTableSize(expected: String): Unit = {
+ val stats =
+ sql("DESCRIBE TABLE EXTENDED t")
+ .select("data_type")
+ .where("col_name = 'Statistics'")
+ .first()
+ .getString(0)
+ assert(stats.contains(expected))
+ }
+
+ sql("CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
+ sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+ sql("INSERT INTO t PARTITION (part=1) SELECT 1")
+ assert(!spark.catalog.isCached("t"))
+ sql("CACHE TABLE t")
+ assert(spark.catalog.isCached("t"))
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1)))
+ checkTableSize("4 bytes")
+
+ sql("ALTER TABLE t DROP PARTITION (part=0)")
+ assert(spark.catalog.isCached("t"))
+ checkTableSize("2 bytes")
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org