You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/09/29 16:52:58 UTC
[spark] branch branch-3.0 updated: [SPARK-33018][SQL] Fix estimate
statistics issue if child has 0 bytes
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ae8b35a [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes
ae8b35a is described below
commit ae8b35a0d24f8c83bbbb597d668875793a8dbca6
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Sep 29 16:46:04 2020 +0000
[SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes
### What changes were proposed in this pull request?
This PR fixes the statistics estimation issue that occurs when a child plan has 0 bytes.
### Why are the changes needed?
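The `sizeInBytes` can be `0` when AQE and CBO are enabled (`spark.sql.adaptive.enabled`=true, `spark.sql.cbo.enabled`=true and `spark.sql.cbo.planStats.enabled`=true). Because the default estimate is the product of the children's `sizeInBytes`, a single 0-byte child makes the whole plan appear to be 0 bytes. This leads to an incorrect BroadcastJoin being planned, resulting in a driver OOM. For example: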
![SPARK-33018](https://user-images.githubusercontent.com/5399861/94457606-647e3d00-01e7-11eb-85ee-812ae6efe7bb.jpg)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #29894 from wangyum/SPARK-33018.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 711d8dd28afd9af92b025f9908534e5f1d575042)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../SizeInBytesOnlyStatsPlanVisitor.scala | 3 ++-
.../statsEstimation/JoinEstimationSuite.scala | 22 ++++++++++++++++++++++
.../statsEstimation/StatsEstimationTestBase.scala | 9 ++++++---
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index da36db7..a586988 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -53,7 +53,8 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
*/
override def default(p: LogicalPlan): Statistics = p match {
case p: LeafNode => p.computeStats()
- case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
+ case _: LogicalPlan =>
+ Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
}
override def visitAggregate(p: Aggregate): Statistics = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
index 6c5a2b2..cdfc863 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
@@ -551,4 +551,26 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
attributeStats = AttributeMap(Nil))
assert(join.stats == expectedStats)
}
+
+ test("SPARK-33018 Fix estimate statistics issue if child has 0 bytes") {
+ case class MyStatsTestPlan(
+ outputList: Seq[Attribute],
+ sizeInBytes: BigInt) extends LeafNode {
+ override def output: Seq[Attribute] = outputList
+ override def computeStats(): Statistics = Statistics(sizeInBytes = sizeInBytes)
+ }
+
+ val left = MyStatsTestPlan(
+ outputList = Seq("key-1-2", "key-2-4").map(nameToAttr),
+ sizeInBytes = BigInt(100))
+
+ val right = MyStatsTestPlan(
+ outputList = Seq("key-1-2", "key-2-3").map(nameToAttr),
+ sizeInBytes = BigInt(0))
+
+ val join = Join(left, right, LeftOuter,
+ Some(EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3"))), JoinHint.NONE)
+
+ assert(join.stats == Statistics(sizeInBytes = 100))
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 9dceca5..0a27e31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -26,17 +26,20 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
trait StatsEstimationTestBase extends SparkFunSuite {
- var originalValue: Boolean = false
+ var originalCBOValue: Boolean = false
+ var originalPlanStatsValue: Boolean = false
override def beforeAll(): Unit = {
super.beforeAll()
// Enable stats estimation based on CBO.
- originalValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED)
+ originalCBOValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED)
+ originalPlanStatsValue = SQLConf.get.getConf(SQLConf.PLAN_STATS_ENABLED)
SQLConf.get.setConf(SQLConf.CBO_ENABLED, true)
}
override def afterAll(): Unit = {
- SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalValue)
+ SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalCBOValue)
+ SQLConf.get.setConf(SQLConf.PLAN_STATS_ENABLED, originalPlanStatsValue)
super.afterAll()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org