You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (JIRA)" <ji...@apache.org> on 2019/05/26 15:04:00 UTC
[jira] [Updated] (SPARK-27843) Remove duplicate logic of calculate
table size
[ https://issues.apache.org/jira/browse/SPARK-27843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-27843:
--------------------------------
Description:
{code:scala}
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
fs.getContentSummary(tablePath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
session.sessionState.conf.defaultSizeInBytes
}
} else {
session.sessionState.conf.defaultSizeInBytes
}
val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
relation.copy(tableMeta = withStats)
}
}
{code}
and
{code:scala}
def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
val sessionState = spark.sessionState
if (catalogTable.partitionColumnNames.isEmpty) {
calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
val paths = partitions.map(x => new Path(x.storage.locationUri.get))
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val pathFilter = new PathFilter with Serializable {
override def accept(path: Path): Boolean = {
DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
}
}
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
paths, sessionState.newHadoopConf(), pathFilter, spark)
fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
} else {
partitions.map { p =>
calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
}.sum
}
}
}
{code}
> Remove duplicate logic of calculate table size
> ----------------------------------------------
>
> Key: SPARK-27843
> URL: https://issues.apache.org/jira/browse/SPARK-27843
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Yuming Wang
> Priority: Major
>
> {code:scala}
> class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
> override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
> case relation: HiveTableRelation
> if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
> val table = relation.tableMeta
> val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
> try {
> val hadoopConf = session.sessionState.newHadoopConf()
> val tablePath = new Path(table.location)
> val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
> fs.getContentSummary(tablePath).getLength
> } catch {
> case e: IOException =>
> logWarning("Failed to get table size from hdfs.", e)
> session.sessionState.conf.defaultSizeInBytes
> }
> } else {
> session.sessionState.conf.defaultSizeInBytes
> }
> val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
> relation.copy(tableMeta = withStats)
> }
> }
> {code}
> and
> {code:scala}
> def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
> val sessionState = spark.sessionState
> if (catalogTable.partitionColumnNames.isEmpty) {
> calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
> } else {
> // Calculate table size as a sum of the visible partitions. See SPARK-21079
> val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
> if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
> val paths = partitions.map(x => new Path(x.storage.locationUri.get))
> val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
> val pathFilter = new PathFilter with Serializable {
> override def accept(path: Path): Boolean = {
> DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
> }
> }
> val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
> paths, sessionState.newHadoopConf(), pathFilter, spark)
> fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
> } else {
> partitions.map { p =>
> calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
> }.sum
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org