You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (JIRA)" <ji...@apache.org> on 2019/05/26 15:04:00 UTC

[jira] [Updated] (SPARK-27843) Remove duplicate logic of calculate table size

     [ https://issues.apache.org/jira/browse/SPARK-27843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-27843:
--------------------------------
    Description: 
{code:scala}
/**
 * Rule that attaches a size estimate to Hive table relations whose catalog metadata
 * carries no statistics.
 *
 * When `fallBackToHdfsForStatsEnabled` is set, the estimate is the content-summary length of
 * the table location on its file system. If that lookup fails with an `IOException`, or the
 * fallback is disabled, `defaultSizeInBytes` from the session conf is used instead.
 */
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case rel: HiveTableRelation
        if DDLUtils.isHiveTable(rel.tableMeta) && rel.tableMeta.stats.isEmpty =>
      val sqlConf = session.sessionState.conf
      val meta = rel.tableMeta

      val estimatedSize =
        if (!sqlConf.fallBackToHdfsForStatsEnabled) {
          sqlConf.defaultSizeInBytes
        } else {
          try {
            val hadoopConf = session.sessionState.newHadoopConf()
            val location = new Path(meta.location)
            location.getFileSystem(hadoopConf).getContentSummary(location).getLength
          } catch {
            // Only I/O failures fall back to the default; anything else propagates.
            case ioe: IOException =>
              logWarning("Failed to get table size from hdfs.", ioe)
              sqlConf.defaultSizeInBytes
          }
        }

      val stats = CatalogStatistics(sizeInBytes = BigInt(estimatedSize))
      rel.copy(tableMeta = meta.copy(stats = Some(stats)))
  }
}
{code}
and
{code:scala}
/**
 * Computes the total size of a catalog table.
 *
 * A table without partition columns is measured at its own storage location via
 * `calculateLocationSize`. A partitioned table is measured as the sum over the partitions
 * listed by the session catalog (see SPARK-21079): either with one bulk leaf-file listing,
 * when `parallelFileListingInStatsComputation` is enabled, or partition by partition.
 *
 * @param spark session supplying the catalog, the SQL conf and the Hadoop conf
 * @param catalogTable table whose size is computed
 * @return the summed size of the table's location(s)
 */
def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
  val sessionState = spark.sessionState
  if (catalogTable.partitionColumnNames.isEmpty) {
    calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
  } else {
    // Calculate table size as a sum of the visible partitions. See SPARK-21079
    val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    if (sessionState.conf.parallelFileListingInStatsComputation) {
      // Keep only partitions that actually have a storage location instead of calling `.get`
      // on the Option, which throws NoSuchElementException for a partition without one.
      // The sequential branch below hands the same Option to calculateLocationSize, so an
      // absent location is an accepted input there.
      // NOTE(review): presumably an absent location contributes nothing in that helper —
      // confirm against calculateLocationSize.
      val paths = partitions.flatMap(_.storage.locationUri).map(uri => new Path(uri))
      val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
      // Count only data files, leaving out anything under the Hive staging directory.
      val pathFilter = new PathFilter with Serializable {
        override def accept(path: Path): Boolean = {
          DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
        }
      }
      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
        paths, sessionState.newHadoopConf(), pathFilter, spark)
      fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
    } else {
      partitions.map { p =>
        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
      }.sum
    }
  }
}
{code}

> Remove duplicate logic of calculate table size
> ----------------------------------------------
>
>                 Key: SPARK-27843
>                 URL: https://issues.apache.org/jira/browse/SPARK-27843
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> {code:scala}
> class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
>   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
>     case relation: HiveTableRelation
>         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
>       val table = relation.tableMeta
>       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
>         try {
>           val hadoopConf = session.sessionState.newHadoopConf()
>           val tablePath = new Path(table.location)
>           val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
>           fs.getContentSummary(tablePath).getLength
>         } catch {
>           case e: IOException =>
>             logWarning("Failed to get table size from hdfs.", e)
>             session.sessionState.conf.defaultSizeInBytes
>         }
>       } else {
>         session.sessionState.conf.defaultSizeInBytes
>       }
>       val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
>       relation.copy(tableMeta = withStats)
>   }
> }
> {code}
> and
> {code:scala}
> def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
>     val sessionState = spark.sessionState
>     if (catalogTable.partitionColumnNames.isEmpty) {
>       calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
>     } else {
>       // Calculate table size as a sum of the visible partitions. See SPARK-21079
>       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
>       if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
>         val paths = partitions.map(x => new Path(x.storage.locationUri.get))
>         val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
>         val pathFilter = new PathFilter with Serializable {
>           override def accept(path: Path): Boolean = {
>             DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
>           }
>         }
>         val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
>           paths, sessionState.newHadoopConf(), pathFilter, spark)
>         fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
>       } else {
>         partitions.map { p =>
>           calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
>         }.sum
>       }
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org