You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by fjh100456 <gi...@git.apache.org> on 2018/10/11 06:57:04 UTC

[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

GitHub user fjh100456 opened a pull request:

    https://github.com/apache/spark/pull/22693

    [SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics.

    ## What changes were proposed in this pull request?
    
    When determine table statistics, if the `totalSize` of the table is not defined, we fallback to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`, otherwise the default value(`spark.sql.defaultSizeInBytes`) will be taken, which will lead to tables without `totalSize` property may not be broadcast(Except parquet). 
    
    Fortunately, in most case the data is written into the table by a insertion command which will save the data-size in metastore, so it's possible to use metastore to calculate the table statistics.
    
    ## How was this patch tested?
    Add test.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fjh100456/spark StatisticCommit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22693
    
----
commit e610477063b4f326b8261d59b55abce83cbb82e7
Author: fjh100456 <fu...@...>
Date:   2018-10-11T06:43:52Z

    [SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics.
    
    ## What changes were proposed in this pull request?
    
    When obtaining table statistics, if the `totalSize` of the table is not defined, we fallback to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`, otherwise the default value(`spark.sql.defaultSizeInBytes`) will be taken.
    
    Fortunately, in most case the data is written into the table by a insertion command which will save the data-size in metastore, so it's possible to use metastore to calculate the table statistics.
    
    ## How was this patch tested?
    Add test.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22693
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r230554142
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff --
    
    Have you tested the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` and `fs.getContentSummary(tablePath).getLength`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22693
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r230609824
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff --
    
    The perf will be pretty bad when the number of partitions is huge. Thus, I think we can close this PR. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...

Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 commented on the issue:

    https://github.com/apache/spark/pull/22693
  
    This may not be a good solution, and a better one had been provided by @wangyum 
    Close it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r232556859
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff --
    
    How come https://github.com/apache/spark/pull/22743 solves this problem? That PR targets to invalidate cache when configurations are changed. This PR targets to compute stats from HDFS when they are not available.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22693
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r230639634
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff --
    
    After [this refactor](https://github.com/apache/spark/pull/22743). We can avoid compute stats if `LogicalRelation` already cached. because the computed stats will not take effect. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 closed the pull request at:

    https://github.com/apache/spark/pull/22693


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22693#discussion_r230726170
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     
     class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
    +      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
    +      computeTableStats(relation, predicates)
         case relation: HiveTableRelation
             if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    -      val table = relation.tableMeta
    -      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    -        try {
    -          val hadoopConf = session.sessionState.newHadoopConf()
    -          val tablePath = new Path(table.location)
    -          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    -          fs.getContentSummary(tablePath).getLength
    -        } catch {
    -          case e: IOException =>
    -            logWarning("Failed to get table size from hdfs.", e)
    -            session.sessionState.conf.defaultSizeInBytes
    -        }
    -      } else {
    -        session.sessionState.conf.defaultSizeInBytes
    +      computeTableStats(relation)
    +  }
    +
    +  private def computeTableStats(
    +      relation: HiveTableRelation,
    +      predicates: Seq[Expression] = Nil): LogicalPlan = {
    +    val table = relation.tableMeta
    +    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    +      try {
    +        val hadoopConf = session.sessionState.newHadoopConf()
    +        val tablePath = new Path(table.location)
    +        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    +        BigInt(fs.getContentSummary(tablePath).getLength)
    +      } catch {
    +        case e: IOException =>
    +          logWarning("Failed to get table size from hdfs.", e)
    +          getSizeInBytesFromTablePartitions(table.identifier, predicates)
           }
    +    } else {
    +      getSizeInBytesFromTablePartitions(table.identifier, predicates)
    +    }
    +    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
    +    relation.copy(tableMeta = withStats)
    +  }
     
    -      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    -      relation.copy(tableMeta = withStats)
    +  private def getSizeInBytesFromTablePartitions(
    +      tableIdentifier: TableIdentifier,
    +      predicates: Seq[Expression] = Nil): BigInt = {
    +    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
    --- End diff --
    
    Your solution seems much better.  @wangyum 
    Thank you.  I'll close it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org