Posted to reviews@spark.apache.org by fjh100456 <gi...@git.apache.org> on 2018/10/11 06:57:04 UTC
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
GitHub user fjh100456 opened a pull request:
https://github.com/apache/spark/pull/22693
[SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics.
## What changes were proposed in this pull request?
When determining table statistics, if the `totalSize` of the table is not defined, we fall back to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`; otherwise the default value (`spark.sql.defaultSizeInBytes`) is used, which means tables without the `totalSize` property may never be broadcast (except Parquet tables).
Fortunately, in most cases the data is written into the table by an insert command, which saves the data size in the metastore, so the metastore can be used to calculate the table statistics.
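As a minimal sketch of the idea (illustrative only, not the patch itself; it uses Spark's internal `SessionCatalog` API, and the helper name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

// Illustrative sketch: estimate a partitioned table's size by summing the
// `totalSize` each partition carries in the Hive metastore, and fall back to
// the configured default only when the metastore has no size information.
def estimateSizeFromPartitions(
    session: SparkSession,
    table: TableIdentifier): BigInt = {
  val partitions = session.sessionState.catalog.listPartitions(table)
  val sizes = partitions.flatMap(_.parameters.get("totalSize")).map(BigInt(_))
  // Trust the metastore only when every partition reports a size.
  if (partitions.nonEmpty && sizes.length == partitions.length) sizes.sum
  else BigInt(session.sessionState.conf.defaultSizeInBytes)
}
```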
## How was this patch tested?
Add test.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fjh100456/spark StatisticCommit
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22693.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22693
----
commit e610477063b4f326b8261d59b55abce83cbb82e7
Author: fjh100456 <fu...@...>
Date: 2018-10-11T06:43:52Z
[SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics.
## What changes were proposed in this pull request?
When obtaining table statistics, if the `totalSize` of the table is not defined, we fall back to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`; otherwise the default value (`spark.sql.defaultSizeInBytes`) is used.
Fortunately, in most cases the data is written into the table by an insert command, which saves the data size in the metastore, so the metastore can be used to calculate the table statistics.
## How was this patch tested?
Add test.
----
---
[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22693
Can one of the admins verify this patch?
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230554142
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
       }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --
Have you compared the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` with that of `fs.getContentSummary(tablePath).getLength`?
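A rough way to compare the two calls from a `spark-shell` (the table name and location are placeholders, and `sessionState` is an internal, unstable API):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

// Tiny timing helper: runs the block once and prints the elapsed time.
def time[T](label: String)(f: => T): T = {
  val start = System.nanoTime()
  val result = f
  println(f"$label%s: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

// Metastore path: fetches metadata for every matching partition.
val catalog = spark.sessionState.catalog
time("listPartitionsByFilter") {
  catalog.listPartitionsByFilter(TableIdentifier("tbl", Some("db")), Nil)
}

// HDFS path: a single NameNode call that recurses server-side.
val tablePath = new Path("hdfs:///user/hive/warehouse/db.db/tbl")
val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf())
time("getContentSummary") {
  fs.getContentSummary(tablePath).getLength
}
```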
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230609824
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
(same diff hunk as quoted above)
--- End diff --
The performance will be pretty bad when the number of partitions is huge: `listPartitionsByFilter` has to fetch and deserialize metadata for every matching partition from the metastore, whereas `getContentSummary` is a single call per table. Thus, I think we can close this PR.
---
[GitHub] spark issue #22693: [SPARK-25701][SQL] Supports calculation of table statist...
Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 commented on the issue:
https://github.com/apache/spark/pull/22693
This may not be a good solution; a better one has been provided by @wangyum.
I'll close it.
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r232556859
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
(same diff hunk as quoted above)
--- End diff --
How does https://github.com/apache/spark/pull/22743 solve this problem? That PR aims to invalidate the cache when configurations are changed, while this PR aims to compute stats from HDFS when they are not available.
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230639634
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
(same diff hunk as quoted above)
--- End diff --
After [this refactor](https://github.com/apache/spark/pull/22743), we can avoid computing stats when the `LogicalRelation` is already cached, because the computed stats would not take effect anyway.
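A hypothetical sketch of that guard (the helper name is assumed; the actual change lives in #22743, and `CacheManager` is internal API):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical guard: skip the costly stats computation when the plan is
// already cached, since stats computed here would not take effect anyway.
def shouldComputeStats(session: SparkSession, plan: LogicalPlan): Boolean =
  session.sharedState.cacheManager.lookupCachedData(plan).isEmpty
```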
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 closed the pull request at:
https://github.com/apache/spark/pull/22693
---
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Posted by fjh100456 <gi...@git.apache.org>.
Github user fjh100456 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230726170
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+ val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+ computeTableStats(relation, predicates)
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
- val table = relation.tableMeta
- val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = session.sessionState.newHadoopConf()
- val tablePath = new Path(table.location)
- val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
- fs.getContentSummary(tablePath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- session.sessionState.conf.defaultSizeInBytes
- }
- } else {
- session.sessionState.conf.defaultSizeInBytes
+ computeTableStats(relation)
+ }
+
+ private def computeTableStats(
+ relation: HiveTableRelation,
+ predicates: Seq[Expression] = Nil): LogicalPlan = {
+ val table = relation.tableMeta
+ val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = session.sessionState.newHadoopConf()
+ val tablePath = new Path(table.location)
+ val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+ BigInt(fs.getContentSummary(tablePath).getLength)
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
}
+ } else {
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
+ }
+ val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+ relation.copy(tableMeta = withStats)
+ }
- val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
- relation.copy(tableMeta = withStats)
+ private def getSizeInBytesFromTablePartitions(
+ tableIdentifier: TableIdentifier,
+ predicates: Seq[Expression] = Nil): BigInt = {
+ session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --
Your solution seems much better, @wangyum.
Thank you, I'll close it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org