Posted to commits@spark.apache.org by rx...@apache.org on 2017/01/11 06:34:47 UTC
spark git commit: [SPARK-19149][SQL] Unify two sets of statistics in LogicalPlan
Repository: spark
Updated Branches:
refs/heads/master 3b19c74e7 -> a61551356
[SPARK-19149][SQL] Unify two sets of statistics in LogicalPlan
## What changes were proposed in this pull request?
Currently we have two sets of statistics in LogicalPlan: simple statistics and statistics estimated by CBO (cost-based optimization). The computing logic and naming are quite confusing, so this patch unifies the two into a single set of statistics.
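To make the new contract easier to follow before reading the hunks, here is a minimal, self-contained Scala sketch of the caching scheme; `Conf`, `Stats`, `Plan`, `Leaf`, `Join`, and `Demo` are illustrative stand-ins (not Spark classes), while the `stats`/`computeStats`/`invalidateStatsCache` shape mirrors the LogicalPlan.scala change below.

```scala
// Stand-ins for CatalystConf / Statistics / LogicalPlan (illustration only).
case class Conf(cboEnabled: Boolean)
case class Stats(sizeInBytes: BigInt)

abstract class Plan {
  def children: Seq[Plan]

  // Computed once and cached, as in the LogicalPlan hunk below.
  private var statsCache: Option[Stats] = None

  final def stats(conf: Conf): Stats = statsCache.getOrElse {
    statsCache = Some(computeStats(conf))
    statsCache.get
  }

  // Clears this node's cache and, recursively, its children's.
  final def invalidateStatsCache(): Unit = {
    statsCache = None
    children.foreach(_.invalidateStatsCache())
  }

  // Default: product of child sizes (the cartesian-join assumption);
  // leaf nodes must override this.
  protected def computeStats(conf: Conf): Stats =
    Stats(sizeInBytes = children.map(_.stats(conf).sizeInBytes).product)
}

case class Leaf(size: BigInt) extends Plan {
  override def children: Seq[Plan] = Nil
  override protected def computeStats(conf: Conf): Stats = Stats(size)
}

case class Join(left: Plan, right: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(left, right)
}

object Demo extends App {
  val plan = Join(Leaf(10), Leaf(20))
  println(plan.stats(Conf(cboEnabled = false)).sizeInBytes) // 200, cached afterwards
  plan.invalidateStatsCache() // forces recomputation on the next stats() call
}
```

After this change, callers go through `stats(conf)` everywhere, `computeStats(conf)` is the only method operators override, and code that mutates the underlying data (e.g. MemorySinkSuite below) calls `invalidateStatsCache()` before re-reading statistics.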
## How was this patch tested?
Modified existing tests, and added a new StatsConfSuite to cover the CBO switch.
Author: wangzhenhua <wa...@huawei.com>
Author: Zhenhua Wang <wz...@163.com>
Closes #16529 from wzhfy/unifyStats.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6155135
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6155135
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6155135
Branch: refs/heads/master
Commit: a6155135690433988aa0cbf22f260f52a235e9f5
Parents: 3b19c74
Author: wangzhenhua <wa...@huawei.com>
Authored: Tue Jan 10 22:34:44 2017 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jan 10 22:34:44 2017 -0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 2 +-
.../catalyst/plans/logical/LocalRelation.scala | 6 +-
.../catalyst/plans/logical/LogicalPlan.scala | 48 +++++-------
.../plans/logical/basicLogicalOperators.scala | 82 +++++++++++---------
.../statsEstimation/AggregateEstimation.scala | 7 +-
.../statsEstimation/EstimationUtils.scala | 5 +-
.../statsEstimation/ProjectEstimation.scala | 7 +-
.../analysis/DecimalPrecisionSuite.scala | 1 -
.../SubstituteUnresolvedOrdinalsSuite.scala | 1 -
.../optimizer/AggregateOptimizeSuite.scala | 2 +-
.../optimizer/EliminateSortsSuite.scala | 2 +-
.../optimizer/JoinOptimizationSuite.scala | 2 +-
.../catalyst/optimizer/LimitPushdownSuite.scala | 8 +-
.../RewriteDistinctAggregatesSuite.scala | 2 +-
.../spark/sql/catalyst/plans/PlanTest.scala | 4 +
.../statsEstimation/AggEstimationSuite.scala | 2 +-
.../ProjectEstimationSuite.scala | 2 +-
.../statsEstimation/StatsConfSuite.scala | 64 +++++++++++++++
.../StatsEstimationTestBase.scala | 6 +-
.../spark/sql/execution/ExistingRDD.scala | 6 +-
.../spark/sql/execution/SparkStrategies.scala | 12 +--
.../execution/columnar/InMemoryRelation.scala | 4 +-
.../execution/datasources/LogicalRelation.scala | 3 +-
.../spark/sql/execution/streaming/memory.scala | 4 +-
.../org/apache/spark/sql/CachedTableSuite.scala | 2 +-
.../org/apache/spark/sql/DatasetSuite.scala | 2 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 2 +-
.../spark/sql/StatisticsCollectionSuite.scala | 22 +++---
.../columnar/InMemoryColumnarQuerySuite.scala | 2 +-
.../datasources/HadoopFsRelationSuite.scala | 2 +-
.../execution/streaming/MemorySinkSuite.scala | 8 +-
.../statsEstimation/StatsEstimationSuite.scala | 67 ----------------
.../org/apache/spark/sql/test/SQLTestData.scala | 3 +
.../spark/sql/test/SharedSQLContext.scala | 1 +
.../spark/sql/hive/HiveMetastoreCatalog.scala | 3 +-
.../spark/sql/hive/MetastoreRelation.scala | 3 +-
.../apache/spark/sql/hive/StatisticsSuite.scala | 10 +--
37 files changed, 212 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d1f90e6..cef17b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -253,7 +253,7 @@ case class LimitPushDown(conf: CatalystConf) extends Rule[LogicalPlan] {
case FullOuter =>
(left.maxRows, right.maxRows) match {
case (None, None) =>
- if (left.planStats(conf).sizeInBytes >= right.planStats(conf).sizeInBytes) {
+ if (left.stats(conf).sizeInBytes >= right.stats(conf).sizeInBytes) {
join.copy(left = maybePushLimit(exp, left))
} else {
join.copy(right = maybePushLimit(exp, right))
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 91633f5..1faabcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -74,9 +74,9 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
}
}
- override lazy val statistics =
+ override def computeStats(conf: CatalystConf): Statistics =
Statistics(sizeInBytes =
- (output.map(n => BigInt(n.dataType.defaultSize))).sum * data.length)
+ output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
def toSQL(inlineTableName: String): String = {
require(data.nonEmpty)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4f634cb..9e5ba9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -81,6 +81,21 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}
}
+ /** A cache for the estimated statistics, such that it will only be computed once. */
+ private var statsCache: Option[Statistics] = None
+
+ final def stats(conf: CatalystConf): Statistics = statsCache.getOrElse {
+ statsCache = Some(computeStats(conf))
+ statsCache.get
+ }
+
+ final def invalidateStatsCache(): Unit = {
+ statsCache = None
+ children.foreach(_.invalidateStatsCache())
+ }
+
/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
* cardinality is the product of all child plan's cardinality, i.e. applies in the case
@@ -88,37 +103,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*
* [[LeafNode]]s must override this.
*/
- def statistics: Statistics = {
+ protected def computeStats(conf: CatalystConf): Statistics = {
if (children.isEmpty) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
}
- Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
+ Statistics(sizeInBytes = children.map(_.stats(conf).sizeInBytes).product)
}
/**
- * Returns the default statistics or statistics estimated by cbo based on configuration.
- */
- final def planStats(conf: CatalystConf): Statistics = {
- if (conf.cboEnabled) {
- if (estimatedStats.isEmpty) {
- estimatedStats = Some(cboStatistics(conf))
- }
- estimatedStats.get
- } else {
- statistics
- }
- }
-
- /**
- * Returns statistics estimated by cbo. If the plan doesn't override this, it returns the
- * default statistics.
- */
- protected def cboStatistics(conf: CatalystConf): Statistics = statistics
-
- /** A cache for the estimated statistics, such that it will only be computed once. */
- private var estimatedStats: Option[Statistics] = None
-
- /**
* Returns the maximum number of rows that this plan may compute.
*
* Any operator that a Limit can be pushed passed should override this function (e.g., Union).
@@ -334,20 +326,20 @@ abstract class UnaryNode extends LogicalPlan {
override protected def validConstraints: Set[Expression] = child.constraints
- override def statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
// There should be some overhead in Row object, the size should not be zero when there is
// no columns, this help to prevent divide-by-zero error.
val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
// Assume there will be the same number of rows as child has.
- var sizeInBytes = (child.statistics.sizeInBytes * outputRowSize) / childRowSize
+ var sizeInBytes = (child.stats(conf).sizeInBytes * outputRowSize) / childRowSize
if (sizeInBytes == 0) {
// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
// (product of children).
sizeInBytes = 1
}
- child.statistics.copy(sizeInBytes = sizeInBytes)
+ child.stats(conf).copy(sizeInBytes = sizeInBytes)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b97c81c..9bdae5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -55,8 +55,13 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
override def validConstraints: Set[Expression] =
child.constraints.union(getAliasedConstraints(projectList))
- override lazy val statistics: Statistics =
- ProjectEstimation.estimate(this).getOrElse(super.statistics)
+ override def computeStats(conf: CatalystConf): Statistics = {
+ if (conf.cboEnabled) {
+ ProjectEstimation.estimate(conf, this).getOrElse(super.computeStats(conf))
+ } else {
+ super.computeStats(conf)
+ }
+ }
}
/**
@@ -162,11 +167,11 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
}
}
- override lazy val statistics: Statistics = {
- val leftSize = left.statistics.sizeInBytes
- val rightSize = right.statistics.sizeInBytes
+ override def computeStats(conf: CatalystConf): Statistics = {
+ val leftSize = left.stats(conf).sizeInBytes
+ val rightSize = right.stats(conf).sizeInBytes
val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
- val isBroadcastable = left.statistics.isBroadcastable || right.statistics.isBroadcastable
+ val isBroadcastable = left.stats(conf).isBroadcastable || right.stats(conf).isBroadcastable
Statistics(sizeInBytes = sizeInBytes, isBroadcastable = isBroadcastable)
}
@@ -179,8 +184,8 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le
override protected def validConstraints: Set[Expression] = leftConstraints
- override lazy val statistics: Statistics = {
- left.statistics.copy()
+ override def computeStats(conf: CatalystConf): Statistics = {
+ left.stats(conf).copy()
}
}
@@ -218,8 +223,8 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
children.length > 1 && childrenResolved && allChildrenCompatible
}
- override lazy val statistics: Statistics = {
- val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
+ override def computeStats(conf: CatalystConf): Statistics = {
+ val sizeInBytes = children.map(_.stats(conf).sizeInBytes).sum
Statistics(sizeInBytes = sizeInBytes)
}
@@ -327,14 +332,14 @@ case class Join(
case _ => resolvedExceptNatural
}
- override lazy val statistics: Statistics = joinType match {
+ override def computeStats(conf: CatalystConf): Statistics = joinType match {
case LeftAnti | LeftSemi =>
// LeftSemi and LeftAnti won't ever be bigger than left
- left.statistics.copy()
+ left.stats(conf).copy()
case _ =>
// make sure we don't propagate isBroadcastable in other joins, because
// they could explode the size.
- super.statistics.copy(isBroadcastable = false)
+ super.computeStats(conf).copy(isBroadcastable = false)
}
}
@@ -345,7 +350,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
// set isBroadcastable to true so the child will be broadcasted
- override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
+ override def computeStats(conf: CatalystConf): Statistics =
+ super.computeStats(conf).copy(isBroadcastable = true)
}
/**
@@ -462,7 +468,7 @@ case class Range(
override def newInstance(): Range = copy(output = output.map(_.newInstance()))
- override lazy val statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
val sizeInBytes = LongType.defaultSize * numElements
Statistics( sizeInBytes = sizeInBytes )
}
@@ -495,11 +501,19 @@ case class Aggregate(
child.constraints.union(getAliasedConstraints(nonAgg))
}
- override lazy val statistics: Statistics = AggregateEstimation.estimate(this).getOrElse {
- if (groupingExpressions.isEmpty) {
- super.statistics.copy(sizeInBytes = 1)
+ override def computeStats(conf: CatalystConf): Statistics = {
+ def simpleEstimation: Statistics = {
+ if (groupingExpressions.isEmpty) {
+ super.computeStats(conf).copy(sizeInBytes = 1)
+ } else {
+ super.computeStats(conf)
+ }
+ }
+
+ if (conf.cboEnabled) {
+ AggregateEstimation.estimate(conf, this).getOrElse(simpleEstimation)
} else {
- super.statistics
+ simpleEstimation
}
}
}
@@ -600,8 +614,8 @@ case class Expand(
override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))
- override lazy val statistics: Statistics = {
- val sizeInBytes = super.statistics.sizeInBytes * projections.length
+ override def computeStats(conf: CatalystConf): Statistics = {
+ val sizeInBytes = super.computeStats(conf).sizeInBytes * projections.length
Statistics(sizeInBytes = sizeInBytes)
}
@@ -671,7 +685,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
case _ => None
}
}
- override lazy val statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = if (limit == 0) {
// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
@@ -680,7 +694,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
} else {
(limit: Long) * output.map(a => a.dataType.defaultSize).sum
}
- child.statistics.copy(sizeInBytes = sizeInBytes)
+ child.stats(conf).copy(sizeInBytes = sizeInBytes)
}
}
@@ -692,7 +706,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
case _ => None
}
}
- override lazy val statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = if (limit == 0) {
// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
@@ -701,7 +715,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
} else {
(limit: Long) * output.map(a => a.dataType.defaultSize).sum
}
- child.statistics.copy(sizeInBytes = sizeInBytes)
+ child.stats(conf).copy(sizeInBytes = sizeInBytes)
}
}
@@ -735,14 +749,14 @@ case class Sample(
override def output: Seq[Attribute] = child.output
- override lazy val statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
val ratio = upperBound - lowerBound
// BigInt can't multiply with Double
- var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
+ var sizeInBytes = child.stats(conf).sizeInBytes * (ratio * 100).toInt / 100
if (sizeInBytes == 0) {
sizeInBytes = 1
}
- child.statistics.copy(sizeInBytes = sizeInBytes)
+ child.stats(conf).copy(sizeInBytes = sizeInBytes)
}
override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
@@ -796,13 +810,5 @@ case class RepartitionByExpression(
case object OneRowRelation extends LeafNode {
override def maxRows: Option[Long] = Some(1)
override def output: Seq[Attribute] = Nil
-
- /**
- * Computes [[Statistics]] for this plan. The default implementation assumes the output
- * cardinality is the product of all child plan's cardinality, i.e. applies in the case
- * of cartesian joins.
- *
- * [[LeafNode]]s must override this.
- */
- override lazy val statistics: Statistics = Statistics(sizeInBytes = 1)
+ override def computeStats(conf: CatalystConf): Statistics = Statistics(sizeInBytes = 1)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 33ebc38..af67343 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Statistics}
@@ -28,13 +29,13 @@ object AggregateEstimation {
* Estimate the number of output rows based on column stats of group-by columns, and propagate
* column stats for aggregate expressions.
*/
- def estimate(agg: Aggregate): Option[Statistics] = {
- val childStats = agg.child.statistics
+ def estimate(conf: CatalystConf, agg: Aggregate): Option[Statistics] = {
+ val childStats = agg.child.stats(conf)
// Check if we have column stats for all group-by columns.
val colStatsExist = agg.groupingExpressions.forall { e =>
e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
}
- if (rowCountsExist(agg.child) && colStatsExist) {
+ if (rowCountsExist(conf, agg.child) && colStatsExist) {
// Multiply distinct counts of group-by columns. This is an upper bound, which assumes
// the data contains all combinations of distinct values of group-by columns.
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index f099e32..c7eb6f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.types.StringType
@@ -25,8 +26,8 @@ import org.apache.spark.sql.types.StringType
object EstimationUtils {
/** Check if each plan has rowCount in its statistics. */
- def rowCountsExist(plans: LogicalPlan*): Boolean =
- plans.forall(_.statistics.rowCount.isDefined)
+ def rowCountsExist(conf: CatalystConf, plans: LogicalPlan*): Boolean =
+ plans.forall(_.stats(conf).rowCount.isDefined)
/** Get column stats for output attributes. */
def getOutputMap(inputMap: AttributeMap[ColumnStat], output: Seq[Attribute])
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
index 6d63b09..69c546b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
@@ -17,15 +17,16 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{Project, Statistics}
object ProjectEstimation {
import EstimationUtils._
- def estimate(project: Project): Option[Statistics] = {
- if (rowCountsExist(project.child)) {
- val childStats = project.child.statistics
+ def estimate(conf: CatalystConf, project: Project): Option[Statistics] = {
+ if (rowCountsExist(conf, project.child)) {
+ val childStats = project.child.stats(conf)
val inputAttrStats = childStats.attributeStats
// Match alias with its child's column stat
val aliasStats = project.expressions.collect {
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 66d9b4c..6995fae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
- private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
private val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
private val analyzer = new Analyzer(catalog, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala
index 3c429eb..88f68eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.SimpleCatalystConf
class SubstituteUnresolvedOrdinalsSuite extends AnalysisTest {
- private lazy val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
private lazy val a = testRelation2.output(0)
private lazy val b = testRelation2.output(1)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index aecf59a..b45bd97 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
class AggregateOptimizeSuite extends PlanTest {
- val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
+ override val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
val analyzer = new Analyzer(catalog, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 7402918..c5f9cc1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
- val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
+ override val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
val analyzer = new Analyzer(catalog, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 087718b..65dd622 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -143,7 +143,7 @@ class JoinOptimizationSuite extends PlanTest {
comparePlans(optimized, expected)
val broadcastChildren = optimized.collect {
- case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
+ case Join(_, r, _, _) if r.stats(conf).sizeInBytes == 1 => r
}
assert(broadcastChildren.size == 1)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 9ec9983..0f3ba6c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -33,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Limit pushdown", FixedPoint(100),
- LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ LimitPushDown(conf),
CombineLimits,
ConstantFolding,
BooleanSimplification) :: Nil
@@ -111,7 +111,7 @@ class LimitPushdownSuite extends PlanTest {
}
test("full outer join where neither side is limited and both sides have same statistics") {
- assert(x.statistics.sizeInBytes === y.statistics.sizeInBytes)
+ assert(x.stats(conf).sizeInBytes === y.stats(conf).sizeInBytes)
val originalQuery = x.join(y, FullOuter).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, x).join(y, FullOuter)).analyze
@@ -120,7 +120,7 @@ class LimitPushdownSuite extends PlanTest {
test("full outer join where neither side is limited and left side has larger statistics") {
val xBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('x)
- assert(xBig.statistics.sizeInBytes > y.statistics.sizeInBytes)
+ assert(xBig.stats(conf).sizeInBytes > y.stats(conf).sizeInBytes)
val originalQuery = xBig.join(y, FullOuter).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, xBig).join(y, FullOuter)).analyze
@@ -129,7 +129,7 @@ class LimitPushdownSuite extends PlanTest {
test("full outer join where neither side is limited and right side has larger statistics") {
val yBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('y)
- assert(x.statistics.sizeInBytes < yBig.statistics.sizeInBytes)
+ assert(x.stats(conf).sizeInBytes < yBig.stats(conf).sizeInBytes)
val originalQuery = x.join(yBig, FullOuter).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala
index 5c1faae..350a1c2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRela
import org.apache.spark.sql.types.{IntegerType, StringType}
class RewriteDistinctAggregatesSuite extends PlanTest {
- val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
+ override val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
val analyzer = new Analyzer(catalog, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 64e2687..3b7e5e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical._
@@ -27,6 +28,9 @@ import org.apache.spark.sql.catalyst.util._
* Provides helper methods for comparing plans.
*/
abstract class PlanTest extends SparkFunSuite with PredicateHelper {
+
+ protected val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
+
/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggEstimationSuite.scala
index 42ce2f8..ff79122 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggEstimationSuite.scala
@@ -130,6 +130,6 @@ class AggEstimationSuite extends StatsEstimationTestBase {
rowCount = Some(expectedRowCount),
attributeStats = expectedAttrStats)
- assert(testAgg.statistics == expectedStats)
+ assert(testAgg.stats(conf) == expectedStats)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
index 4a1bed8..a613f0f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
@@ -46,6 +46,6 @@ class ProjectEstimationSuite extends StatsEstimationTestBase {
sizeInBytes = 2 * getRowSize(project.output, expectedAttrStats),
rowCount = Some(2),
attributeStats = expectedAttrStats)
- assert(project.statistics == expectedStats)
+ assert(project.stats(conf) == expectedStats)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsConfSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsConfSuite.scala
new file mode 100644
index 0000000..212d57a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsConfSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.types.IntegerType
+
+
+class StatsConfSuite extends StatsEstimationTestBase {
+ test("estimate statistics when the conf changes") {
+ val expectedDefaultStats =
+ Statistics(
+ sizeInBytes = 40,
+ rowCount = Some(10),
+ attributeStats = AttributeMap(Seq(
+ AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))),
+ isBroadcastable = false)
+ val expectedCboStats =
+ Statistics(
+ sizeInBytes = 4,
+ rowCount = Some(1),
+ attributeStats = AttributeMap(Seq(
+ AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))),
+ isBroadcastable = false)
+
+ val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
+ // Return the statistics estimated by cbo
+ assert(plan.stats(conf.copy(cboEnabled = true)) == expectedCboStats)
+ // Invalidate statistics
+ plan.invalidateStatsCache()
+ // Return the simple statistics
+ assert(plan.stats(conf.copy(cboEnabled = false)) == expectedDefaultStats)
+ }
+}
+
+/**
+ * This class is used for unit-testing the cbo switch, it mimics a logical plan which computes
+ * a simple statistics or a cbo estimated statistics based on the conf.
+ */
+private case class DummyLogicalPlan(
+ defaultStats: Statistics,
+ cboStats: Statistics) extends LogicalPlan {
+ override def output: Seq[Attribute] = Nil
+ override def children: Seq[LogicalPlan] = Nil
+ override def computeStats(conf: CatalystConf): Statistics =
+ if (conf.cboEnabled) cboStats else defaultStats
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 0d81aa3..0635309 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.statsEstimation
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.types.IntegerType
@@ -25,6 +26,9 @@ import org.apache.spark.sql.types.IntegerType
class StatsEstimationTestBase extends SparkFunSuite {
+ /** Enable stats estimation based on CBO. */
+ protected val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true)
+
def attr(colName: String): AttributeReference = AttributeReference(colName, IntegerType)()
/** Convert (column name, column stat) pairs to an AttributeMap based on plan output. */
@@ -40,5 +44,5 @@ class StatsEstimationTestBase extends SparkFunSuite {
*/
protected case class StatsTestPlan(outputList: Seq[Attribute], stats: Statistics) extends LeafNode {
override def output: Seq[Attribute] = outputList
- override lazy val statistics = stats
+ override def computeStats(conf: CatalystConf): Statistics = stats
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index aab087c..49336f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -95,7 +95,7 @@ case class ExternalRDD[T](
override protected def stringArgs: Iterator[Any] = Iterator(output)
- @transient override lazy val statistics: Statistics = Statistics(
+ @transient override def computeStats(conf: CatalystConf): Statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
@@ -170,7 +170,7 @@ case class LogicalRDD(
override protected def stringArgs: Iterator[Any] = Iterator(output)
- @transient override lazy val statistics: Statistics = Statistics(
+ @transient override def computeStats(conf: CatalystConf): Statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 1257d17..fafb919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
private def canBroadcast(plan: LogicalPlan): Boolean = {
- plan.planStats(conf).isBroadcastable ||
- (plan.planStats(conf).sizeInBytes >= 0 &&
- plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
+ plan.stats(conf).isBroadcastable ||
+ (plan.stats(conf).sizeInBytes >= 0 &&
+ plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}
/**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* dynamic.
*/
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
- plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+ plan.stats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
/**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* use the size of bytes here as estimation.
*/
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
- a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
+ a.stats(conf).sizeInBytes * 3 <= b.stats(conf).sizeInBytes
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Join(left, right, joinType, condition) =>
val buildSide =
- if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) {
+ if (right.stats(conf).sizeInBytes <= left.stats(conf).sizeInBytes) {
BuildRight
} else {
BuildLeft
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 03cc046..37bd95e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -21,7 +21,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
@@ -69,7 +69,7 @@ case class InMemoryRelation(
@transient val partitionStatistics = new PartitionStatistics(output)
- override lazy val statistics: Statistics = {
+ override def computeStats(conf: CatalystConf): Statistics = {
if (batchStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3fd4038..04a764b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
@@ -72,7 +73,7 @@ case class LogicalRelation(
// expId can be different but the relation is still the same.
override lazy val cleanArgs: Seq[Any] = Seq(relation)
- @transient override lazy val statistics: Statistics = {
+ @transient override def computeStats(conf: CatalystConf): Statistics = {
catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
Statistics(sizeInBytes = relation.sizeInBytes))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 91da6b3..6d34d51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
@@ -229,5 +230,6 @@ case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode
private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
- override def statistics: Statistics = Statistics(sizePerRow * sink.allData.size)
+ override def computeStats(conf: CatalystConf): Statistics =
+ Statistics(sizePerRow * sink.allData.size)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index fb4812a..339262a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -305,7 +305,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.table("testData").queryExecution.withCachedData.collect {
case cached: InMemoryRelation =>
val actualSizeInBytes = (1 to 100).map(i => 4 + i.toString.length + 4).sum
- assert(cached.statistics.sizeInBytes === actualSizeInBytes)
+ assert(cached.stats(sqlConf).sizeInBytes === actualSizeInBytes)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c27b815..731a28c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1115,7 +1115,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
// instead of Int for avoiding possible overflow.
val ds = (0 to 10000).map( i =>
(i, Seq((i, Seq((i, "This is really not that long of a string")))))).toDS()
- val sizeInBytes = ds.logicalPlan.statistics.sizeInBytes
+ val sizeInBytes = ds.logicalPlan.stats(sqlConf).sizeInBytes
// sizeInBytes is 2404280404, before the fix, it overflows to a negative number
assert(sizeInBytes > 0)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 913b2ae..f780fc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -32,7 +32,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
setupTestData()
def statisticSizeInByte(df: DataFrame): BigInt = {
- df.queryExecution.optimizedPlan.statistics.sizeInBytes
+ df.queryExecution.optimizedPlan.stats(sqlConf).sizeInBytes
}
test("equi-join is hash-join") {
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 18abb18..bd1ce8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -59,7 +59,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
val df = df1.join(df2, Seq("k"), "left")
val sizes = df.queryExecution.analyzed.collect { case g: Join =>
- g.statistics.sizeInBytes
+ g.stats(conf).sizeInBytes
}
assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}")
@@ -106,9 +106,9 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType))
- assert(df.queryExecution.analyzed.statistics.sizeInBytes >
+ assert(df.queryExecution.analyzed.stats(conf).sizeInBytes >
spark.sessionState.conf.autoBroadcastJoinThreshold)
- assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes >
+ assert(df.selectExpr("a").queryExecution.analyzed.stats(conf).sizeInBytes >
spark.sessionState.conf.autoBroadcastJoinThreshold)
}
@@ -120,14 +120,14 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
val df = sql(s"""SELECT * FROM test limit $limit""")
val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit =>
- g.statistics.sizeInBytes
+ g.stats(conf).sizeInBytes
}
assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizesGlobalLimit.head === BigInt(expected),
s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit.head}")
val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit =>
- l.statistics.sizeInBytes
+ l.stats(conf).sizeInBytes
}
assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizesLocalLimit.head === BigInt(expected),
@@ -250,13 +250,13 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
test("SPARK-18856: non-empty partitioned table should not report zero size") {
withTable("ds_tbl", "hive_tbl") {
spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
- val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.statistics
+ val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats(conf)
assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
- val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.statistics
+ val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats(conf)
assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
}
}
@@ -296,10 +296,10 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
// Check relation statistics
- assert(relation.statistics.sizeInBytes == 0)
- assert(relation.statistics.rowCount == Some(0))
- assert(relation.statistics.attributeStats.size == 1)
- val (attribute, colStat) = relation.statistics.attributeStats.head
+ assert(relation.stats(conf).sizeInBytes == 0)
+ assert(relation.stats(conf).rowCount == Some(0))
+ assert(relation.stats(conf).attributeStats.size == 1)
+ val (attribute, colStat) = relation.stats(conf).attributeStats.head
assert(attribute.name == "c1")
assert(colStat == emptyColStat)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index afeb478..f355a52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -123,7 +123,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
.toDF().createOrReplaceTempView("sizeTst")
spark.catalog.cacheTable("sizeTst")
assert(
- spark.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes >
+ spark.table("sizeTst").queryExecution.analyzed.stats(sqlConf).sizeInBytes >
spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 89d5765..7679e85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -36,7 +36,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
})
val totalSize = allFiles.map(_.length()).sum
val df = spark.read.parquet(dir.toString)
- assert(df.queryExecution.logical.statistics.sizeInBytes === BigInt(totalSize))
+ assert(df.queryExecution.logical.stats(sqlConf).sizeInBytes === BigInt(totalSize))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
index 8f23f98..24a7b77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -216,13 +216,15 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
// Before adding data, check output
checkAnswer(sink.allData, Seq.empty)
- assert(plan.statistics.sizeInBytes === 0)
+ assert(plan.stats(sqlConf).sizeInBytes === 0)
sink.addBatch(0, 1 to 3)
- assert(plan.statistics.sizeInBytes === 12)
+ plan.invalidateStatsCache()
+ assert(plan.stats(sqlConf).sizeInBytes === 12)
sink.addBatch(1, 4 to 6)
- assert(plan.statistics.sizeInBytes === 24)
+ plan.invalidateStatsCache()
+ assert(plan.stats(sqlConf).sizeInBytes === 24)
}
ignore("stress test") {
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
deleted file mode 100644
index 78f2ce1..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.statsEstimation
-
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.IntegerType
-
-
-class StatsEstimationSuite extends SharedSQLContext {
- test("statistics for a plan based on the cbo switch") {
- val expectedDefaultStats =
- Statistics(
- sizeInBytes = 40,
- rowCount = Some(10),
- attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))),
- isBroadcastable = false)
- val expectedCboStats =
- Statistics(
- sizeInBytes = 4,
- rowCount = Some(1),
- attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))),
- isBroadcastable = false)
-
- val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
- withSQLConf("spark.sql.cbo.enabled" -> "true") {
- // Use the statistics estimated by cbo
- assert(plan.planStats(spark.sessionState.conf) == expectedCboStats)
- }
- withSQLConf("spark.sql.cbo.enabled" -> "false") {
- // Use the default statistics
- assert(plan.planStats(spark.sessionState.conf) == expectedDefaultStats)
- }
- }
-}
-
-/**
- * This class is used for unit-testing the cbo switch, it mimics a logical plan which has both
- * default statistics and cbo estimated statistics.
- */
-private case class DummyLogicalPlan(
- defaultStats: Statistics,
- cboStats: Statistics) extends LogicalPlan {
- override lazy val statistics = defaultStats
- override def cboStatistics(conf: CatalystConf): Statistics = cboStats
- override def output: Seq[Attribute] = Nil
- override def children: Seq[LogicalPlan] = Nil
-}
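
The deleted suite exercised the cbo switch through two separate hooks, statistics and cboStatistics(conf). Under the unified API a single override suffices; a hedged sketch of the ported dummy plan (the branch on conf.cboEnabled is an assumption about how the switch is read, not text from this diff):

    import org.apache.spark.sql.catalyst.CatalystConf
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}

    // One computeStats override replaces the statistics/cboStatistics pair;
    // stats(conf) passes the conf through, so the plan itself can pick the
    // estimate matching the cbo flag.
    private case class DummyLogicalPlan(
        defaultStats: Statistics,
        cboStats: Statistics) extends LogicalPlan {
      override def output: Seq[Attribute] = Nil
      override def children: Seq[LogicalPlan] = Nil
      override def computeStats(conf: CatalystConf): Statistics =
        if (conf.cboEnabled) cboStats else defaultStats
    }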
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
index 0cfe260..f9b3ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext, SQLImplicits}
+import org.apache.spark.sql.internal.SQLConf
/**
* A collection of sample data used in SQL tests.
@@ -28,6 +29,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext, SQLImplicits}
private[sql] trait SQLTestData { self =>
protected def spark: SparkSession
+ protected def sqlConf: SQLConf = spark.sessionState.conf
+
// Helper object to import SQL implicits without a concrete SQLContext
private object internalImplicits extends SQLImplicits {
protected override def _sqlContext: SQLContext = self.spark.sqlContext
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 2239f10..36dc236 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0407cf6..ee4589f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -232,7 +232,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSchema))
val logicalRelation = cached.getOrElse {
- val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
+ val sizeInBytes =
+ metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
val fileCatalog = {
val catalog = new CatalogFileIndex(
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 2e60cba..7254f73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
@@ -112,7 +113,7 @@ private[hive] case class MetastoreRelation(
new HiveTable(tTable)
}
- @transient override lazy val statistics: Statistics = {
+ @transient override def computeStats(conf: CatalystConf): Statistics = {
catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
sizeInBytes = {
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
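
Here the cached lazy val becomes a plain computeStats(conf) override, so caching is owned by LogicalPlan.stats(conf) rather than by each relation. When catalog stats are absent, sizeInBytes is derived from Hive's totalSize table parameter; a minimal sketch of that derivation (the helper name and the fallback argument are hypothetical, only totalSize comes from the hunk above):

    // Parse Hive's totalSize property, falling back when it is missing or
    // non-positive so the planner does not treat the table as empty.
    def sizeFromHiveProperty(totalSize: String, fallback: BigInt): BigInt =
      Option(totalSize).map(_.toLong).filter(_ > 0L)
        .map(BigInt(_)).getOrElse(fallback)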
http://git-wip-us.apache.org/repos/asf/spark/blob/a6155135/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b040f26..0053aa1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -69,7 +69,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
- val sizeInBytes = relation.statistics.sizeInBytes
+ val sizeInBytes = relation.stats(conf).sizeInBytes
assert(sizeInBytes === BigInt(file1.length() + file2.length()))
}
} finally {
@@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
+ spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -481,7 +481,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("estimates the size of a test MetastoreRelation") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
- mr.statistics.sizeInBytes
+ mr.stats(conf).sizeInBytes
}
assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(5812)),
@@ -501,7 +501,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Assert src has a size smaller than the threshold.
val sizes = df.queryExecution.analyzed.collect {
- case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
+ case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.stats(conf).sizeInBytes
}
assert(sizes.size === 2 && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold
&& sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
@@ -557,7 +557,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val sizes = df.queryExecution.analyzed.collect {
case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass
.isAssignableFrom(r.getClass) =>
- r.statistics.sizeInBytes
+ r.stats(conf).sizeInBytes
}
assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
&& sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,