You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/14 01:58:04 UTC
spark git commit: [SPARK-20986][SQL] Reset table's statistics after
PruneFileSourcePartitions rule.
Repository: spark
Updated Branches:
refs/heads/master 9eb095243 -> 8b5b2e272
[SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePartitions rule.
## What changes were proposed in this pull request?
After the PruneFileSourcePartitions rule runs, the table's statistics need to be reset: the rule can filter out unnecessary partitions, so the statistics should be updated to reflect the size of the remaining (pruned) files.
## How was this patch tested?
Added a unit test.
Author: lianhuiwang <li...@gmail.com>
Closes #18205 from lianhuiwang/SPARK-20986.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b5b2e27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b5b2e27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b5b2e27
Branch: refs/heads/master
Commit: 8b5b2e272f48f7ddf8aeece0205cb4a5853c364e
Parents: 9eb0952
Author: lianhuiwang <li...@gmail.com>
Authored: Wed Jun 14 09:57:56 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jun 14 09:57:56 2017 +0800
----------------------------------------------------------------------
.../datasources/PruneFileSourcePartitions.scala | 8 +++++--
.../PruneFileSourcePartitionsSuite.scala | 25 ++++++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8b5b2e27/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 905b868..f5df184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -59,8 +60,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
- val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
-
+ // Change table stats based on the sizeInBytes of pruned files
+ val withStats = logicalRelation.catalogTable.map(_.copy(
+ stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
+ val prunedLogicalRelation = logicalRelation.copy(
+ relation = prunedFsRelation, catalogTable = withStats)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
val filter = Filter(filterExpression, prunedLogicalRelation)
http://git-wip-us.apache.org/repos/asf/spark/blob/8b5b2e27/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index f818e29..d91f25a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -66,4 +67,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
}
}
}
+
+ test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
+ withTable("tbl") {
+ spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
+ sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS")
+ val tableStats = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")).stats
+ assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost")
+
+ val df = sql("SELECT * FROM tbl WHERE p = 1")
+ val sizes1 = df.queryExecution.analyzed.collect {
+ case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
+ }
+ assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
+ assert(sizes1(0) == tableStats.get.sizeInBytes)
+
+ val relations = df.queryExecution.optimizedPlan.collect {
+ case relation: LogicalRelation => relation
+ }
+ assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
+ val size2 = relations(0).computeStats(conf).sizeInBytes
+ assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes)
+ assert(size2 < tableStats.get.sizeInBytes)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org