Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2018/11/03 12:50:01 UTC

[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r230554142
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff ---
    
    Have you compared the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` with that of `fs.getContentSummary(tablePath).getLength`?
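    
    For reference, a minimal way to time the two code paths against a real partitioned table might look like the sketch below. It assumes an active `SparkSession` with Hive support, a partitioned test table `db.tbl`, and access to the unstable `sessionState` API; the `timed` helper and the use of the Hive `totalSize` partition parameter (which the patch's `getSizeInBytesFromTablePartitions` presumably reads via the partition stats) are illustrative assumptions, not part of this patch:
    
        import org.apache.hadoop.fs.Path
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.catalyst.TableIdentifier
    
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    
        // Hypothetical helper: run a block once and print the elapsed time.
        def timed[T](label: String)(block: => T): T = {
          val start = System.nanoTime()
          val result = block
          println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
          result
        }
    
        val ident = TableIdentifier("tbl", Some("db"))  // assumed test table
        val table = spark.sessionState.catalog.getTableMetadata(ident)
    
        // Metastore route: list every partition (empty filter) and sum the
        // Hive `totalSize` parameter where the metastore has recorded it.
        val fromMetastore = timed("listPartitionsByFilter") {
          spark.sessionState.catalog
            .listPartitionsByFilter(ident, Nil)
            .flatMap(_.parameters.get("totalSize"))
            .map(BigInt(_))
            .sum
        }
    
        // Filesystem route: one recursive content summary over the table
        // root, as the existing HDFS fallback in this rule does.
        val fromFs = timed("getContentSummary") {
          val hadoopConf = spark.sessionState.newHadoopConf()
          val tablePath = new Path(table.location)
          val fs = tablePath.getFileSystem(hadoopConf)
          BigInt(fs.getContentSummary(tablePath).getLength)
        }
    
        println(s"metastore=$fromMetastore, filesystem=$fromFs")
    
    The interesting case is a table with many partitions and many files: the metastore call is a round trip to the Hive metastore, while `getContentSummary` walks the whole directory tree on the NameNode, so the gap could plausibly go either way depending on partition and file counts.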

