You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2018/11/03 12:50:01 UTC
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230554142
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+ val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+ computeTableStats(relation, predicates)
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
- val table = relation.tableMeta
- val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = session.sessionState.newHadoopConf()
- val tablePath = new Path(table.location)
- val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
- fs.getContentSummary(tablePath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- session.sessionState.conf.defaultSizeInBytes
- }
- } else {
- session.sessionState.conf.defaultSizeInBytes
+ computeTableStats(relation)
+ }
+
+ private def computeTableStats(
+ relation: HiveTableRelation,
+ predicates: Seq[Expression] = Nil): LogicalPlan = {
+ val table = relation.tableMeta
+ val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = session.sessionState.newHadoopConf()
+ val tablePath = new Path(table.location)
+ val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+ BigInt(fs.getContentSummary(tablePath).getLength)
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
}
+ } else {
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
+ }
+ val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+ relation.copy(tableMeta = withStats)
+ }
- val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
- relation.copy(tableMeta = withStats)
+ private def getSizeInBytesFromTablePartitions(
+ tableIdentifier: TableIdentifier,
+ predicates: Seq[Expression] = Nil): BigInt = {
+ session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --
Have you compared the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` with that of `fs.getContentSummary(tablePath).getLength`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org