Posted to commits@spark.apache.org by we...@apache.org on 2017/07/01 02:01:49 UTC
spark git commit: [SPARK-21127][SQL] Update statistics after data changing commands
Repository: spark
Updated Branches:
refs/heads/master 4eb41879c -> 61b5df567
[SPARK-21127][SQL] Update statistics after data changing commands
## What changes were proposed in this pull request?
Update stats after the following data-changing commands (a hedged usage sketch follows this list):
- InsertIntoHadoopFsRelationCommand
- InsertIntoHiveTable
- LoadDataCommand
- TruncateTableCommand
- AlterTableSetLocationCommand
- AlterTableDropPartitionCommand
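An illustrative spark-shell session (not part of this patch; the table name t is made up) showing the new behavior once the flag added below is enabled:

    // enable the conf introduced by this patch (default: false)
    spark.sql("SET spark.sql.statistics.autoUpdate.size=true")
    spark.sql("CREATE TABLE t (i INT, j STRING) USING PARQUET")
    // seed initial stats
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
    spark.sql("INSERT INTO TABLE t SELECT 1, 'abc'")
    // sizeInBytes is now recomputed from the files under the table location;
    // row count and column stats stay dropped until the next ANALYZE
    spark.sql("DESCRIBE TABLE EXTENDED t").show(truncate = false)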
## How was this patch tested?
Added new test cases.
Author: wangzhenhua <wa...@huawei.com>
Author: Zhenhua Wang <wz...@163.com>
Closes #18334 from wzhfy/changeStatsForOperation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61b5df56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61b5df56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61b5df56
Branch: refs/heads/master
Commit: 61b5df567eb8ae0df4059cb0e334316fff462de9
Parents: 4eb4187
Author: wangzhenhua <wa...@huawei.com>
Authored: Sat Jul 1 10:01:44 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Jul 1 10:01:44 2017 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +
.../sql/execution/command/CommandUtils.scala | 17 +-
.../spark/sql/execution/command/ddl.scala | 15 +-
.../spark/sql/StatisticsCollectionSuite.scala | 77 +++++---
.../apache/spark/sql/hive/StatisticsSuite.scala | 187 ++++++++++++-------
5 files changed, 207 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c641e4d..25152f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -774,6 +774,14 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)
+ val AUTO_UPDATE_SIZE =
+ buildConf("spark.sql.statistics.autoUpdate.size")
+ .doc("Enables automatic update for table size once table's data is changed. Note that if " +
+ "the total number of files of the table is very large, this can be expensive and slow " +
+ "down data change commands.")
+ .booleanConf
+ .createWithDefault(false)
+
val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
@@ -1083,6 +1091,8 @@ class SQLConf extends Serializable with Logging {
def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+ def autoUpdateSize: Boolean = getConf(SQLConf.AUTO_UPDATE_SIZE)
+
def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9239760..fce12cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -36,7 +36,14 @@ object CommandUtils extends Logging {
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
if (table.stats.nonEmpty) {
val catalog = sparkSession.sessionState.catalog
- catalog.alterTableStats(table.identifier, None)
+ if (sparkSession.sessionState.conf.autoUpdateSize) {
+ val newTable = catalog.getTableMetadata(table.identifier)
+ val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+ val newStats = CatalogStatistics(sizeInBytes = newSize)
+ catalog.alterTableStats(table.identifier, Some(newStats))
+ } else {
+ catalog.alterTableStats(table.identifier, None)
+ }
}
}
@@ -84,7 +91,9 @@ object CommandUtils extends Logging {
size
}
- locationUri.map { p =>
+ val startTime = System.nanoTime()
+ logInfo(s"Starting to calculate the total file size under path $locationUri.")
+ val size = locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
@@ -97,6 +106,10 @@ object CommandUtils extends Logging {
0L
}
}.getOrElse(0L)
+ val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+ logInfo(s"It took $durationInMs ms to calculate the total file size under path $locationUri.")
+
+ size
}
}
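Call-site pattern for the helper above -- a sketch under the assumption that the touched commands (InsertIntoHadoopFsRelationCommand, InsertIntoHiveTable, LoadDataCommand, TruncateTableCommand) follow the same shape; tableIdentifier is a placeholder:

    // inside a data-changing command's run(sparkSession: SparkSession)
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableIdentifier)
    // ... perform the actual data change (write / load / truncate) ...
    // then refresh sizeInBytes (autoUpdateSize on) or invalidate stats (off)
    CommandUtils.updateTableStats(sparkSession, table)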
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ac897c1..ba7ca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -437,7 +437,20 @@ case class AlterTableAddPartitionCommand(
}
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
- CommandUtils.updateTableStats(sparkSession, table)
+ if (table.stats.nonEmpty) {
+ if (sparkSession.sessionState.conf.autoUpdateSize) {
+ val addedSize = parts.map { part =>
+ CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
+ part.storage.locationUri)
+ }.sum
+ if (addedSize > 0) {
+ val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
+ catalog.alterTableStats(table.identifier, Some(newStats))
+ }
+ } else {
+ catalog.alterTableStats(table.identifier, None)
+ }
+ }
Seq.empty[Row]
}
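Unlike the generic helper, the add-partition path above computes an incremental delta instead of rescanning the whole table. A worked sketch with made-up numbers:

    // existing stats: sizeInBytes = 1000
    // two added partitions whose locations hold 120 and 80 bytes of files
    val addedSize = Seq(120L, 80L).sum                 // 200
    // recorded stats become CatalogStatistics(sizeInBytes = 1000 + 200)
    // if addedSize == 0 (e.g. empty locations), the old stats are kept as-is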
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b031c52..d9392de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.test.SQLTestData.ArrayData
import org.apache.spark.sql.types._
@@ -178,36 +178,63 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
test("change stats after set location command") {
val table = "change_stats_set_location_table"
- withTable(table) {
- spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
- // analyze to get initial stats
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
- val fetched1 = checkTableStats(
- table, hasSizeInBytes = true, expectedRowCounts = Some(100))
- assert(fetched1.get.sizeInBytes > 0)
- assert(fetched1.get.colStats.size == 2)
-
- // set location command
- withTempDir { newLocation =>
- sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
- checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+ Seq(false, true).foreach { autoUpdate =>
+ withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+ withTable(table) {
+ spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
+ // analyze to get initial stats
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
+ val fetched1 = checkTableStats(
+ table, hasSizeInBytes = true, expectedRowCounts = Some(100))
+ assert(fetched1.get.sizeInBytes > 0)
+ assert(fetched1.get.colStats.size == 2)
+
+ // set location command
+ val initLocation = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
+ .storage.locationUri.get.toString
+ withTempDir { newLocation =>
+ sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
+ if (autoUpdate) {
+ val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched2.get.sizeInBytes == 0)
+ assert(fetched2.get.colStats.isEmpty)
+
+ // set back to the initial location
+ sql(s"ALTER TABLE $table SET LOCATION '$initLocation'")
+ val fetched3 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
+ } else {
+ checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+ }
+ }
+ }
}
}
}
test("change stats after insert command for datasource table") {
val table = "change_stats_insert_datasource_table"
- withTable(table) {
- sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
- // analyze to get initial stats
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
- val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
- assert(fetched1.get.sizeInBytes == 0)
- assert(fetched1.get.colStats.size == 2)
-
- // insert into command
- sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
- checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+ Seq(false, true).foreach { autoUpdate =>
+ withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
+ // analyze to get initial stats
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+ val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+ assert(fetched1.get.sizeInBytes == 0)
+ assert(fetched1.get.colStats.size == 2)
+
+ // insert into command
+ sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+ if (autoUpdate) {
+ val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched2.get.sizeInBytes > 0)
+ assert(fetched2.get.colStats.isEmpty)
+ } else {
+ checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+ }
+ }
+ }
}
}
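Both suites (this one and the Hive suite below) pin down the same contract for each mode:

    spark.sql.statistics.autoUpdate.size | sizeInBytes after change | rowCount / colStats
    -------------------------------------+--------------------------+-------------------------
    false (default)                      | dropped                  | dropped
    true                                 | recomputed from files    | dropped until re-ANALYZE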
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5fd266c..c601038 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -444,88 +444,133 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("change stats after insert command for hive table") {
val table = s"change_stats_insert_hive_table"
- withTable(table) {
- sql(s"CREATE TABLE $table (i int, j string)")
- // analyze to get initial stats
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
- val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
- assert(fetched1.get.sizeInBytes == 0)
- assert(fetched1.get.colStats.size == 2)
-
- // insert into command
- sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
- assert(getStatsProperties(table).isEmpty)
+ Seq(false, true).foreach { autoUpdate =>
+ withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i int, j string)")
+ // analyze to get initial stats
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+ val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+ assert(fetched1.get.sizeInBytes == 0)
+ assert(fetched1.get.colStats.size == 2)
+
+ // insert into command
+ sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+ if (autoUpdate) {
+ val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched2.get.sizeInBytes > 0)
+ assert(fetched2.get.colStats.isEmpty)
+ val statsProp = getStatsProperties(table)
+ assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+ } else {
+ assert(getStatsProperties(table).isEmpty)
+ }
+ }
+ }
}
}
test("change stats after load data command") {
val table = "change_stats_load_table"
- withTable(table) {
- sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
- // analyze to get initial stats
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
- val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
- assert(fetched1.get.sizeInBytes == 0)
- assert(fetched1.get.colStats.size == 2)
-
- withTempDir { loadPath =>
- // load data command
- val file = new File(loadPath + "/data")
- val writer = new PrintWriter(file)
- writer.write("2,xyz")
- writer.close()
- sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
- assert(getStatsProperties(table).isEmpty)
+ Seq(false, true).foreach { autoUpdate =>
+ withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
+ // analyze to get initial stats
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+ val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+ assert(fetched1.get.sizeInBytes == 0)
+ assert(fetched1.get.colStats.size == 2)
+
+ withTempDir { loadPath =>
+ // load data command
+ val file = new File(loadPath + "/data")
+ val writer = new PrintWriter(file)
+ writer.write("2,xyz")
+ writer.close()
+ sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
+ if (autoUpdate) {
+ val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched2.get.sizeInBytes > 0)
+ assert(fetched2.get.colStats.isEmpty)
+ val statsProp = getStatsProperties(table)
+ assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+ } else {
+ assert(getStatsProperties(table).isEmpty)
+ }
+ }
+ }
}
}
}
test("change stats after add/drop partition command") {
val table = "change_stats_part_table"
- withTable(table) {
- sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
- // table has two partitions initially
- for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
- sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
- }
- // analyze to get initial stats
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
- val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
- assert(fetched1.get.sizeInBytes > 0)
- assert(fetched1.get.colStats.size == 2)
-
- withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
- val file1 = new File(dir1 + "/data")
- val writer1 = new PrintWriter(file1)
- writer1.write("1,a")
- writer1.close()
-
- val file2 = new File(dir2 + "/data")
- val writer2 = new PrintWriter(file2)
- writer2.write("1,a")
- writer2.close()
-
- // add partition command
- sql(
- s"""
- |ALTER TABLE $table ADD
- |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
- |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
- """.stripMargin)
- assert(getStatsProperties(table).isEmpty)
-
- // generate stats again
- sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
- val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4))
- assert(fetched2.get.sizeInBytes > 0)
- assert(fetched2.get.colStats.size == 2)
-
- // drop partition command
- sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
- // only one partition left
- assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
- .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
- assert(getStatsProperties(table).isEmpty)
+ Seq(false, true).foreach { autoUpdate =>
+ withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
+ // table has two partitions initially
+ for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
+ sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
+ }
+ // analyze to get initial stats
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+ val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+ assert(fetched1.get.sizeInBytes > 0)
+ assert(fetched1.get.colStats.size == 2)
+
+ withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+ val file1 = new File(dir1 + "/data")
+ val writer1 = new PrintWriter(file1)
+ writer1.write("1,a")
+ writer1.close()
+
+ val file2 = new File(dir2 + "/data")
+ val writer2 = new PrintWriter(file2)
+ writer2.write("1,a")
+ writer2.close()
+
+ // add partition command
+ sql(
+ s"""
+ |ALTER TABLE $table ADD
+ |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
+ |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
+ """.stripMargin)
+ if (autoUpdate) {
+ val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched2.get.sizeInBytes > fetched1.get.sizeInBytes)
+ assert(fetched2.get.colStats.isEmpty)
+ val statsProp = getStatsProperties(table)
+ assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+ } else {
+ assert(getStatsProperties(table).isEmpty)
+ }
+
+ // now the table has four partitions, generate stats again
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+ val fetched3 = checkTableStats(
+ table, hasSizeInBytes = true, expectedRowCounts = Some(4))
+ assert(fetched3.get.sizeInBytes > 0)
+ assert(fetched3.get.colStats.size == 2)
+
+ // drop partition command
+ sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
+ assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+ .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
+ // only one partition left
+ if (autoUpdate) {
+ val fetched4 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetched4.get.sizeInBytes < fetched1.get.sizeInBytes)
+ assert(fetched4.get.colStats.isEmpty)
+ val statsProp = getStatsProperties(table)
+ assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes)
+ } else {
+ assert(getStatsProperties(table).isEmpty)
+ }
+ }
+ }
}
}
}